This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 5b180a12 Enable physical plan round-trip tests (#666)
5b180a12 is described below
commit 5b180a12bceee27c7da0201cc60389069b64ee0f
Author: Andy Grove <[email protected]>
AuthorDate: Tue Feb 14 18:32:06 2023 -0700
Enable physical plan round-trip tests (#666)
---
benchmarks/src/bin/tpch.rs | 166 ++++++++++++++++++++++++++++++++-------------
1 file changed, 119 insertions(+), 47 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 40b2ccb9..0608f3e1 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1559,7 +1559,7 @@ mod tests {
use datafusion_proto::physical_plan::AsExecutionPlan;
use std::ops::Deref;
- async fn round_trip_query(n: usize) -> Result<()> {
+ async fn round_trip_logical_plan(n: usize) -> Result<()> {
let config = SessionConfig::new()
.with_target_partitions(1)
.with_batch_size(10);
@@ -1611,65 +1611,137 @@ mod tests {
format!("{round_trip:?}"),
"optimized logical plan round trip failed"
);
+ }
+
+ Ok(())
+ }
+
+ async fn round_trip_physical_plan(n: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_target_partitions(1)
+ .with_batch_size(10);
+ let ctx = SessionContext::with_config(config);
+ let session_state = ctx.state();
+ let codec: BallistaCodec<
+ datafusion_proto::protobuf::LogicalPlanNode,
+ datafusion_proto::protobuf::PhysicalPlanNode,
+ > = BallistaCodec::default();
+
+ // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+ // is not set.
+ let tpch_data_path =
+ env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+ let path = ListingTableUrl::parse(tpch_data_path)?;
+
+ for &table in TABLES {
+ let schema = get_schema(table);
+ let options = CsvReadOptions::new()
+ .schema(&schema)
+ .delimiter(b'|')
+ .has_header(false)
+ .file_extension(".tbl");
+ let cfg = SessionConfig::new();
+ let listing_options = options.to_listing_options(&cfg);
+ let config = ListingTableConfig::new(path.clone())
+ .with_listing_options(listing_options)
+ .with_schema(Arc::new(schema));
+ let provider = ListingTable::try_new(config)?;
+ ctx.register_table(table, Arc::new(provider))?;
+ }
+
+ // test logical plan round trip
+ let plans = create_logical_plans(&ctx, n).await?;
+ for plan in plans {
+ let plan = session_state.optimize(&plan)?;
// test physical plan roundtrip
- if env::var("TPCH_DATA").is_ok() {
- let physical_plan = session_state.create_physical_plan(&plan).await?;
- let proto: datafusion_proto::protobuf::PhysicalPlanNode =
- datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
- physical_plan.clone(),
- codec.physical_extension_codec(),
- )
- .unwrap();
- let runtime = ctx.runtime_env();
- let round_trip: Arc<dyn ExecutionPlan> = proto
- .try_into_physical_plan(
- &ctx,
- runtime.deref(),
- codec.physical_extension_codec(),
- )
- .unwrap();
- assert_eq!(
- format!("{physical_plan:?}"),
- format!("{round_trip:?}"),
- "physical plan round trip failed"
- );
- }
+ let physical_plan = session_state.create_physical_plan(&plan).await?;
+ let proto: datafusion_proto::protobuf::PhysicalPlanNode =
+ datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
+ physical_plan.clone(),
+ codec.physical_extension_codec(),
+ )
+ .unwrap();
+ let runtime = ctx.runtime_env();
+ let round_trip: Arc<dyn ExecutionPlan> = proto
+ .try_into_physical_plan(
+ &ctx,
+ runtime.deref(),
+ codec.physical_extension_codec(),
+ )
+ .unwrap();
+ assert_eq!(
+ format!("{}", displayable(physical_plan.as_ref()).indent()),
+ format!("{}", displayable(round_trip.as_ref()).indent()),
+ "physical plan round trip failed"
+ );
}
Ok(())
}
- macro_rules! test_round_trip {
+ macro_rules! test_round_trip_logical {
+ ($tn:ident, $query:expr) => {
+ #[tokio::test]
+ async fn $tn() -> Result<()> {
+ round_trip_logical_plan($query).await
+ }
+ };
+ }
+
+ test_round_trip_logical!(q1, 1);
+ test_round_trip_logical!(q2, 2);
+ test_round_trip_logical!(q3, 3);
+ test_round_trip_logical!(q4, 4);
+ test_round_trip_logical!(q5, 5);
+ test_round_trip_logical!(q6, 6);
+ test_round_trip_logical!(q7, 7);
+ test_round_trip_logical!(q8, 8);
+ test_round_trip_logical!(q9, 9);
+ test_round_trip_logical!(q10, 10);
+ test_round_trip_logical!(q11, 11);
+ test_round_trip_logical!(q12, 12);
+ test_round_trip_logical!(q13, 13);
+ test_round_trip_logical!(q14, 14);
+ //test_round_trip_logical!(q15, 15); // https://github.com/apache/arrow-ballista/issues/330
+ test_round_trip_logical!(q16, 16);
+ test_round_trip_logical!(q17, 17);
+ test_round_trip_logical!(q18, 18);
+ test_round_trip_logical!(q19, 19);
+ test_round_trip_logical!(q20, 20);
+ test_round_trip_logical!(q21, 21);
+ test_round_trip_logical!(q22, 22);
+
+ macro_rules! test_round_trip_physical {
($tn:ident, $query:expr) => {
#[tokio::test]
async fn $tn() -> Result<()> {
- round_trip_query($query).await
+ round_trip_physical_plan($query).await
}
};
}
- test_round_trip!(q1, 1);
- test_round_trip!(q2, 2);
- test_round_trip!(q3, 3);
- test_round_trip!(q4, 4);
- test_round_trip!(q5, 5);
- test_round_trip!(q6, 6);
- test_round_trip!(q7, 7);
- test_round_trip!(q8, 8);
- test_round_trip!(q9, 9);
- test_round_trip!(q10, 10);
- test_round_trip!(q11, 11);
- test_round_trip!(q12, 12);
- test_round_trip!(q13, 13);
- test_round_trip!(q14, 14);
- // test_round_trip!(q15, 15); https://github.com/apache/arrow-ballista/issues/330
- // test_round_trip!(q16, 16); https://github.com/apache/arrow-ballista/issues/330
- test_round_trip!(q17, 17);
- test_round_trip!(q18, 18);
- test_round_trip!(q19, 19);
- test_round_trip!(q20, 20);
- test_round_trip!(q21, 21);
- // test_round_trip!(q22, 22); https://github.com/apache/arrow-ballista/issues/330
+ test_round_trip_physical!(physical_round_trip_q1, 1);
+ test_round_trip_physical!(physical_round_trip_q2, 2);
+ test_round_trip_physical!(physical_round_trip_q3, 3);
+ test_round_trip_physical!(physical_round_trip_q4, 4);
+ test_round_trip_physical!(physical_round_trip_q5, 5);
+ test_round_trip_physical!(physical_round_trip_q6, 6);
+ test_round_trip_physical!(physical_round_trip_q7, 7);
+ test_round_trip_physical!(physical_round_trip_q8, 8);
+ test_round_trip_physical!(physical_round_trip_q9, 9);
+ test_round_trip_physical!(physical_round_trip_q10, 10);
+ test_round_trip_physical!(physical_round_trip_q11, 11);
+ test_round_trip_physical!(physical_round_trip_q12, 12);
+ test_round_trip_physical!(physical_round_trip_q13, 13);
+ test_round_trip_physical!(physical_round_trip_q14, 14);
+ // test_round_trip_physical!(physical_round_trip_q15, 15); // https://github.com/apache/arrow-ballista/issues/330
+ test_round_trip_physical!(physical_round_trip_q16, 16);
+ test_round_trip_physical!(physical_round_trip_q17, 17);
+ test_round_trip_physical!(physical_round_trip_q18, 18);
+ test_round_trip_physical!(physical_round_trip_q19, 19);
+ test_round_trip_physical!(physical_round_trip_q20, 20);
+ test_round_trip_physical!(physical_round_trip_q21, 21);
+ test_round_trip_physical!(physical_round_trip_q22, 22);
}
}