This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e752a9cd53 feat: add round trip test of physical plan in tpch unit tests (#6918)
e752a9cd53 is described below
commit e752a9cd5390db718d6512d843afbe063ddecb48
Author: r.4ntix <[email protected]>
AuthorDate: Thu Jul 13 01:56:01 2023 +0800
feat: add round trip test of physical plan in tpch unit tests (#6918)
---
benchmarks/src/bin/tpch.rs | 223 +++++++++++++++++++++------------------------
1 file changed, 106 insertions(+), 117 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index d18dd38b9b..b5f535b38d 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -394,9 +394,24 @@ mod tests {
use std::path::Path;
use super::*;
- use datafusion_proto::bytes::{logical_plan_from_bytes,
logical_plan_to_bytes};
+ use datafusion_proto::bytes::{
+ logical_plan_from_bytes, logical_plan_to_bytes,
physical_plan_from_bytes,
+ physical_plan_to_bytes,
+ };
+
+ fn get_tpch_data_path() -> Result<String> {
+ let path =
+ std::env::var("TPCH_DATA").unwrap_or_else(|_|
"benchmarks/data".to_string());
+ if !Path::new(&path).exists() {
+ return Err(DataFusionError::Execution(format!(
+ "Benchmark data not found (set TPCH_DATA env var to override):
{}",
+ path
+ )));
+ }
+ Ok(path)
+ }
- async fn serde_round_trip(query: usize) -> Result<()> {
+ async fn round_trip_logical_plan(query: usize) -> Result<()> {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
let opt = DataFusionBenchmarkOpt {
@@ -425,125 +440,99 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn serde_q1() -> Result<()> {
- serde_round_trip(1).await
- }
-
- #[tokio::test]
- async fn serde_q2() -> Result<()> {
- serde_round_trip(2).await
- }
-
- #[tokio::test]
- async fn serde_q3() -> Result<()> {
- serde_round_trip(3).await
- }
-
- #[tokio::test]
- async fn serde_q4() -> Result<()> {
- serde_round_trip(4).await
- }
-
- #[tokio::test]
- async fn serde_q5() -> Result<()> {
- serde_round_trip(5).await
- }
-
- #[tokio::test]
- async fn serde_q6() -> Result<()> {
- serde_round_trip(6).await
- }
-
- #[tokio::test]
- async fn serde_q7() -> Result<()> {
- serde_round_trip(7).await
- }
-
- #[tokio::test]
- async fn serde_q8() -> Result<()> {
- serde_round_trip(8).await
- }
-
- #[tokio::test]
- async fn serde_q9() -> Result<()> {
- serde_round_trip(9).await
- }
-
- #[tokio::test]
- async fn serde_q10() -> Result<()> {
- serde_round_trip(10).await
- }
-
- #[tokio::test]
- async fn serde_q11() -> Result<()> {
- serde_round_trip(11).await
- }
-
- #[tokio::test]
- async fn serde_q12() -> Result<()> {
- serde_round_trip(12).await
- }
-
- #[tokio::test]
- async fn serde_q13() -> Result<()> {
- serde_round_trip(13).await
- }
-
- #[tokio::test]
- async fn serde_q14() -> Result<()> {
- serde_round_trip(14).await
- }
-
- #[tokio::test]
- async fn serde_q15() -> Result<()> {
- serde_round_trip(15).await
- }
-
- #[tokio::test]
- async fn serde_q16() -> Result<()> {
- serde_round_trip(16).await
- }
-
- #[tokio::test]
- async fn serde_q17() -> Result<()> {
- serde_round_trip(17).await
- }
-
- #[tokio::test]
- async fn serde_q18() -> Result<()> {
- serde_round_trip(18).await
- }
-
- #[tokio::test]
- async fn serde_q19() -> Result<()> {
- serde_round_trip(19).await
- }
-
- #[tokio::test]
- async fn serde_q20() -> Result<()> {
- serde_round_trip(20).await
+ async fn round_trip_physical_plan(query: usize) -> Result<()> {
+ let ctx = SessionContext::default();
+ let path = get_tpch_data_path()?;
+ let opt = DataFusionBenchmarkOpt {
+ query: Some(query),
+ debug: false,
+ iterations: 1,
+ partitions: 2,
+ batch_size: 8192,
+ path: PathBuf::from(path.to_string()),
+ file_format: "tbl".to_string(),
+ mem_table: false,
+ output_path: None,
+ disable_statistics: false,
+ };
+ register_tables(&opt, &ctx).await?;
+ let queries = get_query_sql(query)?;
+ for query in queries {
+ let plan = ctx.sql(&query).await?;
+ let plan = plan.create_physical_plan().await?;
+ let bytes = physical_plan_to_bytes(plan.clone())?;
+ let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
+ let plan_formatted = format!("{}",
displayable(plan.as_ref()).indent(false));
+ let plan2_formatted =
+ format!("{}", displayable(plan2.as_ref()).indent(false));
+ assert_eq!(plan_formatted, plan2_formatted);
+ }
+ Ok(())
}
- #[tokio::test]
- async fn serde_q21() -> Result<()> {
- serde_round_trip(21).await
+ macro_rules! test_round_trip_logical {
+ ($tn:ident, $query:expr) => {
+ #[tokio::test]
+ async fn $tn() -> Result<()> {
+ round_trip_logical_plan($query).await
+ }
+ };
}
- #[tokio::test]
- async fn serde_q22() -> Result<()> {
- serde_round_trip(22).await
+ macro_rules! test_round_trip_physical {
+ ($tn:ident, $query:expr) => {
+ #[tokio::test]
+ async fn $tn() -> Result<()> {
+ round_trip_physical_plan($query).await
+ }
+ };
}
- fn get_tpch_data_path() -> Result<String> {
- let path =
- std::env::var("TPCH_DATA").unwrap_or_else(|_|
"benchmarks/data".to_string());
- if !Path::new(&path).exists() {
- return Err(DataFusionError::Execution(format!(
- "Benchmark data not found (set TPCH_DATA env var to override):
{}",
- path
- )));
- }
- Ok(path)
- }
+ // logical plan tests
+ test_round_trip_logical!(round_trip_logical_plan_q1, 1);
+ test_round_trip_logical!(round_trip_logical_plan_q2, 2);
+ test_round_trip_logical!(round_trip_logical_plan_q3, 3);
+ test_round_trip_logical!(round_trip_logical_plan_q4, 4);
+ test_round_trip_logical!(round_trip_logical_plan_q5, 5);
+ test_round_trip_logical!(round_trip_logical_plan_q6, 6);
+ test_round_trip_logical!(round_trip_logical_plan_q7, 7);
+ test_round_trip_logical!(round_trip_logical_plan_q8, 8);
+ test_round_trip_logical!(round_trip_logical_plan_q9, 9);
+ test_round_trip_logical!(round_trip_logical_plan_q10, 10);
+ test_round_trip_logical!(round_trip_logical_plan_q11, 11);
+ test_round_trip_logical!(round_trip_logical_plan_q12, 12);
+ test_round_trip_logical!(round_trip_logical_plan_q13, 13);
+ test_round_trip_logical!(round_trip_logical_plan_q14, 14);
+ test_round_trip_logical!(round_trip_logical_plan_q15, 15);
+ test_round_trip_logical!(round_trip_logical_plan_q16, 16);
+ test_round_trip_logical!(round_trip_logical_plan_q17, 17);
+ test_round_trip_logical!(round_trip_logical_plan_q18, 18);
+ test_round_trip_logical!(round_trip_logical_plan_q19, 19);
+ test_round_trip_logical!(round_trip_logical_plan_q20, 20);
+ test_round_trip_logical!(round_trip_logical_plan_q21, 21);
+ test_round_trip_logical!(round_trip_logical_plan_q22, 22);
+
+ // physical plan tests
+ test_round_trip_physical!(round_trip_physical_plan_q1, 1);
+ test_round_trip_physical!(round_trip_physical_plan_q2, 2);
+ test_round_trip_physical!(round_trip_physical_plan_q3, 3);
+ test_round_trip_physical!(round_trip_physical_plan_q4, 4);
+ test_round_trip_physical!(round_trip_physical_plan_q5, 5);
+ test_round_trip_physical!(round_trip_physical_plan_q6, 6);
+ test_round_trip_physical!(round_trip_physical_plan_q7, 7);
+ test_round_trip_physical!(round_trip_physical_plan_q8, 8);
+ test_round_trip_physical!(round_trip_physical_plan_q9, 9);
+ test_round_trip_physical!(round_trip_physical_plan_q10, 10);
+ test_round_trip_physical!(round_trip_physical_plan_q11, 11);
+ test_round_trip_physical!(round_trip_physical_plan_q12, 12);
+ test_round_trip_physical!(round_trip_physical_plan_q13, 13);
+ test_round_trip_physical!(round_trip_physical_plan_q14, 14);
+ test_round_trip_physical!(round_trip_physical_plan_q15, 15);
+ test_round_trip_physical!(round_trip_physical_plan_q16, 16);
+ test_round_trip_physical!(round_trip_physical_plan_q17, 17);
+ test_round_trip_physical!(round_trip_physical_plan_q18, 18);
+ test_round_trip_physical!(round_trip_physical_plan_q19, 19);
+ test_round_trip_physical!(round_trip_physical_plan_q20, 20);
+ test_round_trip_physical!(round_trip_physical_plan_q21, 21);
+ test_round_trip_physical!(round_trip_physical_plan_q22, 22);
}