This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 405171c Rename concurrency to target_partitions (#706)
405171c is described below
commit 405171cd6a8ac65f5f9532f0c8138c11df6cb9d2
Author: Andy Grove <[email protected]>
AuthorDate: Wed Aug 25 13:51:53 2021 -0600
Rename concurrency to target_partitions (#706)
* Rename concurrency to target_partitions and deprecate with_concurrency
* Rename other uses of concurrency
* update comment
* rename default_partitions to target_partitions in comments
---
.../rust/core/src/serde/logical_plan/from_proto.rs | 2 +-
.../core/src/serde/physical_plan/from_proto.rs | 1 -
ballista/rust/core/src/utils.rs | 2 +-
ballista/rust/scheduler/src/lib.rs | 4 +--
ballista/rust/scheduler/src/test_utils.rs | 3 +-
benchmarks/src/bin/nyctaxi.rs | 8 ++---
benchmarks/src/bin/tpch.rs | 35 ++++++++------------
datafusion/benches/sort_limit_query_sql.rs | 2 +-
datafusion/src/datasource/parquet.rs | 14 ++++----
datafusion/src/execution/context.rs | 38 ++++++++++++++--------
datafusion/src/logical_plan/builder.rs | 8 ++---
datafusion/src/physical_optimizer/repartition.rs | 26 +++++++--------
datafusion/src/physical_plan/mod.rs | 4 +--
datafusion/src/physical_plan/parquet.rs | 10 +++---
datafusion/src/physical_plan/planner.rs | 26 ++++++++++-----
datafusion/tests/sql.rs | 8 ++---
datafusion/tests/user_defined_plan.rs | 2 +-
17 files changed, 102 insertions(+), 91 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index ade2cb4..14fba06 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -159,7 +159,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
projection,
24,
&scan.table_name,
- )? //TODO concurrency
+ )? //TODO remove hard-coded max_partitions
.build()
.map_err(|e| e.into())
}
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 46815db..d37940e 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -453,7 +453,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for
&protobuf::PhysicalPlanNode {
}
})
.collect::<Result<Vec<_>, _>>()?;
- // Update concurrency here in the future
Ok(Arc::new(SortExec::try_new(exprs, input)?))
}
PhysicalPlanType::Unresolved(unresolved_shuffle) => {
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index bf0d152..c731e60 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -251,7 +251,7 @@ pub fn create_df_ctx_with_ballista_query_planner(
scheduler_url,
config.clone(),
)))
- .with_concurrency(config.default_shuffle_partitions());
+ .with_target_partitions(config.default_shuffle_partitions());
ExecutionContext::with_config(config)
}
diff --git a/ballista/rust/scheduler/src/lib.rs
b/ballista/rust/scheduler/src/lib.rs
index b476b77..00f2c98 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -513,8 +513,8 @@ impl SchedulerGrpc for SchedulerServer {
/// Create a DataFusion context that is compatible with Ballista
pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext {
- let config =
-
ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions());
+ let config = ExecutionConfig::new()
+ .with_target_partitions(config.default_shuffle_partitions());
ExecutionContext::with_config(config)
}
diff --git a/ballista/rust/scheduler/src/test_utils.rs
b/ballista/rust/scheduler/src/test_utils.rs
index 5b7b685..d197309 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -27,7 +27,8 @@ pub const TPCH_TABLES: &[&str] = &[
pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
let default_shuffle_partitions = 2;
- let config =
ExecutionConfig::new().with_concurrency(default_shuffle_partitions);
+ let config =
+
ExecutionConfig::new().with_target_partitions(default_shuffle_partitions);
let mut ctx = ExecutionContext::with_config(config);
for table in TPCH_TABLES {
let schema = get_tpch_schema(table);
diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs
index b2a62a0..9387530 100644
--- a/benchmarks/src/bin/nyctaxi.rs
+++ b/benchmarks/src/bin/nyctaxi.rs
@@ -47,9 +47,9 @@ struct Opt {
#[structopt(short = "i", long = "iterations", default_value = "3")]
iterations: usize,
- /// Number of threads for query execution
- #[structopt(short = "c", long = "concurrency", default_value = "2")]
- concurrency: usize,
+ /// Number of partitions to process in parallel
+ #[structopt(short = "p", long = "partitions", default_value = "2")]
+ partitions: usize,
/// Batch size when reading CSV or Parquet files
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
@@ -70,7 +70,7 @@ async fn main() -> Result<()> {
println!("Running benchmarks with the following options: {:?}", opt);
let config = ExecutionConfig::new()
- .with_concurrency(opt.concurrency)
+ .with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size);
let mut ctx = ExecutionContext::with_config(config);
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 50fd40d..852e499 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -81,8 +81,8 @@ struct BallistaBenchmarkOpt {
#[structopt(short = "m", long = "mem-table")]
mem_table: bool,
- /// Number of partitions to create when using MemTable as input
- #[structopt(short = "n", long = "partitions", default_value = "8")]
+ /// Number of partitions to process in parallel
+ #[structopt(short = "p", long = "partitions", default_value = "2")]
partitions: usize,
/// Ballista executor host
@@ -92,10 +92,6 @@ struct BallistaBenchmarkOpt {
/// Ballista executor port
#[structopt(long = "port")]
port: Option<u16>,
-
- /// Number of shuffle partitions
- #[structopt(short, long, default_value = "2")]
- shuffle_partitions: usize,
}
#[derive(Debug, StructOpt, Clone)]
@@ -112,9 +108,9 @@ struct DataFusionBenchmarkOpt {
#[structopt(short = "i", long = "iterations", default_value = "3")]
iterations: usize,
- /// Number of threads to use for parallel execution
- #[structopt(short = "c", long = "concurrency", default_value = "2")]
- concurrency: usize,
+ /// Number of partitions to process in parallel
+ #[structopt(short = "p", long = "partitions", default_value = "2")]
+ partitions: usize,
/// Batch size when reading CSV or Parquet files
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
@@ -131,10 +127,6 @@ struct DataFusionBenchmarkOpt {
/// Load the data into a MemTable before executing the query
#[structopt(short = "m", long = "mem-table")]
mem_table: bool,
-
- /// Number of partitions to create when using MemTable as input
- #[structopt(short = "n", long = "partitions", default_value = "8")]
- partitions: usize,
}
#[derive(Debug, StructOpt)]
@@ -203,7 +195,7 @@ async fn main() -> Result<()> {
async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) ->
Result<Vec<RecordBatch>> {
println!("Running benchmarks with the following options: {:?}", opt);
let config = ExecutionConfig::new()
- .with_concurrency(opt.concurrency)
+ .with_target_partitions(opt.partitions)
.with_batch_size(opt.batch_size);
let mut ctx = ExecutionContext::with_config(config);
@@ -213,7 +205,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt)
-> Result<Vec<RecordB
opt.path.to_str().unwrap(),
table,
opt.file_format.as_str(),
- opt.concurrency,
+ opt.partitions,
)?;
if opt.mem_table {
println!("Loading table '{}' into memory", table);
@@ -257,7 +249,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) ->
Result<()> {
let config = BallistaConfig::builder()
.set(
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
- &format!("{}", opt.shuffle_partitions),
+ &format!("{}", opt.partitions),
)
.build()
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -451,7 +443,7 @@ fn get_table(
path: &str,
table: &str,
table_format: &str,
- max_concurrency: usize,
+ max_partitions: usize,
) -> Result<Arc<dyn TableProvider>> {
match table_format {
// dbgen creates .tbl ('|' delimited) files without header
@@ -479,7 +471,7 @@ fn get_table(
Ok(Arc::new(ParquetTable::try_new_with_schema(
&path,
schema,
- max_concurrency,
+ max_partitions,
false,
)?))
}
@@ -980,7 +972,7 @@ mod tests {
// Tests running query with empty tables, to see whether they run
successfully.
let config = ExecutionConfig::new()
- .with_concurrency(1)
+ .with_target_partitions(1)
.with_batch_size(10);
let mut ctx = ExecutionContext::with_config(config);
@@ -1033,12 +1025,11 @@ mod tests {
query: n,
debug: false,
iterations: 1,
- concurrency: 2,
+ partitions: 2,
batch_size: 8192,
path: PathBuf::from(path.to_string()),
file_format: "tbl".to_string(),
mem_table: false,
- partitions: 16,
};
let actual = benchmark_datafusion(opt).await?;
@@ -1074,7 +1065,7 @@ mod tests {
fn round_trip_query(n: usize) -> Result<()> {
let config = ExecutionConfig::new()
- .with_concurrency(1)
+ .with_target_partitions(1)
.with_batch_size(10);
let mut ctx = ExecutionContext::with_config(config);
diff --git a/datafusion/benches/sort_limit_query_sql.rs
b/datafusion/benches/sort_limit_query_sql.rs
index 5a875d3..195bd5c 100644
--- a/datafusion/benches/sort_limit_query_sql.rs
+++ b/datafusion/benches/sort_limit_query_sql.rs
@@ -80,7 +80,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
// create local execution context
let mut ctx = ExecutionContext::new();
- ctx.state.lock().unwrap().config.concurrency = 1;
+ ctx.state.lock().unwrap().config.target_partitions = 1;
ctx.register_table("aggregate_test_100", Arc::new(mem_table))
.unwrap();
ctx_holder.lock().unwrap().push(Arc::new(Mutex::new(ctx)))
diff --git a/datafusion/src/datasource/parquet.rs
b/datafusion/src/datasource/parquet.rs
index d312c77..a9b4c0f 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -37,13 +37,13 @@ pub struct ParquetTable {
path: String,
schema: SchemaRef,
statistics: Statistics,
- max_concurrency: usize,
+ max_partitions: usize,
enable_pruning: bool,
}
impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
- pub fn try_new(path: impl Into<String>, max_concurrency: usize) ->
Result<Self> {
+ pub fn try_new(path: impl Into<String>, max_partitions: usize) ->
Result<Self> {
let path = path.into();
let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1,
None)?;
let schema = parquet_exec.schema();
@@ -51,7 +51,7 @@ impl ParquetTable {
path,
schema,
statistics: parquet_exec.statistics().to_owned(),
- max_concurrency,
+ max_partitions,
enable_pruning: true,
})
}
@@ -61,7 +61,7 @@ impl ParquetTable {
pub fn try_new_with_schema(
path: impl Into<String>,
schema: Schema,
- max_concurrency: usize,
+ max_partitions: usize,
collect_statistics: bool,
) -> Result<Self> {
let path = path.into();
@@ -71,7 +71,7 @@ impl ParquetTable {
path,
schema: Arc::new(schema),
statistics: parquet_exec.statistics().to_owned(),
- max_concurrency,
+ max_partitions,
enable_pruning: true,
})
} else {
@@ -79,7 +79,7 @@ impl ParquetTable {
path,
schema: Arc::new(schema),
statistics: Statistics::default(),
- max_concurrency,
+ max_partitions,
enable_pruning: true,
})
}
@@ -143,7 +143,7 @@ impl TableProvider for ParquetTable {
limit
.map(|l| std::cmp::min(l, batch_size))
.unwrap_or(batch_size),
- self.max_concurrency,
+ self.max_partitions,
limit,
)?))
}
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index 7a54bea..2e6a7a4 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -291,7 +291,7 @@ impl ExecutionContext {
&LogicalPlanBuilder::scan_parquet(
filename,
None,
- self.state.lock().unwrap().config.concurrency,
+ self.state.lock().unwrap().config.target_partitions,
)?
.build()?,
)))
@@ -325,7 +325,7 @@ impl ExecutionContext {
pub fn register_parquet(&mut self, name: &str, filename: &str) ->
Result<()> {
let table = {
let m = self.state.lock().unwrap();
- ParquetTable::try_new(filename, m.config.concurrency)?
+ ParquetTable::try_new(filename, m.config.target_partitions)?
.with_enable_pruning(m.config.parquet_pruning)
};
self.register_table(name, Arc::new(table))?;
@@ -645,8 +645,8 @@ impl QueryPlanner for DefaultQueryPlanner {
/// Configuration options for execution context
#[derive(Clone)]
pub struct ExecutionConfig {
- /// Number of concurrent threads for query execution.
- pub concurrency: usize,
+ /// Number of partitions for query execution. Increasing partitions can
increase concurrency.
+ pub target_partitions: usize,
/// Default batch size when reading data sources
pub batch_size: usize,
/// Responsible for optimizing a logical plan
@@ -665,13 +665,13 @@ pub struct ExecutionConfig {
/// virtual tables for displaying schema information
information_schema: bool,
/// Should DataFusion repartition data using the join keys to execute
joins in parallel
- /// using the provided `concurrency` level
+ /// using the provided `target_partitions` level
pub repartition_joins: bool,
/// Should DataFusion repartition data using the aggregate keys to execute
aggregates in parallel
- /// using the provided `concurrency` level
+ /// using the provided `target_partitions` level
pub repartition_aggregations: bool,
/// Should DataFusion repartition data using the partition keys to execute
window functions in
- /// parallel using the provided `concurrency` level
+ /// parallel using the provided `target_partitions` level
pub repartition_windows: bool,
/// Should Datafusion parquet reader using the predicate to prune data
parquet_pruning: bool,
@@ -680,7 +680,7 @@ pub struct ExecutionConfig {
impl Default for ExecutionConfig {
fn default() -> Self {
Self {
- concurrency: num_cpus::get(),
+ target_partitions: num_cpus::get(),
batch_size: 8192,
optimizers: vec![
Arc::new(ConstantFolding::new()),
@@ -716,11 +716,20 @@ impl ExecutionConfig {
Default::default()
}
- /// Customize max_concurrency
- pub fn with_concurrency(mut self, n: usize) -> Self {
- // concurrency must be greater than zero
+ /// Deprecated. Use with_target_partitions instead.
+ #[deprecated(
+ since = "5.1.0",
+ note = "This method is deprecated in favor of
`with_target_partitions`."
+ )]
+ pub fn with_concurrency(self, n: usize) -> Self {
+ self.with_target_partitions(n)
+ }
+
+ /// Customize target_partitions
+ pub fn with_target_partitions(mut self, n: usize) -> Self {
+ // partition count must be greater than zero
assert!(n > 0);
- self.concurrency = n;
+ self.target_partitions = n;
self
}
@@ -3749,8 +3758,9 @@ mod tests {
/// Generate a partitioned CSV file and register it with an execution
context
fn create_ctx(tmp_dir: &TempDir, partition_count: usize) ->
Result<ExecutionContext> {
- let mut ctx =
-
ExecutionContext::with_config(ExecutionConfig::new().with_concurrency(8));
+ let mut ctx = ExecutionContext::with_config(
+ ExecutionConfig::new().with_target_partitions(8),
+ );
let schema = populate_csv_partitions(tmp_dir, partition_count,
".csv")?;
diff --git a/datafusion/src/logical_plan/builder.rs
b/datafusion/src/logical_plan/builder.rs
index d9afe2e..f31dd37 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -137,20 +137,20 @@ impl LogicalPlanBuilder {
pub fn scan_parquet(
path: impl Into<String>,
projection: Option<Vec<usize>>,
- max_concurrency: usize,
+ max_partitions: usize,
) -> Result<Self> {
let path = path.into();
- Self::scan_parquet_with_name(path.clone(), projection,
max_concurrency, path)
+ Self::scan_parquet_with_name(path.clone(), projection, max_partitions,
path)
}
/// Scan a Parquet data source and register it with a given table name
pub fn scan_parquet_with_name(
path: impl Into<String>,
projection: Option<Vec<usize>>,
- max_concurrency: usize,
+ max_partitions: usize,
table_name: impl Into<String>,
) -> Result<Self> {
- let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
+ let provider = Arc::new(ParquetTable::try_new(path, max_partitions)?);
Self::scan(table_name, provider, projection)
}
diff --git a/datafusion/src/physical_optimizer/repartition.rs
b/datafusion/src/physical_optimizer/repartition.rs
index 984b234..a983af4 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -35,8 +35,8 @@ impl Repartition {
}
}
-fn optimize_concurrency(
- concurrency: usize,
+fn optimize_partitions(
+ target_partitions: usize,
requires_single_partition: bool,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
@@ -50,8 +50,8 @@ fn optimize_concurrency(
.children()
.iter()
.map(|child| {
- optimize_concurrency(
- concurrency,
+ optimize_partitions(
+ target_partitions,
matches!(
plan.required_child_distribution(),
Distribution::SinglePartition
@@ -64,9 +64,9 @@ fn optimize_concurrency(
};
let perform_repartition = match new_plan.output_partitioning() {
- // Apply when underlying node has less than `self.concurrency` amount
of concurrency
- RoundRobinBatch(x) => x < concurrency,
- UnknownPartitioning(x) => x < concurrency,
+ // Apply when underlying node has less than `self.target_partitions`
amount of concurrency
+ RoundRobinBatch(x) => x < target_partitions,
+ UnknownPartitioning(x) => x < target_partitions,
// we don't want to introduce partitioning after hash partitioning
// as the plan will likely depend on this
Hash(_, _) => false,
@@ -79,7 +79,7 @@ fn optimize_concurrency(
if perform_repartition && !requires_single_partition && !is_empty_exec {
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
- RoundRobinBatch(concurrency),
+ RoundRobinBatch(target_partitions),
)?))
} else {
Ok(new_plan)
@@ -92,11 +92,11 @@ impl PhysicalOptimizerRule for Repartition {
plan: Arc<dyn ExecutionPlan>,
config: &ExecutionConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
- // Don't run optimizer if concurrency == 1
- if config.concurrency == 1 {
+ // Don't run optimizer if target_partitions == 1
+ if config.target_partitions == 1 {
Ok(plan)
} else {
- optimize_concurrency(config.concurrency, true, plan)
+ optimize_partitions(config.target_partitions, true, plan)
}
}
@@ -139,7 +139,7 @@ mod tests {
let optimized = optimizer.optimize(
Arc::new(parquet_project),
- &ExecutionConfig::new().with_concurrency(10),
+ &ExecutionConfig::new().with_target_partitions(10),
)?;
assert_eq!(
@@ -180,7 +180,7 @@ mod tests {
let optimized = optimizer.optimize(
Arc::new(parquet_project),
- &ExecutionConfig::new().with_concurrency(10),
+ &ExecutionConfig::new().with_target_partitions(10),
)?;
// RepartitionExec is added to deepest node
diff --git a/datafusion/src/physical_plan/mod.rs
b/datafusion/src/physical_plan/mod.rs
index 82c297c..b7a31fa 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -160,9 +160,9 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// use datafusion::prelude::*;
/// use datafusion::physical_plan::displayable;
///
-/// // Hard code concurrency as it appears in the RepartitionExec output
+/// // Hard code target_partitions as it appears in the RepartitionExec output
/// let config = ExecutionConfig::new()
-/// .with_concurrency(3);
+/// .with_target_partitions(3);
/// let mut ctx = ExecutionContext::with_config(config);
///
/// // register the a table
diff --git a/datafusion/src/physical_plan/parquet.rs
b/datafusion/src/physical_plan/parquet.rs
index f85495c..ac1655b 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -118,7 +118,7 @@ impl ParquetExec {
projection: Option<Vec<usize>>,
predicate: Option<Expr>,
batch_size: usize,
- max_concurrency: usize,
+ max_partitions: usize,
limit: Option<usize>,
) -> Result<Self> {
// build a list of filenames from the specified path, which could be a
single file or
@@ -139,7 +139,7 @@ impl ParquetExec {
projection,
predicate,
batch_size,
- max_concurrency,
+ max_partitions,
limit,
)
}
@@ -152,7 +152,7 @@ impl ParquetExec {
projection: Option<Vec<usize>>,
predicate: Option<Expr>,
batch_size: usize,
- max_concurrency: usize,
+ max_partitions: usize,
limit: Option<usize>,
) -> Result<Self> {
debug!("Creating ParquetExec, filenames: {:?}, projection {:?},
predicate: {:?}, limit: {:?}",
@@ -161,9 +161,9 @@ impl ParquetExec {
// used in this data set
let metrics = ExecutionPlanMetricsSet::new();
let mut schemas: Vec<Schema> = vec![];
- let mut partitions = Vec::with_capacity(max_concurrency);
+ let mut partitions = Vec::with_capacity(max_partitions);
let filenames: Vec<String> = filenames.iter().map(|s|
s.to_string()).collect();
- let chunks = split_files(&filenames, max_concurrency);
+ let chunks = split_files(&filenames, max_partitions);
let mut num_rows = 0;
let mut num_fields = 0;
let mut fields = Vec::new();
diff --git a/datafusion/src/physical_plan/planner.rs
b/datafusion/src/physical_plan/planner.rs
index 02ab15d..dda0b66 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -326,7 +326,7 @@ impl DefaultPhysicalPlanner {
let partition_keys =
window_expr_common_partition_keys(window_expr)?;
let can_repartition = !partition_keys.is_empty()
- && ctx_state.config.concurrency > 1
+ && ctx_state.config.target_partitions > 1
&& ctx_state.config.repartition_windows;
let input_exec = if can_repartition {
@@ -343,7 +343,10 @@ impl DefaultPhysicalPlanner {
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
Arc::new(RepartitionExec::try_new(
input_exec,
- Partitioning::Hash(partition_keys,
ctx_state.config.concurrency),
+ Partitioning::Hash(
+ partition_keys,
+ ctx_state.config.target_partitions,
+ ),
)?)
} else {
input_exec
@@ -477,7 +480,7 @@ impl DefaultPhysicalPlanner {
.any(|x| matches!(x, DataType::Dictionary(_, _)));
let can_repartition = !groups.is_empty()
- && ctx_state.config.concurrency > 1
+ && ctx_state.config.target_partitions > 1
&& ctx_state.config.repartition_aggregations
&& !contains_dict;
@@ -490,7 +493,7 @@ impl DefaultPhysicalPlanner {
initial_aggr,
Partitioning::Hash(
final_group.clone(),
- ctx_state.config.concurrency,
+ ctx_state.config.target_partitions,
),
)?);
// Combine hash aggregates within the partition
@@ -668,7 +671,8 @@ impl DefaultPhysicalPlanner {
})
.collect::<Result<hash_utils::JoinOn>>()?;
- if ctx_state.config.concurrency > 1 &&
ctx_state.config.repartition_joins
+ if ctx_state.config.target_partitions > 1
+ && ctx_state.config.repartition_joins
{
let (left_expr, right_expr) = join_on
.iter()
@@ -684,11 +688,17 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(HashJoinExec::try_new(
Arc::new(RepartitionExec::try_new(
physical_left,
- Partitioning::Hash(left_expr,
ctx_state.config.concurrency),
+ Partitioning::Hash(
+ left_expr,
+ ctx_state.config.target_partitions,
+ ),
)?),
Arc::new(RepartitionExec::try_new(
physical_right,
- Partitioning::Hash(right_expr,
ctx_state.config.concurrency),
+ Partitioning::Hash(
+ right_expr,
+ ctx_state.config.target_partitions,
+ ),
)?),
join_on,
join_type,
@@ -1394,7 +1404,7 @@ mod tests {
fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
let mut ctx_state = make_ctx_state();
- ctx_state.config.concurrency = 4;
+ ctx_state.config.target_partitions = 4;
let planner = DefaultPhysicalPlanner::default();
planner.create_physical_plan(logical_plan, &ctx_state)
}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 1c22cc3..a2e84e1 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -4169,8 +4169,8 @@ async fn test_cast_expressions_error() -> Result<()> {
#[tokio::test]
async fn test_physical_plan_display_indent() {
- // Hard code concurrency as it appears in the RepartitionExec output
- let config = ExecutionConfig::new().with_concurrency(3);
+ // Hard code target_partitions as it appears in the RepartitionExec output
+ let config = ExecutionConfig::new().with_target_partitions(3);
let mut ctx = ExecutionContext::with_config(config);
register_aggregate_csv(&mut ctx).unwrap();
let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
@@ -4215,8 +4215,8 @@ async fn test_physical_plan_display_indent() {
#[tokio::test]
async fn test_physical_plan_display_indent_multi_children() {
- // Hard code concurrency as it appears in the RepartitionExec output
- let config = ExecutionConfig::new().with_concurrency(3);
+ // Hard code target_partitions as it appears in the RepartitionExec output
+ let config = ExecutionConfig::new().with_target_partitions(3);
let mut ctx = ExecutionContext::with_config(config);
// ensure indenting works for nodes with multiple children
register_aggregate_csv(&mut ctx).unwrap();
diff --git a/datafusion/tests/user_defined_plan.rs
b/datafusion/tests/user_defined_plan.rs
index c1269d9..dfcdcf5 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -192,7 +192,7 @@ async fn topk_plan() -> Result<()> {
fn make_topk_context() -> ExecutionContext {
let config = ExecutionConfig::new()
.with_query_planner(Arc::new(TopKQueryPlanner {}))
- .with_concurrency(48)
+ .with_target_partitions(48)
.add_optimizer_rule(Arc::new(TopKOptimizerRule {}));
ExecutionContext::with_config(config)