devinjdangelo commented on issue #9299:
URL: https://github.com/apache/arrow-datafusion/issues/9299#issuecomment-1958303415
I looked into the example and the physical plan substrait producer/consumer
code. Unfortunately, for physical plans the substrait consumer and producer are
only implemented for `ParquetExec`, and even then only partially, so I do not
believe any practical example will execute without further development.
Here is an example that gets further than the one above but panics on the
round-trip assertion:
```rust
use datafusion::prelude::*;
use std::collections::HashMap;
use datafusion::error::Result;
use datafusion_substrait::physical_plan;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // Create a physical plan that scans the `alltypes_plain` Parquet file
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::parquet_test_data();
    ctx.register_parquet(
        "alltypes_plain",
        &format!("{testdata}/alltypes_plain.parquet"),
        ParquetReadOptions::default(),
    )
    .await?;
    let df = ctx.sql("SELECT * from alltypes_plain").await?;
    let physical_plan = df.create_physical_plan().await?;

    // Convert the plan into a substrait (protobuf) Rel
    let mut extension_info = (vec![], HashMap::new());
    let substrait_plan =
        physical_plan::producer::to_substrait_rel(physical_plan.as_ref(), &mut extension_info)?;

    // Convert the substrait Rel back into an ExecutionPlan
    let physical_round_trip =
        physical_plan::consumer::from_substrait_rel(&ctx, &substrait_plan, &HashMap::new())
            .await?;

    // This assertion panics: the round-tripped plan has lost details
    assert_eq!(
        format!("{:?}", physical_plan),
        format!("{:?}", physical_round_trip)
    );
    Ok(())
}
```
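As an aside, the `Rel` the producer returns is a prost-generated protobuf message, so sending it as bytes over a network should just be a matter of `prost` encoding once the producer/consumer are complete. A rough sketch, assuming the `prost` and `substrait` crates are available as dependencies (the helper names here are illustrative, not existing API):
```rust
use prost::Message;
use substrait::proto::Rel;

// Hedged sketch: encode a substrait Rel to protobuf bytes for transport...
fn rel_to_bytes(rel: &Rel) -> Vec<u8> {
    rel.encode_to_vec()
}

// ...and decode the bytes back into a Rel on the receiving side.
fn rel_from_bytes(bytes: &[u8]) -> Result<Rel, prost::DecodeError> {
    Rel::decode(bytes)
}
```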
Running the example produces the following panic output:
```
thread 'main' panicked at datafusion/substrait/src/lib.rs:37:2:
assertion `left == right` failed
left: "ParquetExec { pushdown_filters: None, reorder_filters: None,
enable_page_index: None, enable_bloom_filter: None, base_config:
object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\",
cannot_be_a_base: false, username: \"\", password: None, host: None, port:
None, path: \"/\", query: None, fragment: None } }, statistics=Statistics {
num_rows: Exact(8), total_byte_size: Exact(671), column_statistics:
[ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent,
distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value:
Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics {
null_count: Absent, max_value: Absent, min_value: Absent, distinct_count:
Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value:
Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent,
max_value: Absent, min_value: Absent, distinct_count: Absent },
ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent,
distinct_count: Absent }, ColumnStatistics { null_count:
Absent, max_value: Absent, min_value: Absent, distinct_count: Absent },
ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent,
distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value:
Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics {
null_count: Absent, max_value: Absent, min_value: Absent, distinct_count:
Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value:
Absent, distinct_count: Absent }] }, file_groups={1 group:
[[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col],
projected_statistics: Statistics { num_rows: Exact(8), total_byte_size: Absent,
column_statistics: [ColumnStatistics { null_count: Absent, max_value: Absent,
min_value: Absent, distinct_count: Absent }, ColumnStatistics { null_count:
Absent, max_value: Absent, min_value:
Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent,
max_value: Absent, min_value: Absent, distinct_count: Absent },
ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent,
distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value:
Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics {
null_count: Absent, max_value: Absent, min_value: Absent, distinct_count:
Absent }, ColumnStatistics { null_count: Absent, max_value: Absent, min_value:
Absent, distinct_count: Absent }, ColumnStatistics { null_count: Absent,
max_value: Absent, min_value: Absent, distinct_count: Absent },
ColumnStatistics { null_count: Absent, max_value: Absent, min_value: Absent,
distinct_count: Absent }, ColumnStatistics { null_count: Absent, max_value:
Absent, min_value: Absent, distinct_count: Absent }, ColumnStatistics {
null_count: Absent, max_value: Absent, min_value: Absent, distinct_count:
Absent }] }, projected_schema: Schema { fields:
[Field { name: \"id\", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }, Field { name: \"bool_col\", data_type:
Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: \"tinyint_col\", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }, Field { name: \"smallint_col\",
data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }, Field { name: \"int_col\", data_type: Int32, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }, Field { name: \"bigint_col\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }, Field { name: \"float_col\", data_type: Float32, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }, Field { name: \"double_col\",
data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {}
}, Field { name: \"date_string_col\", data_type: Binary, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name:
\"string_col\", data_type: Binary, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }, Field { name: \"timestamp_col\", data_type:
Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }], metadata: {} }, projected_output_ordering: [], metrics:
ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [Metric {
value: Count { name: \"num_predicate_creation_errors\", count: Count { value: 0
} }, labels: [], partition: None }] } } }, predicate: None, pruning_predicate:
None, page_pruning_predicate: None, metadata_size_hint: None,
parquet_file_reader_factory: None }"
right: "ParquetExec { pushdown_filters: None, reorder_filters: None,
enable_page_index: None, enable_bloom_filter: None, base_config:
object_store_url=ObjectStoreUrl { url: Url { scheme: \"file\",
cannot_be_a_base: false, username: \"\", password: None, host: None, port:
None, path: \"/\", query: None, fragment: None } }, statistics=Statistics {
num_rows: Absent, total_byte_size: Absent, column_statistics: [] },
file_groups={1 group:
[[home/dev/arrow-datafusion/parquet-testing/data/alltypes_plain.parquet]]},
projected_statistics: Statistics { num_rows: Absent, total_byte_size: Absent,
column_statistics: [] }, projected_schema: Schema { fields: [], metadata: {} },
projected_output_ordering: [], metrics: ExecutionPlanMetricsSet { inner: Mutex
{ data: MetricsSet { metrics: [Metric { value: Count { name:
\"num_predicate_creation_errors\", count: Count { value: 0 } }, labels: [],
partition: None }] } } }, predicate: None, pruning_predicate: None,
page_pruning_predicate: None, metadata_size_hint: None,
parquet_file_reader_factory: None }"
stack backtrace:
...
```
You can see that the round trip lost many details of the `ParquetExec`, such as `projected_schema` and `projected_statistics`.
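To see the schema loss directly, one could compare `ExecutionPlan::schema()` on both sides instead of diffing `Debug` output. A minimal sketch (the helper name is mine, not part of the example above):
```rust
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

// Hedged sketch: compare only the output schemas of the original and
// round-tripped plans. With the current consumer, the round-tripped
// `ParquetExec` comes back with an empty projected schema, so this
// returns false for the example above.
fn schemas_match(original: &Arc<dyn ExecutionPlan>, round_trip: &Arc<dyn ExecutionPlan>) -> bool {
    original.schema() == round_trip.schema()
}
```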
I think if we want to include a user-facing example of a physical plan
substrait round trip, we will first need to file a ticket to finish the
`ParquetExec` substrait producer/consumer implementation.
It looks like #5176 built the initial framework for serializing physical
plans, but the work hasn't been picked up since then.