This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5bdc880 round trip TPCH queries in tests (#630)
5bdc880 is described below
commit 5bdc880e9e45538a7d86a1b56ac613b2dfca176c
Author: QP Hou <[email protected]>
AuthorDate: Sun Jun 27 00:52:06 2021 -0700
round trip TPCH queries in tests (#630)
* honor table name for csv/parquet scan in ballista plan serde
* disable query 7,8,9 in ballista integration test
* add tpch query ballista roundtrip test
* also round trip physical plan
* fix clippy
* simplify test code
---
benchmarks/Cargo.toml | 3 ++
benchmarks/src/bin/tpch.rs | 85 +++++++++++++++++++++++++++++++++++++++-
datafusion/src/datasource/mod.rs | 1 +
3 files changed, 88 insertions(+), 1 deletion(-)
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 6a76342..19a67a5 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -39,3 +39,6 @@ futures = "0.3"
env_logger = "^0.8"
mimalloc = { version = "0.1", optional = true, default-features = false }
snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] }
+
+[dev-dependencies]
+ballista-core = { path = "../ballista/rust/core" }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 286fe45..77c69f0 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -573,7 +573,6 @@ mod tests {
use datafusion::arrow::array::*;
use datafusion::arrow::util::display::array_value_to_string;
-
use datafusion::logical_plan::Expr;
use datafusion::logical_plan::Expr::Cast;
@@ -1042,4 +1041,88 @@ mod tests {
Ok(())
}
+
+ mod ballista_round_trip {
+ use super::*;
+ use ballista_core::serde::protobuf;
+ use datafusion::physical_plan::ExecutionPlan;
+ use std::convert::TryInto;
+
+ fn round_trip_query(n: usize) -> Result<()> {
+ let config = ExecutionConfig::new()
+ .with_concurrency(1)
+ .with_batch_size(10);
+ let mut ctx = ExecutionContext::with_config(config);
+
+ // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+ // is not set.
+ let tpch_data_path =
+ env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+
+ for &table in TABLES {
+ let schema = get_schema(table);
+ let options = CsvReadOptions::new()
+ .schema(&schema)
+ .delimiter(b'|')
+ .has_header(false)
+ .file_extension(".tbl");
+ let provider = CsvFile::try_new(
+ &format!("{}/{}.tbl", tpch_data_path, table),
+ options,
+ )?;
+ ctx.register_table(table, Arc::new(provider))?;
+ }
+
+ // test logical plan round trip
+ let plan = create_logical_plan(&mut ctx, n)?;
+ let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap();
+ let round_trip: LogicalPlan = (&proto).try_into().unwrap();
+ assert_eq!(
+ format!("{:?}", plan),
+ format!("{:?}", round_trip),
+ "logical plan round trip failed"
+ );
+
+ // test optimized logical plan round trip
+ let plan = ctx.optimize(&plan)?;
+ let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap();
+ let round_trip: LogicalPlan = (&proto).try_into().unwrap();
+ assert_eq!(
+ format!("{:?}", plan),
+ format!("{:?}", round_trip),
+ "opitmized logical plan round trip failed"
+ );
+
+ // test physical plan roundtrip
+ if env::var("TPCH_DATA").is_ok() {
+ let physical_plan = ctx.create_physical_plan(&plan)?;
+ let proto: protobuf::PhysicalPlanNode =
+ (physical_plan.clone()).try_into().unwrap();
+ let round_trip: Arc<dyn ExecutionPlan> = (&proto).try_into().unwrap();
+ assert_eq!(
+ format!("{:?}", physical_plan),
+ format!("{:?}", round_trip),
+ "physical plan round trip failed"
+ );
+ }
+
+ Ok(())
+ }
+
+ macro_rules! test_round_trip {
+ ($tn:ident, $query:expr) => {
+ #[test]
+ fn $tn() -> Result<()> {
+ round_trip_query($query)
+ }
+ };
+ }
+
+ test_round_trip!(q1, 1);
+ test_round_trip!(q3, 3);
+ test_round_trip!(q5, 5);
+ test_round_trip!(q6, 6);
+ test_round_trip!(q10, 10);
+ test_round_trip!(q12, 12);
+ }
}
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index b46b9cc..9699a99 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -28,6 +28,7 @@ pub use self::csv::{CsvFile, CsvReadOptions};
pub use self::datasource::{TableProvider, TableType};
pub use self::memory::MemTable;
+/// Source for table input data
pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
/// Path to a single file or a directory containing one of more files
Path(String),