This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c21106e honor table name for csv/parquet scan in ballista plan serde (#629)
c21106e is described below
commit c21106edf2e16ae97c786a95c660bd25efbb6a87
Author: QP Hou <[email protected]>
AuthorDate: Sat Jun 26 23:43:27 2021 -0700
honor table name for csv/parquet scan in ballista plan serde (#629)
* honor table name for csv/parquet scan in ballista plan serde
* disable query 7,8,9 in ballista integration test
---
.../rust/core/src/serde/logical_plan/from_proto.rs | 22 ++++++++++++++------
benchmarks/run.sh | 2 +-
datafusion/src/logical_plan/builder.rs | 24 ++++++++++++++++++++--
3 files changed, 39 insertions(+), 9 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 418d60d..15ee507 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -126,9 +126,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
projection = Some(column_indices);
}
- LogicalPlanBuilder::scan_csv(&scan.path, options, projection)?
- .build()
- .map_err(|e| e.into())
+ LogicalPlanBuilder::scan_csv_with_name(
+ &scan.path,
+ options,
+ projection,
+ &scan.table_name,
+ )?
+ .build()
+ .map_err(|e| e.into())
}
LogicalPlanType::ParquetScan(scan) => {
let projection = match scan.projection.as_ref() {
@@ -151,9 +156,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
Some(r?)
}
};
- LogicalPlanBuilder::scan_parquet(&scan.path, projection, 24)? //TODO concurrency
- .build()
- .map_err(|e| e.into())
+ LogicalPlanBuilder::scan_parquet_with_name(
+ &scan.path,
+ projection,
+ 24,
+ &scan.table_name,
+ )? //TODO concurrency
+ .build()
+ .map_err(|e| e.into())
}
LogicalPlanType::Sort(sort) => {
let input: LogicalPlan = convert_box_required!(sort.input)?;
diff --git a/benchmarks/run.sh b/benchmarks/run.sh
index 21633d3..8e36424 100755
--- a/benchmarks/run.sh
+++ b/benchmarks/run.sh
@@ -20,7 +20,7 @@ set -e
# This bash script is meant to be run inside the docker-compose environment. Check the README for instructions
cd /
-for query in 1 3 5 6 7 8 9 10 12
+for query in 1 3 5 6 10 12
do
/tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug
done
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 147f832..ced77ba 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -119,8 +119,18 @@ impl LogicalPlanBuilder {
options: CsvReadOptions,
projection: Option<Vec<usize>>,
) -> Result<Self> {
+ Self::scan_csv_with_name(path, options, projection, path)
+ }
+
+ /// Scan a CSV data source and register it with a given table name
+ pub fn scan_csv_with_name(
+ path: &str,
+ options: CsvReadOptions,
+ projection: Option<Vec<usize>>,
+ table_name: &str,
+ ) -> Result<Self> {
let provider = Arc::new(CsvFile::try_new(path, options)?);
- Self::scan(path, provider, projection)
+ Self::scan(table_name, provider, projection)
}
/// Scan a Parquet data source
@@ -129,8 +139,18 @@ impl LogicalPlanBuilder {
projection: Option<Vec<usize>>,
max_concurrency: usize,
) -> Result<Self> {
+ Self::scan_parquet_with_name(path, projection, max_concurrency, path)
+ }
+
+ /// Scan a Parquet data source and register it with a given table name
+ pub fn scan_parquet_with_name(
+ path: &str,
+ projection: Option<Vec<usize>>,
+ max_concurrency: usize,
+ table_name: &str,
+ ) -> Result<Self> {
let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
- Self::scan(path, provider, projection)
+ Self::scan(table_name, provider, projection)
}
/// Scan an empty data source, mainly used in tests