avantgardnerio commented on code in PR #666:
URL: https://github.com/apache/arrow-ballista/pull/666#discussion_r1105954150
##########
benchmarks/src/bin/tpch.rs:
##########
@@ -1611,65 +1611,137 @@ mod tests {
format!("{round_trip:?}"),
"optimized logical plan round trip failed"
);
+ }
+
+ Ok(())
+ }
+
+ async fn round_trip_physical_plan(n: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_target_partitions(1)
+ .with_batch_size(10);
+ let ctx = SessionContext::with_config(config);
+ let session_state = ctx.state();
+ let codec: BallistaCodec<
+ datafusion_proto::protobuf::LogicalPlanNode,
+ datafusion_proto::protobuf::PhysicalPlanNode,
+ > = BallistaCodec::default();
+
+ // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+ // is not set.
+ let tpch_data_path =
+ env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+ let path = ListingTableUrl::parse(tpch_data_path)?;
+
+ for &table in TABLES {
+ let schema = get_schema(table);
+ let options = CsvReadOptions::new()
+ .schema(&schema)
+ .delimiter(b'|')
+ .has_header(false)
+ .file_extension(".tbl");
+ let cfg = SessionConfig::new();
+ let listing_options = options.to_listing_options(&cfg);
+ let config = ListingTableConfig::new(path.clone())
+ .with_listing_options(listing_options)
+ .with_schema(Arc::new(schema));
+ let provider = ListingTable::try_new(config)?;
+ ctx.register_table(table, Arc::new(provider))?;
+ }
+
+ // test logical plan round trip
+ let plans = create_logical_plans(&ctx, n).await?;
+ for plan in plans {
+ let plan = session_state.optimize(&plan)?;
// test physical plan roundtrip
Review Comment:
Seems straightforward
##########
benchmarks/src/bin/tpch.rs:
##########
@@ -1611,65 +1611,137 @@ mod tests {
format!("{round_trip:?}"),
"optimized logical plan round trip failed"
);
+ }
+
+ Ok(())
+ }
+
+ async fn round_trip_physical_plan(n: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_target_partitions(1)
+ .with_batch_size(10);
+ let ctx = SessionContext::with_config(config);
+ let session_state = ctx.state();
+ let codec: BallistaCodec<
+ datafusion_proto::protobuf::LogicalPlanNode,
+ datafusion_proto::protobuf::PhysicalPlanNode,
+ > = BallistaCodec::default();
+
+ // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+ // is not set.
+ let tpch_data_path =
+ env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+ let path = ListingTableUrl::parse(tpch_data_path)?;
+
+ for &table in TABLES {
+ let schema = get_schema(table);
+ let options = CsvReadOptions::new()
+ .schema(&schema)
+ .delimiter(b'|')
+ .has_header(false)
+ .file_extension(".tbl");
+ let cfg = SessionConfig::new();
+ let listing_options = options.to_listing_options(&cfg);
+ let config = ListingTableConfig::new(path.clone())
+ .with_listing_options(listing_options)
+ .with_schema(Arc::new(schema));
+ let provider = ListingTable::try_new(config)?;
+ ctx.register_table(table, Arc::new(provider))?;
+ }
+
+ // test logical plan round trip
+ let plans = create_logical_plans(&ctx, n).await?;
+ for plan in plans {
+ let plan = session_state.optimize(&plan)?;
// test physical plan roundtrip
- if env::var("TPCH_DATA").is_ok() {
- let physical_plan =
session_state.create_physical_plan(&plan).await?;
- let proto: datafusion_proto::protobuf::PhysicalPlanNode =
-
datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
- physical_plan.clone(),
- codec.physical_extension_codec(),
- )
- .unwrap();
- let runtime = ctx.runtime_env();
- let round_trip: Arc<dyn ExecutionPlan> = proto
- .try_into_physical_plan(
- &ctx,
- runtime.deref(),
- codec.physical_extension_codec(),
- )
- .unwrap();
- assert_eq!(
- format!("{physical_plan:?}"),
- format!("{round_trip:?}"),
- "physical plan round trip failed"
- );
- }
+ let physical_plan =
session_state.create_physical_plan(&plan).await?;
+ let proto: datafusion_proto::protobuf::PhysicalPlanNode =
+
datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
+ physical_plan.clone(),
+ codec.physical_extension_codec(),
+ )
+ .unwrap();
+ let runtime = ctx.runtime_env();
+ let round_trip: Arc<dyn ExecutionPlan> = proto
+ .try_into_physical_plan(
+ &ctx,
+ runtime.deref(),
+ codec.physical_extension_codec(),
+ )
+ .unwrap();
+ assert_eq!(
+ format!("{}",
displayable(physical_plan.as_ref()).indent()),
+ format!("{}", displayable(round_trip.as_ref()).indent()),
+ "physical plan round trip failed"
+ );
}
Ok(())
}
- macro_rules! test_round_trip {
+ macro_rules! test_round_trip_logical {
+ ($tn:ident, $query:expr) => {
+ #[tokio::test]
+ async fn $tn() -> Result<()> {
+ round_trip_logical_plan($query).await
+ }
+ };
+ }
+
+ test_round_trip_logical!(q1, 1);
+ test_round_trip_logical!(q2, 2);
+ test_round_trip_logical!(q3, 3);
+ test_round_trip_logical!(q4, 4);
+ test_round_trip_logical!(q5, 5);
+ test_round_trip_logical!(q6, 6);
+ test_round_trip_logical!(q7, 7);
+ test_round_trip_logical!(q8, 8);
+ test_round_trip_logical!(q9, 9);
+ test_round_trip_logical!(q10, 10);
+ test_round_trip_logical!(q11, 11);
+ test_round_trip_logical!(q12, 12);
+ test_round_trip_logical!(q13, 13);
+ test_round_trip_logical!(q14, 14);
+ //test_round_trip_logical!(q15, 15); // https://github.com/apache/arrow-ballista/issues/330
+ test_round_trip_logical!(q16, 16);
+ test_round_trip_logical!(q17, 17);
+ test_round_trip_logical!(q18, 18);
+ test_round_trip_logical!(q19, 19);
+ test_round_trip_logical!(q20, 20);
+ test_round_trip_logical!(q21, 21);
+ test_round_trip_logical!(q22, 22);
Review Comment:
Seems like a good set of test data...
##########
benchmarks/src/bin/tpch.rs:
##########
@@ -1611,65 +1611,137 @@ mod tests {
format!("{round_trip:?}"),
"optimized logical plan round trip failed"
);
+ }
+
+ Ok(())
+ }
+
+ async fn round_trip_physical_plan(n: usize) -> Result<()> {
+ let config = SessionConfig::new()
+ .with_target_partitions(1)
+ .with_batch_size(10);
+ let ctx = SessionContext::with_config(config);
+ let session_state = ctx.state();
+ let codec: BallistaCodec<
+ datafusion_proto::protobuf::LogicalPlanNode,
+ datafusion_proto::protobuf::PhysicalPlanNode,
+ > = BallistaCodec::default();
+
+ // set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
+ // is not set.
+ let tpch_data_path =
+ env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+ let path = ListingTableUrl::parse(tpch_data_path)?;
+
+ for &table in TABLES {
+ let schema = get_schema(table);
+ let options = CsvReadOptions::new()
+ .schema(&schema)
+ .delimiter(b'|')
+ .has_header(false)
+ .file_extension(".tbl");
+ let cfg = SessionConfig::new();
+ let listing_options = options.to_listing_options(&cfg);
+ let config = ListingTableConfig::new(path.clone())
+ .with_listing_options(listing_options)
+ .with_schema(Arc::new(schema));
+ let provider = ListingTable::try_new(config)?;
+ ctx.register_table(table, Arc::new(provider))?;
+ }
+
+ // test logical plan round trip
+ let plans = create_logical_plans(&ctx, n).await?;
+ for plan in plans {
+ let plan = session_state.optimize(&plan)?;
// test physical plan roundtrip
- if env::var("TPCH_DATA").is_ok() {
- let physical_plan =
session_state.create_physical_plan(&plan).await?;
- let proto: datafusion_proto::protobuf::PhysicalPlanNode =
-
datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
- physical_plan.clone(),
- codec.physical_extension_codec(),
- )
- .unwrap();
- let runtime = ctx.runtime_env();
- let round_trip: Arc<dyn ExecutionPlan> = proto
- .try_into_physical_plan(
- &ctx,
- runtime.deref(),
- codec.physical_extension_codec(),
- )
- .unwrap();
- assert_eq!(
- format!("{physical_plan:?}"),
- format!("{round_trip:?}"),
Review Comment:
Makes sense...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]