This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b180a12 Enable physical plan round-trip tests (#666)
5b180a12 is described below

commit 5b180a12bceee27c7da0201cc60389069b64ee0f
Author: Andy Grove <[email protected]>
AuthorDate: Tue Feb 14 18:32:06 2023 -0700

    Enable physical plan round-trip tests (#666)
---
 benchmarks/src/bin/tpch.rs | 166 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 119 insertions(+), 47 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 40b2ccb9..0608f3e1 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -1559,7 +1559,7 @@ mod tests {
         use datafusion_proto::physical_plan::AsExecutionPlan;
         use std::ops::Deref;
 
-        async fn round_trip_query(n: usize) -> Result<()> {
+        async fn round_trip_logical_plan(n: usize) -> Result<()> {
             let config = SessionConfig::new()
                 .with_target_partitions(1)
                 .with_batch_size(10);
@@ -1611,65 +1611,137 @@ mod tests {
                     format!("{round_trip:?}"),
                     "optimized logical plan round trip failed"
                 );
+            }
+
+            Ok(())
+        }
+
+        async fn round_trip_physical_plan(n: usize) -> Result<()> {
+            let config = SessionConfig::new()
+                .with_target_partitions(1)
+                .with_batch_size(10);
+            let ctx = SessionContext::with_config(config);
+            let session_state = ctx.state();
+            let codec: BallistaCodec<
+                datafusion_proto::protobuf::LogicalPlanNode,
+                datafusion_proto::protobuf::PhysicalPlanNode,
+            > = BallistaCodec::default();
+
+            // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+            // is not set.
+            let tpch_data_path =
+                env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+            let path = ListingTableUrl::parse(tpch_data_path)?;
+
+            for &table in TABLES {
+                let schema = get_schema(table);
+                let options = CsvReadOptions::new()
+                    .schema(&schema)
+                    .delimiter(b'|')
+                    .has_header(false)
+                    .file_extension(".tbl");
+                let cfg = SessionConfig::new();
+                let listing_options = options.to_listing_options(&cfg);
+                let config = ListingTableConfig::new(path.clone())
+                    .with_listing_options(listing_options)
+                    .with_schema(Arc::new(schema));
+                let provider = ListingTable::try_new(config)?;
+                ctx.register_table(table, Arc::new(provider))?;
+            }
+
+            // test logical plan round trip
+            let plans = create_logical_plans(&ctx, n).await?;
+            for plan in plans {
+                let plan = session_state.optimize(&plan)?;
 
                 // test physical plan roundtrip
-                if env::var("TPCH_DATA").is_ok() {
-                    let physical_plan = session_state.create_physical_plan(&plan).await?;
-                    let proto: datafusion_proto::protobuf::PhysicalPlanNode =
-                        datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
-                            physical_plan.clone(),
-                            codec.physical_extension_codec(),
-                        )
-                        .unwrap();
-                    let runtime = ctx.runtime_env();
-                    let round_trip: Arc<dyn ExecutionPlan> = proto
-                        .try_into_physical_plan(
-                            &ctx,
-                            runtime.deref(),
-                            codec.physical_extension_codec(),
-                        )
-                        .unwrap();
-                    assert_eq!(
-                        format!("{physical_plan:?}"),
-                        format!("{round_trip:?}"),
-                        "physical plan round trip failed"
-                    );
-                }
+                let physical_plan = session_state.create_physical_plan(&plan).await?;
+                let proto: datafusion_proto::protobuf::PhysicalPlanNode =
+                    datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
+                        physical_plan.clone(),
+                        codec.physical_extension_codec(),
+                    )
+                    .unwrap();
+                let runtime = ctx.runtime_env();
+                let round_trip: Arc<dyn ExecutionPlan> = proto
+                    .try_into_physical_plan(
+                        &ctx,
+                        runtime.deref(),
+                        codec.physical_extension_codec(),
+                    )
+                    .unwrap();
+                assert_eq!(
+                    format!("{}", displayable(physical_plan.as_ref()).indent()),
+                    format!("{}", displayable(round_trip.as_ref()).indent()),
+                    "physical plan round trip failed"
+                );
             }
 
             Ok(())
         }
 
-        macro_rules! test_round_trip {
+        macro_rules! test_round_trip_logical {
+            ($tn:ident, $query:expr) => {
+                #[tokio::test]
+                async fn $tn() -> Result<()> {
+                    round_trip_logical_plan($query).await
+                }
+            };
+        }
+
+        test_round_trip_logical!(q1, 1);
+        test_round_trip_logical!(q2, 2);
+        test_round_trip_logical!(q3, 3);
+        test_round_trip_logical!(q4, 4);
+        test_round_trip_logical!(q5, 5);
+        test_round_trip_logical!(q6, 6);
+        test_round_trip_logical!(q7, 7);
+        test_round_trip_logical!(q8, 8);
+        test_round_trip_logical!(q9, 9);
+        test_round_trip_logical!(q10, 10);
+        test_round_trip_logical!(q11, 11);
+        test_round_trip_logical!(q12, 12);
+        test_round_trip_logical!(q13, 13);
+        test_round_trip_logical!(q14, 14);
+        //test_round_trip_logical!(q15, 15); // https://github.com/apache/arrow-ballista/issues/330
+        test_round_trip_logical!(q16, 16);
+        test_round_trip_logical!(q17, 17);
+        test_round_trip_logical!(q18, 18);
+        test_round_trip_logical!(q19, 19);
+        test_round_trip_logical!(q20, 20);
+        test_round_trip_logical!(q21, 21);
+        test_round_trip_logical!(q22, 22);
+
+        macro_rules! test_round_trip_physical {
             ($tn:ident, $query:expr) => {
                 #[tokio::test]
                 async fn $tn() -> Result<()> {
-                    round_trip_query($query).await
+                    round_trip_physical_plan($query).await
                 }
             };
         }
 
-        test_round_trip!(q1, 1);
-        test_round_trip!(q2, 2);
-        test_round_trip!(q3, 3);
-        test_round_trip!(q4, 4);
-        test_round_trip!(q5, 5);
-        test_round_trip!(q6, 6);
-        test_round_trip!(q7, 7);
-        test_round_trip!(q8, 8);
-        test_round_trip!(q9, 9);
-        test_round_trip!(q10, 10);
-        test_round_trip!(q11, 11);
-        test_round_trip!(q12, 12);
-        test_round_trip!(q13, 13);
-        test_round_trip!(q14, 14);
-        // test_round_trip!(q15, 15); https://github.com/apache/arrow-ballista/issues/330
-        // test_round_trip!(q16, 16); https://github.com/apache/arrow-ballista/issues/330
-        test_round_trip!(q17, 17);
-        test_round_trip!(q18, 18);
-        test_round_trip!(q19, 19);
-        test_round_trip!(q20, 20);
-        test_round_trip!(q21, 21);
-        // test_round_trip!(q22, 22); https://github.com/apache/arrow-ballista/issues/330
+        test_round_trip_physical!(physical_round_trip_q1, 1);
+        test_round_trip_physical!(physical_round_trip_q2, 2);
+        test_round_trip_physical!(physical_round_trip_q3, 3);
+        test_round_trip_physical!(physical_round_trip_q4, 4);
+        test_round_trip_physical!(physical_round_trip_q5, 5);
+        test_round_trip_physical!(physical_round_trip_q6, 6);
+        test_round_trip_physical!(physical_round_trip_q7, 7);
+        test_round_trip_physical!(physical_round_trip_q8, 8);
+        test_round_trip_physical!(physical_round_trip_q9, 9);
+        test_round_trip_physical!(physical_round_trip_q10, 10);
+        test_round_trip_physical!(physical_round_trip_q11, 11);
+        test_round_trip_physical!(physical_round_trip_q12, 12);
+        test_round_trip_physical!(physical_round_trip_q13, 13);
+        test_round_trip_physical!(physical_round_trip_q14, 14);
+        // test_round_trip_physical!(physical_round_trip_q15, 15); // https://github.com/apache/arrow-ballista/issues/330
+        test_round_trip_physical!(physical_round_trip_q16, 16);
+        test_round_trip_physical!(physical_round_trip_q17, 17);
+        test_round_trip_physical!(physical_round_trip_q18, 18);
+        test_round_trip_physical!(physical_round_trip_q19, 19);
+        test_round_trip_physical!(physical_round_trip_q20, 20);
+        test_round_trip_physical!(physical_round_trip_q21, 21);
+        test_round_trip_physical!(physical_round_trip_q22, 22);
     }
 }

Reply via email to