This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c21106e  honor table name for csv/parquet scan in ballista plan serde (#629)
c21106e is described below

commit c21106edf2e16ae97c786a95c660bd25efbb6a87
Author: QP Hou <[email protected]>
AuthorDate: Sat Jun 26 23:43:27 2021 -0700

    honor table name for csv/parquet scan in ballista plan serde (#629)
    
    * honor table name for csv/parquet scan in ballista plan serde
    
    * disable query 7,8,9 in ballista integration test
---
 .../rust/core/src/serde/logical_plan/from_proto.rs | 22 ++++++++++++++------
 benchmarks/run.sh                                  |  2 +-
 datafusion/src/logical_plan/builder.rs             | 24 ++++++++++++++++++++--
 3 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
index 418d60d..15ee507 100644
--- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -126,9 +126,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                     projection = Some(column_indices);
                 }
 
-                LogicalPlanBuilder::scan_csv(&scan.path, options, projection)?
-                    .build()
-                    .map_err(|e| e.into())
+                LogicalPlanBuilder::scan_csv_with_name(
+                    &scan.path,
+                    options,
+                    projection,
+                    &scan.table_name,
+                )?
+                .build()
+                .map_err(|e| e.into())
             }
             LogicalPlanType::ParquetScan(scan) => {
                 let projection = match scan.projection.as_ref() {
@@ -151,9 +156,14 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                         Some(r?)
                     }
                 };
-                LogicalPlanBuilder::scan_parquet(&scan.path, projection, 24)? //TODO concurrency
-                    .build()
-                    .map_err(|e| e.into())
+                LogicalPlanBuilder::scan_parquet_with_name(
+                    &scan.path,
+                    projection,
+                    24,
+                    &scan.table_name,
+                )? //TODO concurrency
+                .build()
+                .map_err(|e| e.into())
             }
             LogicalPlanType::Sort(sort) => {
                 let input: LogicalPlan = convert_box_required!(sort.input)?;
diff --git a/benchmarks/run.sh b/benchmarks/run.sh
index 21633d3..8e36424 100755
--- a/benchmarks/run.sh
+++ b/benchmarks/run.sh
@@ -20,7 +20,7 @@ set -e
 # This bash script is meant to be run inside the docker-compose environment. Check the README for instructions
 
 cd /
-for query in 1 3 5 6 7 8 9 10 12
+for query in 1 3 5 6 10 12
 do
   /tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug
 done
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 147f832..ced77ba 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -119,8 +119,18 @@ impl LogicalPlanBuilder {
         options: CsvReadOptions,
         projection: Option<Vec<usize>>,
     ) -> Result<Self> {
+        Self::scan_csv_with_name(path, options, projection, path)
+    }
+
+    /// Scan a CSV data source and register it with a given table name
+    pub fn scan_csv_with_name(
+        path: &str,
+        options: CsvReadOptions,
+        projection: Option<Vec<usize>>,
+        table_name: &str,
+    ) -> Result<Self> {
         let provider = Arc::new(CsvFile::try_new(path, options)?);
-        Self::scan(path, provider, projection)
+        Self::scan(table_name, provider, projection)
     }
 
     /// Scan a Parquet data source
@@ -129,8 +139,18 @@ impl LogicalPlanBuilder {
         projection: Option<Vec<usize>>,
         max_concurrency: usize,
     ) -> Result<Self> {
+        Self::scan_parquet_with_name(path, projection, max_concurrency, path)
+    }
+
+    /// Scan a Parquet data source and register it with a given table name
+    pub fn scan_parquet_with_name(
+        path: &str,
+        projection: Option<Vec<usize>>,
+        max_concurrency: usize,
+        table_name: &str,
+    ) -> Result<Self> {
         let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
-        Self::scan(path, provider, projection)
+        Self::scan(table_name, provider, projection)
     }
 
     /// Scan an empty data source, mainly used in tests

Reply via email to