This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 405171c  Rename concurrency to target_partitions (#706)
405171c is described below

commit 405171cd6a8ac65f5f9532f0c8138c11df6cb9d2
Author: Andy Grove <[email protected]>
AuthorDate: Wed Aug 25 13:51:53 2021 -0600

    Rename concurrency to target_partitions (#706)
    
    * Rename concurrency to target_partitions and deprecate with_concurrency
    
    * Rename other uses of concurrency
    
    * update comment
    
    * rename default_partitions to target_partitions in comments
---
 .../rust/core/src/serde/logical_plan/from_proto.rs |  2 +-
 .../core/src/serde/physical_plan/from_proto.rs     |  1 -
 ballista/rust/core/src/utils.rs                    |  2 +-
 ballista/rust/scheduler/src/lib.rs                 |  4 +--
 ballista/rust/scheduler/src/test_utils.rs          |  3 +-
 benchmarks/src/bin/nyctaxi.rs                      |  8 ++---
 benchmarks/src/bin/tpch.rs                         | 35 ++++++++------------
 datafusion/benches/sort_limit_query_sql.rs         |  2 +-
 datafusion/src/datasource/parquet.rs               | 14 ++++----
 datafusion/src/execution/context.rs                | 38 ++++++++++++++--------
 datafusion/src/logical_plan/builder.rs             |  8 ++---
 datafusion/src/physical_optimizer/repartition.rs   | 26 +++++++--------
 datafusion/src/physical_plan/mod.rs                |  4 +--
 datafusion/src/physical_plan/parquet.rs            | 10 +++---
 datafusion/src/physical_plan/planner.rs            | 26 ++++++++++-----
 datafusion/tests/sql.rs                            |  8 ++---
 datafusion/tests/user_defined_plan.rs              |  2 +-
 17 files changed, 102 insertions(+), 91 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index ade2cb4..14fba06 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -159,7 +159,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     projection,
                     24,
                     &scan.table_name,
-                )? //TODO concurrency
+                )? //TODO remove hard-coded max_partitions
                 .build()
                 .map_err(|e| e.into())
             }
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 46815db..d37940e 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -453,7 +453,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                         }
                     })
                     .collect::<Result<Vec<_>, _>>()?;
-                // Update concurrency here in the future
                 Ok(Arc::new(SortExec::try_new(exprs, input)?))
             }
             PhysicalPlanType::Unresolved(unresolved_shuffle) => {
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index bf0d152..c731e60 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -251,7 +251,7 @@ pub fn create_df_ctx_with_ballista_query_planner(
             scheduler_url,
             config.clone(),
         )))
-        .with_concurrency(config.default_shuffle_partitions());
+        .with_target_partitions(config.default_shuffle_partitions());
     ExecutionContext::with_config(config)
 }
 
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index b476b77..00f2c98 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -513,8 +513,8 @@ impl SchedulerGrpc for SchedulerServer {
 
 /// Create a DataFusion context that is compatible with Ballista
 pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext {
-    let config =
-        ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions());
+    let config = ExecutionConfig::new()
+        .with_target_partitions(config.default_shuffle_partitions());
     ExecutionContext::with_config(config)
 }
 
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index 5b7b685..d197309 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -27,7 +27,8 @@ pub const TPCH_TABLES: &[&str] = &[
 
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
     let default_shuffle_partitions = 2;
-    let config = ExecutionConfig::new().with_concurrency(default_shuffle_partitions);
+    let config =
+        ExecutionConfig::new().with_target_partitions(default_shuffle_partitions);
     let mut ctx = ExecutionContext::with_config(config);
     for table in TPCH_TABLES {
         let schema = get_tpch_schema(table);
diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs
index b2a62a0..9387530 100644
--- a/benchmarks/src/bin/nyctaxi.rs
+++ b/benchmarks/src/bin/nyctaxi.rs
@@ -47,9 +47,9 @@ struct Opt {
     #[structopt(short = "i", long = "iterations", default_value = "3")]
     iterations: usize,
 
-    /// Number of threads for query execution
-    #[structopt(short = "c", long = "concurrency", default_value = "2")]
-    concurrency: usize,
+    /// Number of partitions to process in parallel
+    #[structopt(short = "p", long = "partitions", default_value = "2")]
+    partitions: usize,
 
     /// Batch size when reading CSV or Parquet files
     #[structopt(short = "s", long = "batch-size", default_value = "8192")]
@@ -70,7 +70,7 @@ async fn main() -> Result<()> {
     println!("Running benchmarks with the following options: {:?}", opt);
 
     let config = ExecutionConfig::new()
-        .with_concurrency(opt.concurrency)
+        .with_target_partitions(opt.partitions)
         .with_batch_size(opt.batch_size);
     let mut ctx = ExecutionContext::with_config(config);
 
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 50fd40d..852e499 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -81,8 +81,8 @@ struct BallistaBenchmarkOpt {
     #[structopt(short = "m", long = "mem-table")]
     mem_table: bool,
 
-    /// Number of partitions to create when using MemTable as input
-    #[structopt(short = "n", long = "partitions", default_value = "8")]
+    /// Number of partitions to process in parallel
+    #[structopt(short = "p", long = "partitions", default_value = "2")]
     partitions: usize,
 
     /// Ballista executor host
@@ -92,10 +92,6 @@ struct BallistaBenchmarkOpt {
     /// Ballista executor port
     #[structopt(long = "port")]
     port: Option<u16>,
-
-    /// Number of shuffle partitions
-    #[structopt(short, long, default_value = "2")]
-    shuffle_partitions: usize,
 }
 
 #[derive(Debug, StructOpt, Clone)]
@@ -112,9 +108,9 @@ struct DataFusionBenchmarkOpt {
     #[structopt(short = "i", long = "iterations", default_value = "3")]
     iterations: usize,
 
-    /// Number of threads to use for parallel execution
-    #[structopt(short = "c", long = "concurrency", default_value = "2")]
-    concurrency: usize,
+    /// Number of partitions to process in parallel
+    #[structopt(short = "p", long = "partitions", default_value = "2")]
+    partitions: usize,
 
     /// Batch size when reading CSV or Parquet files
     #[structopt(short = "s", long = "batch-size", default_value = "8192")]
@@ -131,10 +127,6 @@ struct DataFusionBenchmarkOpt {
     /// Load the data into a MemTable before executing the query
     #[structopt(short = "m", long = "mem-table")]
     mem_table: bool,
-
-    /// Number of partitions to create when using MemTable as input
-    #[structopt(short = "n", long = "partitions", default_value = "8")]
-    partitions: usize,
 }
 
 #[derive(Debug, StructOpt)]
@@ -203,7 +195,7 @@ async fn main() -> Result<()> {
 async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
     println!("Running benchmarks with the following options: {:?}", opt);
     let config = ExecutionConfig::new()
-        .with_concurrency(opt.concurrency)
+        .with_target_partitions(opt.partitions)
         .with_batch_size(opt.batch_size);
     let mut ctx = ExecutionContext::with_config(config);
 
@@ -213,7 +205,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
             opt.path.to_str().unwrap(),
             table,
             opt.file_format.as_str(),
-            opt.concurrency,
+            opt.partitions,
         )?;
         if opt.mem_table {
             println!("Loading table '{}' into memory", table);
@@ -257,7 +249,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     let config = BallistaConfig::builder()
         .set(
             BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
-            &format!("{}", opt.shuffle_partitions),
+            &format!("{}", opt.partitions),
         )
         .build()
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -451,7 +443,7 @@ fn get_table(
     path: &str,
     table: &str,
     table_format: &str,
-    max_concurrency: usize,
+    max_partitions: usize,
 ) -> Result<Arc<dyn TableProvider>> {
     match table_format {
         // dbgen creates .tbl ('|' delimited) files without header
@@ -479,7 +471,7 @@ fn get_table(
             Ok(Arc::new(ParquetTable::try_new_with_schema(
                 &path,
                 schema,
-                max_concurrency,
+                max_partitions,
                 false,
             )?))
         }
@@ -980,7 +972,7 @@ mod tests {
         // Tests running query with empty tables, to see whether they run succesfully.
 
         let config = ExecutionConfig::new()
-            .with_concurrency(1)
+            .with_target_partitions(1)
             .with_batch_size(10);
         let mut ctx = ExecutionContext::with_config(config);
 
@@ -1033,12 +1025,11 @@ mod tests {
                 query: n,
                 debug: false,
                 iterations: 1,
-                concurrency: 2,
+                partitions: 2,
                 batch_size: 8192,
                 path: PathBuf::from(path.to_string()),
                 file_format: "tbl".to_string(),
                 mem_table: false,
-                partitions: 16,
             };
             let actual = benchmark_datafusion(opt).await?;
 
@@ -1074,7 +1065,7 @@ mod tests {
 
         fn round_trip_query(n: usize) -> Result<()> {
             let config = ExecutionConfig::new()
-                .with_concurrency(1)
+                .with_target_partitions(1)
                 .with_batch_size(10);
             let mut ctx = ExecutionContext::with_config(config);
 
diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs
index 5a875d3..195bd5c 100644
--- a/datafusion/benches/sort_limit_query_sql.rs
+++ b/datafusion/benches/sort_limit_query_sql.rs
@@ -80,7 +80,7 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
 
         // create local execution context
         let mut ctx = ExecutionContext::new();
-        ctx.state.lock().unwrap().config.concurrency = 1;
+        ctx.state.lock().unwrap().config.target_partitions = 1;
         ctx.register_table("aggregate_test_100", Arc::new(mem_table))
             .unwrap();
         ctx_holder.lock().unwrap().push(Arc::new(Mutex::new(ctx)))
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index d312c77..a9b4c0f 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -37,13 +37,13 @@ pub struct ParquetTable {
     path: String,
     schema: SchemaRef,
     statistics: Statistics,
-    max_concurrency: usize,
+    max_partitions: usize,
     enable_pruning: bool,
 }
 
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
-    pub fn try_new(path: impl Into<String>, max_concurrency: usize) -> Result<Self> {
+    pub fn try_new(path: impl Into<String>, max_partitions: usize) -> Result<Self> {
         let path = path.into();
         let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
         let schema = parquet_exec.schema();
@@ -51,7 +51,7 @@ impl ParquetTable {
             path,
             schema,
             statistics: parquet_exec.statistics().to_owned(),
-            max_concurrency,
+            max_partitions,
             enable_pruning: true,
         })
     }
@@ -61,7 +61,7 @@ impl ParquetTable {
     pub fn try_new_with_schema(
         path: impl Into<String>,
         schema: Schema,
-        max_concurrency: usize,
+        max_partitions: usize,
         collect_statistics: bool,
     ) -> Result<Self> {
         let path = path.into();
@@ -71,7 +71,7 @@ impl ParquetTable {
                 path,
                 schema: Arc::new(schema),
                 statistics: parquet_exec.statistics().to_owned(),
-                max_concurrency,
+                max_partitions,
                 enable_pruning: true,
             })
         } else {
@@ -79,7 +79,7 @@ impl ParquetTable {
                 path,
                 schema: Arc::new(schema),
                 statistics: Statistics::default(),
-                max_concurrency,
+                max_partitions,
                 enable_pruning: true,
             })
         }
@@ -143,7 +143,7 @@ impl TableProvider for ParquetTable {
             limit
                 .map(|l| std::cmp::min(l, batch_size))
                 .unwrap_or(batch_size),
-            self.max_concurrency,
+            self.max_partitions,
             limit,
         )?))
     }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 7a54bea..2e6a7a4 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -291,7 +291,7 @@ impl ExecutionContext {
             &LogicalPlanBuilder::scan_parquet(
                 filename,
                 None,
-                self.state.lock().unwrap().config.concurrency,
+                self.state.lock().unwrap().config.target_partitions,
             )?
             .build()?,
         )))
@@ -325,7 +325,7 @@ impl ExecutionContext {
     pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
         let table = {
             let m = self.state.lock().unwrap();
-            ParquetTable::try_new(filename, m.config.concurrency)?
+            ParquetTable::try_new(filename, m.config.target_partitions)?
                 .with_enable_pruning(m.config.parquet_pruning)
         };
         self.register_table(name, Arc::new(table))?;
@@ -645,8 +645,8 @@ impl QueryPlanner for DefaultQueryPlanner {
 /// Configuration options for execution context
 #[derive(Clone)]
 pub struct ExecutionConfig {
-    /// Number of concurrent threads for query execution.
-    pub concurrency: usize,
+    /// Number of partitions for query execution. Increasing partitions can increase concurrency.
+    pub target_partitions: usize,
     /// Default batch size when reading data sources
     pub batch_size: usize,
     /// Responsible for optimizing a logical plan
@@ -665,13 +665,13 @@ pub struct ExecutionConfig {
     /// virtual tables for displaying schema information
     information_schema: bool,
     /// Should DataFusion repartition data using the join keys to execute joins in parallel
-    /// using the provided `concurrency` level
+    /// using the provided `target_partitions` level
     pub repartition_joins: bool,
     /// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
-    /// using the provided `concurrency` level
+    /// using the provided `target_partitions` level
     pub repartition_aggregations: bool,
     /// Should DataFusion repartition data using the partition keys to execute window functions in
-    /// parallel using the provided `concurrency` level
+    /// parallel using the provided `target_partitions` level
     pub repartition_windows: bool,
     /// Should Datafusion parquet reader using the predicate to prune data
     parquet_pruning: bool,
@@ -680,7 +680,7 @@ pub struct ExecutionConfig {
 impl Default for ExecutionConfig {
     fn default() -> Self {
         Self {
-            concurrency: num_cpus::get(),
+            target_partitions: num_cpus::get(),
             batch_size: 8192,
             optimizers: vec![
                 Arc::new(ConstantFolding::new()),
@@ -716,11 +716,20 @@ impl ExecutionConfig {
         Default::default()
     }
 
-    /// Customize max_concurrency
-    pub fn with_concurrency(mut self, n: usize) -> Self {
-        // concurrency must be greater than zero
+    /// Deprecated. Use with_target_partitions instead.
+    #[deprecated(
+        since = "5.1.0",
+        note = "This method is deprecated in favor of `with_target_partitions`."
+    )]
+    pub fn with_concurrency(self, n: usize) -> Self {
+        self.with_target_partitions(n)
+    }
+
+    /// Customize target_partitions
+    pub fn with_target_partitions(mut self, n: usize) -> Self {
+        // partition count must be greater than zero
         assert!(n > 0);
-        self.concurrency = n;
+        self.target_partitions = n;
         self
     }
 
@@ -3749,8 +3758,9 @@ mod tests {
 
     /// Generate a partitioned CSV file and register it with an execution context
     fn create_ctx(tmp_dir: &TempDir, partition_count: usize) -> Result<ExecutionContext> {
-        let mut ctx =
-            ExecutionContext::with_config(ExecutionConfig::new().with_concurrency(8));
+        let mut ctx = ExecutionContext::with_config(
+            ExecutionConfig::new().with_target_partitions(8),
+        );
 
         let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
 
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index d9afe2e..f31dd37 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -137,20 +137,20 @@ impl LogicalPlanBuilder {
     pub fn scan_parquet(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_concurrency: usize,
+        max_partitions: usize,
     ) -> Result<Self> {
         let path = path.into();
-        Self::scan_parquet_with_name(path.clone(), projection, max_concurrency, path)
+        Self::scan_parquet_with_name(path.clone(), projection, max_partitions, path)
     }
 
     /// Scan a Parquet data source and register it with a given table name
     pub fn scan_parquet_with_name(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_concurrency: usize,
+        max_partitions: usize,
         table_name: impl Into<String>,
     ) -> Result<Self> {
-        let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
+        let provider = Arc::new(ParquetTable::try_new(path, max_partitions)?);
         Self::scan(table_name, provider, projection)
     }
 
diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs
index 984b234..a983af4 100644
--- a/datafusion/src/physical_optimizer/repartition.rs
+++ b/datafusion/src/physical_optimizer/repartition.rs
@@ -35,8 +35,8 @@ impl Repartition {
     }
 }
 
-fn optimize_concurrency(
-    concurrency: usize,
+fn optimize_partitions(
+    target_partitions: usize,
     requires_single_partition: bool,
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -50,8 +50,8 @@ fn optimize_concurrency(
             .children()
             .iter()
             .map(|child| {
-                optimize_concurrency(
-                    concurrency,
+                optimize_partitions(
+                    target_partitions,
                     matches!(
                         plan.required_child_distribution(),
                         Distribution::SinglePartition
@@ -64,9 +64,9 @@ fn optimize_concurrency(
     };
 
     let perform_repartition = match new_plan.output_partitioning() {
-        // Apply when underlying node has less than `self.concurrency` amount of concurrency
-        RoundRobinBatch(x) => x < concurrency,
-        UnknownPartitioning(x) => x < concurrency,
+        // Apply when underlying node has less than `self.target_partitions` amount of concurrency
+        RoundRobinBatch(x) => x < target_partitions,
+        UnknownPartitioning(x) => x < target_partitions,
         // we don't want to introduce partitioning after hash partitioning
         // as the plan will likely depend on this
         Hash(_, _) => false,
@@ -79,7 +79,7 @@ fn optimize_concurrency(
     if perform_repartition && !requires_single_partition && !is_empty_exec {
         Ok(Arc::new(RepartitionExec::try_new(
             new_plan,
-            RoundRobinBatch(concurrency),
+            RoundRobinBatch(target_partitions),
         )?))
     } else {
         Ok(new_plan)
@@ -92,11 +92,11 @@ impl PhysicalOptimizerRule for Repartition {
         plan: Arc<dyn ExecutionPlan>,
         config: &ExecutionConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // Don't run optimizer if concurrency == 1
-        if config.concurrency == 1 {
+        // Don't run optimizer if target_partitions == 1
+        if config.target_partitions == 1 {
             Ok(plan)
         } else {
-            optimize_concurrency(config.concurrency, true, plan)
+            optimize_partitions(config.target_partitions, true, plan)
         }
     }
 
@@ -139,7 +139,7 @@ mod tests {
 
         let optimized = optimizer.optimize(
             Arc::new(parquet_project),
-            &ExecutionConfig::new().with_concurrency(10),
+            &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
         assert_eq!(
@@ -180,7 +180,7 @@ mod tests {
 
         let optimized = optimizer.optimize(
             Arc::new(parquet_project),
-            &ExecutionConfig::new().with_concurrency(10),
+            &ExecutionConfig::new().with_target_partitions(10),
         )?;
 
         // RepartitionExec is added to deepest node
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 82c297c..b7a31fa 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -160,9 +160,9 @@ pub trait ExecutionPlan: Debug + Send + Sync {
 /// use datafusion::prelude::*;
 /// use datafusion::physical_plan::displayable;
 ///
-/// // Hard code concurrency as it appears in the RepartitionExec output
+/// // Hard code target_partitions as it appears in the RepartitionExec output
 /// let config = ExecutionConfig::new()
-///     .with_concurrency(3);
+///     .with_target_partitions(3);
 /// let mut ctx = ExecutionContext::with_config(config);
 ///
 /// // register the a table
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index f85495c..ac1655b 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -118,7 +118,7 @@ impl ParquetExec {
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
-        max_concurrency: usize,
+        max_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
         // build a list of filenames from the specified path, which could be a single file or
@@ -139,7 +139,7 @@ impl ParquetExec {
                 projection,
                 predicate,
                 batch_size,
-                max_concurrency,
+                max_partitions,
                 limit,
             )
         }
@@ -152,7 +152,7 @@ impl ParquetExec {
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
-        max_concurrency: usize,
+        max_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
         debug!("Creating ParquetExec, filenames: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
@@ -161,9 +161,9 @@ impl ParquetExec {
         // used in this data set
         let metrics = ExecutionPlanMetricsSet::new();
         let mut schemas: Vec<Schema> = vec![];
-        let mut partitions = Vec::with_capacity(max_concurrency);
+        let mut partitions = Vec::with_capacity(max_partitions);
         let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect();
-        let chunks = split_files(&filenames, max_concurrency);
+        let chunks = split_files(&filenames, max_partitions);
         let mut num_rows = 0;
         let mut num_fields = 0;
         let mut fields = Vec::new();
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 02ab15d..dda0b66 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -326,7 +326,7 @@ impl DefaultPhysicalPlanner {
                 let partition_keys = window_expr_common_partition_keys(window_expr)?;
 
                 let can_repartition = !partition_keys.is_empty()
-                    && ctx_state.config.concurrency > 1
+                    && ctx_state.config.target_partitions > 1
                     && ctx_state.config.repartition_windows;
 
                 let input_exec = if can_repartition {
@@ -343,7 +343,10 @@ impl DefaultPhysicalPlanner {
                         .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
                     Arc::new(RepartitionExec::try_new(
                         input_exec,
-                        Partitioning::Hash(partition_keys, ctx_state.config.concurrency),
+                        Partitioning::Hash(
+                            partition_keys,
+                            ctx_state.config.target_partitions,
+                        ),
                     )?)
                 } else {
                     input_exec
@@ -477,7 +480,7 @@ impl DefaultPhysicalPlanner {
                     .any(|x| matches!(x, DataType::Dictionary(_, _)));
 
                 let can_repartition = !groups.is_empty()
-                    && ctx_state.config.concurrency > 1
+                    && ctx_state.config.target_partitions > 1
                     && ctx_state.config.repartition_aggregations
                     && !contains_dict;
 
@@ -490,7 +493,7 @@ impl DefaultPhysicalPlanner {
                         initial_aggr,
                         Partitioning::Hash(
                             final_group.clone(),
-                            ctx_state.config.concurrency,
+                            ctx_state.config.target_partitions,
                         ),
                     )?);
                     // Combine hash aggregates within the partition
@@ -668,7 +671,8 @@ impl DefaultPhysicalPlanner {
                     })
                     .collect::<Result<hash_utils::JoinOn>>()?;
 
-                if ctx_state.config.concurrency > 1 && ctx_state.config.repartition_joins
+                if ctx_state.config.target_partitions > 1
+                    && ctx_state.config.repartition_joins
                 {
                     let (left_expr, right_expr) = join_on
                         .iter()
@@ -684,11 +688,17 @@ impl DefaultPhysicalPlanner {
                     Ok(Arc::new(HashJoinExec::try_new(
                         Arc::new(RepartitionExec::try_new(
                             physical_left,
-                            Partitioning::Hash(left_expr, ctx_state.config.concurrency),
+                            Partitioning::Hash(
+                                left_expr,
+                                ctx_state.config.target_partitions,
+                            ),
                         )?),
                         Arc::new(RepartitionExec::try_new(
                             physical_right,
-                            Partitioning::Hash(right_expr, ctx_state.config.concurrency),
+                            Partitioning::Hash(
+                                right_expr,
+                                ctx_state.config.target_partitions,
+                            ),
                         )?),
                         join_on,
                         join_type,
@@ -1394,7 +1404,7 @@ mod tests {
 
     fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
         let mut ctx_state = make_ctx_state();
-        ctx_state.config.concurrency = 4;
+        ctx_state.config.target_partitions = 4;
         let planner = DefaultPhysicalPlanner::default();
         planner.create_physical_plan(logical_plan, &ctx_state)
     }
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 1c22cc3..a2e84e1 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -4169,8 +4169,8 @@ async fn test_cast_expressions_error() -> Result<()> {
 
 #[tokio::test]
 async fn test_physical_plan_display_indent() {
-    // Hard code concurrency as it appears in the RepartitionExec output
-    let config = ExecutionConfig::new().with_concurrency(3);
+    // Hard code target_partitions as it appears in the RepartitionExec output
+    let config = ExecutionConfig::new().with_target_partitions(3);
     let mut ctx = ExecutionContext::with_config(config);
     register_aggregate_csv(&mut ctx).unwrap();
     let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
@@ -4215,8 +4215,8 @@ async fn test_physical_plan_display_indent() {
 
 #[tokio::test]
 async fn test_physical_plan_display_indent_multi_children() {
-    // Hard code concurrency as it appears in the RepartitionExec output
-    let config = ExecutionConfig::new().with_concurrency(3);
+    // Hard code target_partitions as it appears in the RepartitionExec output
+    let config = ExecutionConfig::new().with_target_partitions(3);
     let mut ctx = ExecutionContext::with_config(config);
     // ensure indenting works for nodes with multiple children
     register_aggregate_csv(&mut ctx).unwrap();
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index c1269d9..dfcdcf5 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -192,7 +192,7 @@ async fn topk_plan() -> Result<()> {
 fn make_topk_context() -> ExecutionContext {
     let config = ExecutionConfig::new()
         .with_query_planner(Arc::new(TopKQueryPlanner {}))
-        .with_concurrency(48)
+        .with_target_partitions(48)
         .add_optimizer_rule(Arc::new(TopKOptimizerRule {}));
 
     ExecutionContext::with_config(config)

Reply via email to