This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bdc880  round trip TPCH queries in tests (#630)
5bdc880 is described below

commit 5bdc880e9e45538a7d86a1b56ac613b2dfca176c
Author: QP Hou <[email protected]>
AuthorDate: Sun Jun 27 00:52:06 2021 -0700

    round trip TPCH queries in tests (#630)
    
    * honor table name for csv/parquet scan in ballista plan serde
    
    * disable query 7,8,9 in ballista integration test
    
    * add tpch query ballista roundtrip test
    
    * also round trip physical plan
    
    * fix clippy
    
    * simplify test code
---
 benchmarks/Cargo.toml            |  3 ++
 benchmarks/src/bin/tpch.rs       | 85 +++++++++++++++++++++++++++++++++++++++-
 datafusion/src/datasource/mod.rs |  1 +
 3 files changed, 88 insertions(+), 1 deletion(-)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 6a76342..19a67a5 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -39,3 +39,6 @@ futures = "0.3"
 env_logger = "^0.8"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] }
+
+[dev-dependencies]
+ballista-core = { path = "../ballista/rust/core" }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 286fe45..77c69f0 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -573,7 +573,6 @@ mod tests {
 
     use datafusion::arrow::array::*;
     use datafusion::arrow::util::display::array_value_to_string;
-
     use datafusion::logical_plan::Expr;
     use datafusion::logical_plan::Expr::Cast;
 
@@ -1042,4 +1041,88 @@ mod tests {
 
         Ok(())
     }
+
+    mod ballista_round_trip {
+        use super::*;
+        use ballista_core::serde::protobuf;
+        use datafusion::physical_plan::ExecutionPlan;
+        use std::convert::TryInto;
+
+        fn round_trip_query(n: usize) -> Result<()> {
+            let config = ExecutionConfig::new()
+                .with_concurrency(1)
+                .with_batch_size(10);
+            let mut ctx = ExecutionContext::with_config(config);
+
+            // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+            // is not set.
+            let tpch_data_path =
+                env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+
+            for &table in TABLES {
+                let schema = get_schema(table);
+                let options = CsvReadOptions::new()
+                    .schema(&schema)
+                    .delimiter(b'|')
+                    .has_header(false)
+                    .file_extension(".tbl");
+                let provider = CsvFile::try_new(
+                    &format!("{}/{}.tbl", tpch_data_path, table),
+                    options,
+                )?;
+                ctx.register_table(table, Arc::new(provider))?;
+            }
+
+            // test logical plan round trip
+            let plan = create_logical_plan(&mut ctx, n)?;
+            let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap();
+            let round_trip: LogicalPlan = (&proto).try_into().unwrap();
+            assert_eq!(
+                format!("{:?}", plan),
+                format!("{:?}", round_trip),
+                "logical plan round trip failed"
+            );
+
+            // test optimized logical plan round trip
+            let plan = ctx.optimize(&plan)?;
+            let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap();
+            let round_trip: LogicalPlan = (&proto).try_into().unwrap();
+            assert_eq!(
+                format!("{:?}", plan),
+                format!("{:?}", round_trip),
+                "opitmized logical plan round trip failed"
+            );
+
+            // test physical plan roundtrip
+            if env::var("TPCH_DATA").is_ok() {
+                let physical_plan = ctx.create_physical_plan(&plan)?;
+                let proto: protobuf::PhysicalPlanNode =
+                    (physical_plan.clone()).try_into().unwrap();
+                let round_trip: Arc<dyn ExecutionPlan> = (&proto).try_into().unwrap();
+                assert_eq!(
+                    format!("{:?}", physical_plan),
+                    format!("{:?}", round_trip),
+                    "physical plan round trip failed"
+                );
+            }
+
+            Ok(())
+        }
+
+        macro_rules! test_round_trip {
+            ($tn:ident, $query:expr) => {
+                #[test]
+                fn $tn() -> Result<()> {
+                    round_trip_query($query)
+                }
+            };
+        }
+
+        test_round_trip!(q1, 1);
+        test_round_trip!(q3, 3);
+        test_round_trip!(q5, 5);
+        test_round_trip!(q6, 6);
+        test_round_trip!(q10, 10);
+        test_round_trip!(q12, 12);
+    }
 }
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index b46b9cc..9699a99 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -28,6 +28,7 @@ pub use self::csv::{CsvFile, CsvReadOptions};
 pub use self::datasource::{TableProvider, TableType};
 pub use self::memory::MemTable;
 
+/// Source for table input data
 pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
     /// Path to a single file or a directory containing one of more files
     Path(String),

Reply via email to