This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new dd9cd15  remove hard coded value (#1044)
dd9cd15 is described below

commit dd9cd15d840dfafdf85a36bbf58008a12d86d1af
Author: carlos <[email protected]>
AuthorDate: Fri Sep 24 23:27:42 2021 +0800

    remove hard coded value (#1044)
---
 ballista/rust/core/proto/ballista.proto             |  1 +
 .../rust/core/src/serde/logical_plan/from_proto.rs  |  4 ++--
 .../rust/core/src/serde/logical_plan/to_proto.rs    |  1 +
 benchmarks/src/bin/tpch.rs                          |  4 ++--
 datafusion/src/datasource/parquet.rs                | 21 +++++++++++++--------
 datafusion/src/logical_plan/builder.rs              |  8 ++++----
 datafusion/src/physical_plan/parquet.rs             | 10 +++++-----
 7 files changed, 28 insertions(+), 21 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 3fc291e..a3ed18a 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -296,6 +296,7 @@ message ParquetTableScanNode {
   TableDescriptor table_desc = 2;
   ProjectionColumns projection = 3;
   repeated LogicalExprNode filters = 4;
+  uint32 target_partitions = 5;
 }
 
 message AvroTableScanNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 8ffdb65..9c7658c 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -161,14 +161,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
 
                 let parquet_table = ParquetTable::try_new_with_desc(
                     Arc::new(ParquetTableDescriptor { descriptor }),
-                    24,
+                    scan.target_partitions as usize,
                     true,
                 )?;
                 LogicalPlanBuilder::scan(
                     &scan.table_name,
                     Arc::new(parquet_table),
                     projection,
-                )? //TODO remove hard-coded max_partitions
+                )?
                 .build()
                 .map_err(|e| e.into())
             }
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index b74f663..f5c2414 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -758,6 +758,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                                 table_desc: Some(table_desc),
                                 projection,
                                 filters,
+                                target_partitions: parquet.get_target_partitions() as u32,
                             },
                         )),
                     })
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 2e9b2ff..203c186 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -443,7 +443,7 @@ fn get_table(
     path: &str,
     table: &str,
     table_format: &str,
-    max_partitions: usize,
+    target_partitions: usize,
 ) -> Result<Arc<dyn TableProvider>> {
     match table_format {
         // dbgen creates .tbl ('|' delimited) files without header
@@ -471,7 +471,7 @@ fn get_table(
             Ok(Arc::new(ParquetTable::try_new_with_schema(
                 &path,
                 schema,
-                max_partitions,
+                target_partitions,
                 false,
             )?))
         }
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index 65c9089..d044ed9 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -44,18 +44,18 @@ use crate::scalar::ScalarValue;
 pub struct ParquetTable {
     /// Descriptor of the table, including schema, files, etc.
     pub desc: Arc<ParquetTableDescriptor>,
-    max_partitions: usize,
+    target_partitions: usize,
     enable_pruning: bool,
 }
 
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
-    pub fn try_new(path: impl Into<String>, max_partitions: usize) -> Result<Self> {
+    pub fn try_new(path: impl Into<String>, target_partitions: usize) -> Result<Self> {
         let path = path.into();
         let table_desc = ParquetTableDescriptor::new(path.as_str());
         Ok(Self {
             desc: Arc::new(table_desc?),
-            max_partitions,
+            target_partitions,
             enable_pruning: true,
         })
     }
@@ -65,7 +65,7 @@ impl ParquetTable {
     pub fn try_new_with_schema(
         path: impl Into<String>,
         schema: Schema,
-        max_partitions: usize,
+        target_partitions: usize,
         collect_statistics: bool,
     ) -> Result<Self> {
         let path = path.into();
@@ -76,7 +76,7 @@ impl ParquetTable {
         );
         Ok(Self {
             desc: Arc::new(table_desc?),
-            max_partitions,
+            target_partitions,
             enable_pruning: true,
         })
     }
@@ -84,12 +84,12 @@ impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a table descriptor.
     pub fn try_new_with_desc(
         desc: Arc<ParquetTableDescriptor>,
-        max_partitions: usize,
+        target_partitions: usize,
         enable_pruning: bool,
     ) -> Result<Self> {
         Ok(Self {
             desc,
-            max_partitions,
+            target_partitions,
             enable_pruning,
         })
     }
@@ -109,6 +109,11 @@ impl ParquetTable {
         self.enable_pruning = enable_pruning;
         self
     }
+
+    /// Get Target partitions
+    pub fn get_target_partitions(&self) -> usize {
+        self.target_partitions
+    }
 }
 
 #[async_trait]
@@ -153,7 +158,7 @@ impl TableProvider for ParquetTable {
             limit
                 .map(|l| std::cmp::min(l, batch_size))
                 .unwrap_or(batch_size),
-            self.max_partitions,
+            self.target_partitions,
             limit,
         )?))
     }
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index c032273..d902d6f 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -139,20 +139,20 @@ impl LogicalPlanBuilder {
     pub fn scan_parquet(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_partitions: usize,
+        target_partitions: usize,
     ) -> Result<Self> {
         let path = path.into();
-        Self::scan_parquet_with_name(path.clone(), projection, max_partitions, path)
+        Self::scan_parquet_with_name(path.clone(), projection, target_partitions, path)
     }
 
     /// Scan a Parquet data source and register it with a given table name
     pub fn scan_parquet_with_name(
         path: impl Into<String>,
         projection: Option<Vec<usize>>,
-        max_partitions: usize,
+        target_partitions: usize,
         table_name: impl Into<String>,
     ) -> Result<Self> {
-        let provider = Arc::new(ParquetTable::try_new(path, max_partitions)?);
+        let provider = Arc::new(ParquetTable::try_new(path, target_partitions)?);
         Self::scan(table_name, provider, projection)
     }
 
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index feed181..f4ac4c8 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -117,7 +117,7 @@ impl ParquetExec {
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
-        max_partitions: usize,
+        target_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
         // build a list of filenames from the specified path, which could be a single file or
@@ -128,7 +128,7 @@ impl ParquetExec {
             projection,
             predicate,
             batch_size,
-            max_partitions,
+            target_partitions,
             limit,
         )
     }
@@ -139,7 +139,7 @@ impl ParquetExec {
         projection: Option<Vec<usize>>,
         predicate: Option<Expr>,
         batch_size: usize,
-        max_partitions: usize,
+        target_partitions: usize,
         limit: Option<usize>,
     ) -> Result<Self> {
         debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
@@ -149,8 +149,8 @@ impl ParquetExec {
         let (all_files, statistics) = get_statistics_with_limit(&desc.descriptor, limit);
         let schema = desc.schema();
 
-        let mut partitions = Vec::with_capacity(max_partitions);
-        let chunked_files = split_files(&all_files, max_partitions);
+        let mut partitions = Vec::with_capacity(target_partitions);
+        let chunked_files = split_files(&all_files, target_partitions);
         for (index, group) in chunked_files.iter().enumerate() {
             partitions.push(ParquetPartition::new(
                 Vec::from(*group),

Reply via email to