This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new dd9cd15 remove hard coded value (#1044)
dd9cd15 is described below
commit dd9cd15d840dfafdf85a36bbf58008a12d86d1af
Author: carlos <[email protected]>
AuthorDate: Fri Sep 24 23:27:42 2021 +0800
remove hard coded value (#1044)
---
ballista/rust/core/proto/ballista.proto | 1 +
.../rust/core/src/serde/logical_plan/from_proto.rs | 4 ++--
.../rust/core/src/serde/logical_plan/to_proto.rs | 1 +
benchmarks/src/bin/tpch.rs | 4 ++--
datafusion/src/datasource/parquet.rs | 21 +++++++++++++--------
datafusion/src/logical_plan/builder.rs | 8 ++++----
datafusion/src/physical_plan/parquet.rs | 10 +++++-----
7 files changed, 28 insertions(+), 21 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 3fc291e..a3ed18a 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -296,6 +296,7 @@ message ParquetTableScanNode {
TableDescriptor table_desc = 2;
ProjectionColumns projection = 3;
repeated LogicalExprNode filters = 4;
+ uint32 target_partitions = 5;
}
message AvroTableScanNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 8ffdb65..9c7658c 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -161,14 +161,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
let parquet_table = ParquetTable::try_new_with_desc(
Arc::new(ParquetTableDescriptor { descriptor }),
- 24,
+ scan.target_partitions as usize,
true,
)?;
LogicalPlanBuilder::scan(
&scan.table_name,
Arc::new(parquet_table),
projection,
- )? //TODO remove hard-coded max_partitions
+ )?
.build()
.map_err(|e| e.into())
}
diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index b74f663..f5c2414 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -758,6 +758,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
table_desc: Some(table_desc),
projection,
filters,
+ target_partitions:
parquet.get_target_partitions() as u32,
},
)),
})
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 2e9b2ff..203c186 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -443,7 +443,7 @@ fn get_table(
path: &str,
table: &str,
table_format: &str,
- max_partitions: usize,
+ target_partitions: usize,
) -> Result<Arc<dyn TableProvider>> {
match table_format {
// dbgen creates .tbl ('|' delimited) files without header
@@ -471,7 +471,7 @@ fn get_table(
Ok(Arc::new(ParquetTable::try_new_with_schema(
&path,
schema,
- max_partitions,
+ target_partitions,
false,
)?))
}
diff --git a/datafusion/src/datasource/parquet.rs
b/datafusion/src/datasource/parquet.rs
index 65c9089..d044ed9 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -44,18 +44,18 @@ use crate::scalar::ScalarValue;
pub struct ParquetTable {
/// Descriptor of the table, including schema, files, etc.
pub desc: Arc<ParquetTableDescriptor>,
- max_partitions: usize,
+ target_partitions: usize,
enable_pruning: bool,
}
impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a file path.
- pub fn try_new(path: impl Into<String>, max_partitions: usize) ->
Result<Self> {
+ pub fn try_new(path: impl Into<String>, target_partitions: usize) ->
Result<Self> {
let path = path.into();
let table_desc = ParquetTableDescriptor::new(path.as_str());
Ok(Self {
desc: Arc::new(table_desc?),
- max_partitions,
+ target_partitions,
enable_pruning: true,
})
}
@@ -65,7 +65,7 @@ impl ParquetTable {
pub fn try_new_with_schema(
path: impl Into<String>,
schema: Schema,
- max_partitions: usize,
+ target_partitions: usize,
collect_statistics: bool,
) -> Result<Self> {
let path = path.into();
@@ -76,7 +76,7 @@ impl ParquetTable {
);
Ok(Self {
desc: Arc::new(table_desc?),
- max_partitions,
+ target_partitions,
enable_pruning: true,
})
}
@@ -84,12 +84,12 @@ impl ParquetTable {
/// Attempt to initialize a new `ParquetTable` from a table descriptor.
pub fn try_new_with_desc(
desc: Arc<ParquetTableDescriptor>,
- max_partitions: usize,
+ target_partitions: usize,
enable_pruning: bool,
) -> Result<Self> {
Ok(Self {
desc,
- max_partitions,
+ target_partitions,
enable_pruning,
})
}
@@ -109,6 +109,11 @@ impl ParquetTable {
self.enable_pruning = enable_pruning;
self
}
+
+ /// Get Target partitions
+ pub fn get_target_partitions(&self) -> usize {
+ self.target_partitions
+ }
}
#[async_trait]
@@ -153,7 +158,7 @@ impl TableProvider for ParquetTable {
limit
.map(|l| std::cmp::min(l, batch_size))
.unwrap_or(batch_size),
- self.max_partitions,
+ self.target_partitions,
limit,
)?))
}
diff --git a/datafusion/src/logical_plan/builder.rs
b/datafusion/src/logical_plan/builder.rs
index c032273..d902d6f 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -139,20 +139,20 @@ impl LogicalPlanBuilder {
pub fn scan_parquet(
path: impl Into<String>,
projection: Option<Vec<usize>>,
- max_partitions: usize,
+ target_partitions: usize,
) -> Result<Self> {
let path = path.into();
- Self::scan_parquet_with_name(path.clone(), projection, max_partitions,
path)
+ Self::scan_parquet_with_name(path.clone(), projection,
target_partitions, path)
}
/// Scan a Parquet data source and register it with a given table name
pub fn scan_parquet_with_name(
path: impl Into<String>,
projection: Option<Vec<usize>>,
- max_partitions: usize,
+ target_partitions: usize,
table_name: impl Into<String>,
) -> Result<Self> {
- let provider = Arc::new(ParquetTable::try_new(path, max_partitions)?);
+ let provider = Arc::new(ParquetTable::try_new(path,
target_partitions)?);
Self::scan(table_name, provider, projection)
}
diff --git a/datafusion/src/physical_plan/parquet.rs
b/datafusion/src/physical_plan/parquet.rs
index feed181..f4ac4c8 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -117,7 +117,7 @@ impl ParquetExec {
projection: Option<Vec<usize>>,
predicate: Option<Expr>,
batch_size: usize,
- max_partitions: usize,
+ target_partitions: usize,
limit: Option<usize>,
) -> Result<Self> {
// build a list of filenames from the specified path, which could be a
single file or
@@ -128,7 +128,7 @@ impl ParquetExec {
projection,
predicate,
batch_size,
- max_partitions,
+ target_partitions,
limit,
)
}
@@ -139,7 +139,7 @@ impl ParquetExec {
projection: Option<Vec<usize>>,
predicate: Option<Expr>,
batch_size: usize,
- max_partitions: usize,
+ target_partitions: usize,
limit: Option<usize>,
) -> Result<Self> {
debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate:
{:?}, limit: {:?}",
@@ -149,8 +149,8 @@ impl ParquetExec {
let (all_files, statistics) =
get_statistics_with_limit(&desc.descriptor, limit);
let schema = desc.schema();
- let mut partitions = Vec::with_capacity(max_partitions);
- let chunked_files = split_files(&all_files, max_partitions);
+ let mut partitions = Vec::with_capacity(target_partitions);
+ let chunked_files = split_files(&all_files, target_partitions);
for (index, group) in chunked_files.iter().enumerate() {
partitions.push(ParquetPartition::new(
Vec::from(*group),