This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 500b73f996 Substrait support for ParquetExec round trip for simple
select (#10949)
500b73f996 is described below
commit 500b73f996d871e4f7c1ec508413f8a2e312756f
Author: Xin Li <[email protected]>
AuthorDate: Tue Jun 18 21:27:15 2024 +0800
Substrait support for ParquetExec round trip for simple select (#10949)
* physical plan support no statistics
* implement simple physical plan substrait
* fix ci
* fix misc
* Add README.md for substrait test data
---
datafusion/substrait/src/physical_plan/consumer.rs | 87 ++++++++++++++++---
datafusion/substrait/src/physical_plan/producer.rs | 93 ++++++++++++++++++++-
.../tests/cases/roundtrip_physical_plan.rs | 92 +++++++++++++++++++-
datafusion/substrait/tests/testdata/Readme.md | 51 +++++++++++
datafusion/substrait/tests/testdata/data.parquet | Bin 0 -> 4342 bytes
datafusion/substrait/tests/testdata/empty.parquet | Bin 0 -> 976 bytes
6 files changed, 308 insertions(+), 15 deletions(-)
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs
b/datafusion/substrait/src/physical_plan/consumer.rs
index 39b38c94ec..5a8b888ef1 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -18,8 +18,8 @@
use std::collections::HashMap;
use std::sync::Arc;
-use datafusion::arrow::datatypes::Schema;
-use datafusion::common::not_impl_err;
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::common::{not_impl_err, substrait_err};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
@@ -30,7 +30,9 @@ use datafusion::prelude::SessionContext;
use async_recursion::async_recursion;
use chrono::DateTime;
use object_store::ObjectMeta;
+use substrait::proto::r#type::{Kind, Nullability};
use substrait::proto::read_rel::local_files::file_or_files::PathType;
+use substrait::proto::Type;
use substrait::proto::{
expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel,
};
@@ -42,17 +44,42 @@ pub async fn from_substrait_rel(
rel: &Rel,
_extensions: &HashMap<u32, &String>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ let mut base_config;
+
match &rel.rel_type {
Some(RelType::Read(read)) => {
if read.filter.is_some() || read.best_effort_filter.is_some() {
return not_impl_err!("Read with filter is not supported");
}
- if read.base_schema.is_some() {
- return not_impl_err!("Read with schema is not supported");
- }
+
if read.advanced_extension.is_some() {
return not_impl_err!("Read with AdvancedExtension is not
supported");
}
+
+ let Some(schema) = read.base_schema.as_ref() else {
+ return substrait_err!("Missing base schema in the read");
+ };
+
+ let Some(r#struct) = schema.r#struct.as_ref() else {
+ return substrait_err!("Missing struct in the schema");
+ };
+
+ match schema
+ .names
+ .iter()
+ .zip(r#struct.types.iter())
+ .map(|(name, r#type)| to_field(name, r#type))
+ .collect::<Result<Vec<Field>>>()
+ {
+ Ok(fields) => {
+ base_config = FileScanConfig::new(
+ ObjectStoreUrl::local_filesystem(),
+ Arc::new(Schema::new(fields)),
+ );
+ }
+ Err(e) => return Err(e),
+ };
+
match &read.as_ref().read_type {
Some(ReadType::LocalFiles(files)) => {
let mut file_groups = vec![];
@@ -104,11 +131,7 @@ pub async fn from_substrait_rel(
file_groups[part_index].push(partitioned_file)
}
- let mut base_config = FileScanConfig::new(
- ObjectStoreUrl::local_filesystem(),
- Arc::new(Schema::empty()),
- )
- .with_file_groups(file_groups);
+ base_config = base_config.with_file_groups(file_groups);
if let Some(MaskExpression { select, .. }) =
&read.projection {
if let Some(projection) = &select.as_ref() {
@@ -132,3 +155,47 @@ pub async fn from_substrait_rel(
_ => not_impl_err!("Unsupported RelType: {:?}", rel.rel_type),
}
}
+
+fn to_field(name: &String, r#type: &Type) -> Result<Field> {
+ let Some(kind) = r#type.kind.as_ref() else {
+ return substrait_err!("Missing kind in the type with name {}", name);
+ };
+
+ let mut nullable = false;
+ let data_type = match kind {
+ Kind::Bool(boolean) => {
+ nullable = is_nullable(boolean.nullability);
+ Ok(DataType::Boolean)
+ }
+ Kind::I64(i64) => {
+ nullable = is_nullable(i64.nullability);
+ Ok(DataType::Int64)
+ }
+ Kind::Fp64(fp64) => {
+ nullable = is_nullable(fp64.nullability);
+ Ok(DataType::Float64)
+ }
+ Kind::String(string) => {
+ nullable = is_nullable(string.nullability);
+ Ok(DataType::Utf8)
+ }
+ _ => substrait_err!(
+ "Unsupported kind: {:?} in the type with name {}",
+ kind,
+ name
+ ),
+ }?;
+
+ Ok(Field::new(name, data_type, nullable))
+}
+
+fn is_nullable(nullability: i32) -> bool {
+ let Ok(nullability) = Nullability::try_from(nullability) else {
+ return true;
+ };
+
+ match nullability {
+ Nullability::Nullable | Nullability::Unspecified => true,
+ Nullability::Required => false,
+ }
+}
diff --git a/datafusion/substrait/src/physical_plan/producer.rs
b/datafusion/substrait/src/physical_plan/producer.rs
index ad87d7afb0..57fe68c4a7 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -15,12 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::{displayable, ExecutionPlan};
use std::collections::HashMap;
+use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
use substrait::proto::expression::MaskExpression;
-use substrait::proto::extensions;
+use substrait::proto::r#type::{
+ Boolean, Fp64, Kind, Nullability, String as SubstraitString, Struct, I64,
+};
use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions;
use substrait::proto::read_rel::local_files::file_or_files::{FileFormat,
PathType};
use substrait::proto::read_rel::local_files::FileOrFiles;
@@ -29,6 +33,7 @@ use substrait::proto::read_rel::ReadType;
use substrait::proto::rel::RelType;
use substrait::proto::ReadRel;
use substrait::proto::Rel;
+use substrait::proto::{extensions, NamedStruct, Type};
/// Convert DataFusion ExecutionPlan to Substrait Rel
pub fn to_substrait_rel(
@@ -55,15 +60,56 @@ pub fn to_substrait_rel(
}
}
+ let mut names = vec![];
+ let mut types = vec![];
+
+ for field in base_config.file_schema.fields.iter() {
+ match to_substrait_type(field.data_type(), field.is_nullable()) {
+ Ok(t) => {
+ names.push(field.name().clone());
+ types.push(t);
+ }
+ Err(e) => return Err(e),
+ }
+ }
+
+ let type_info = Struct {
+ types,
+ // FIXME: duckdb doesn't set this field, keep it as default
variant 0.
+ //
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127
+ type_variation_reference: 0,
+ nullability: Nullability::Required.into(),
+ };
+
+ let mut select_struct = None;
+ if let Some(projection) = base_config.projection.as_ref() {
+ let struct_items = projection
+ .iter()
+ .map(|index| StructItem {
+ field: *index as i32,
+ // FIXME: duckdb sets this to None, but it's not clear why.
+ //
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
+ child: None,
+ })
+ .collect();
+
+ select_struct = Some(StructSelect { struct_items });
+ }
+
Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
- base_schema: None,
+ base_schema: Some(NamedStruct {
+ names,
+ r#struct: Some(type_info),
+ }),
filter: None,
best_effort_filter: None,
projection: Some(MaskExpression {
- select: None,
- maintain_singular_struct: false,
+ select: select_struct,
+ // FIXME: duckdb set this to true, but it's not clear why.
+ //
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186.
+ maintain_singular_struct: true,
}),
advanced_extension: None,
read_type: Some(ReadType::LocalFiles(LocalFiles {
@@ -79,3 +125,42 @@ pub fn to_substrait_rel(
)))
}
}
+
+// see
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L954-L1094.
+fn to_substrait_type(data_type: &DataType, nullable: bool) -> Result<Type> {
+ let nullability = if nullable {
+ Nullability::Nullable.into()
+ } else {
+ Nullability::Required.into()
+ };
+
+ match data_type {
+ DataType::Boolean => Ok(Type {
+ kind: Some(Kind::Bool(Boolean {
+ type_variation_reference: 0,
+ nullability,
+ })),
+ }),
+ DataType::Int64 => Ok(Type {
+ kind: Some(Kind::I64(I64 {
+ type_variation_reference: 0,
+ nullability,
+ })),
+ }),
+ DataType::Float64 => Ok(Type {
+ kind: Some(Kind::Fp64(Fp64 {
+ type_variation_reference: 0,
+ nullability,
+ })),
+ }),
+ DataType::Utf8 => Ok(Type {
+ kind: Some(Kind::String(SubstraitString {
+ type_variation_reference: 0,
+ nullability,
+ })),
+ }),
+ _ => Err(DataFusionError::Substrait(format!(
+ "Logical type {data_type} not implemented as substrait type"
+ ))),
+ }
+}
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index 4014670a7c..57fb3e2ee7 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -19,12 +19,13 @@ use std::collections::HashMap;
use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;
+use datafusion::dataframe::DataFrame;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::error::Result;
use datafusion::physical_plan::{displayable, ExecutionPlan};
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_substrait::physical_plan::{consumer, producer};
use substrait::proto::extensions;
@@ -71,3 +72,92 @@ async fn parquet_exec() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn simple_select() -> Result<()> {
+ roundtrip("SELECT a, b FROM data").await
+}
+
+#[tokio::test]
+#[ignore = "This test is failing because the translation of the substrait plan
to the physical plan is not implemented yet"]
+async fn simple_select_alltypes() -> Result<()> {
+ roundtrip_alltypes("SELECT bool_col, int_col FROM alltypes_plain").await
+}
+
+#[tokio::test]
+async fn wildcard_select() -> Result<()> {
+ roundtrip("SELECT * FROM data").await
+}
+
+#[tokio::test]
+#[ignore = "This test is failing because the translation of the substrait plan
to the physical plan is not implemented yet"]
+async fn wildcard_select_alltypes() -> Result<()> {
+ roundtrip_alltypes("SELECT * FROM alltypes_plain").await
+}
+
+async fn roundtrip(sql: &str) -> Result<()> {
+ let ctx = create_parquet_context().await?;
+ let df = ctx.sql(sql).await?;
+
+ roundtrip_parquet(df).await?;
+
+ Ok(())
+}
+
+async fn roundtrip_alltypes(sql: &str) -> Result<()> {
+ let ctx = create_all_types_context().await?;
+ let df = ctx.sql(sql).await?;
+
+ roundtrip_parquet(df).await?;
+
+ Ok(())
+}
+
+async fn roundtrip_parquet(df: DataFrame) -> Result<()> {
+ let physical_plan = df.create_physical_plan().await?;
+
+ // Convert the plan into a substrait (protobuf) Rel
+ let mut extension_info = (vec![], HashMap::new());
+ let substrait_plan =
+ producer::to_substrait_rel(physical_plan.as_ref(), &mut
extension_info)?;
+
+ // Convert the substrait Rel back into a physical plan
+ let ctx = create_parquet_context().await?;
+ let physical_plan_roundtrip =
+ consumer::from_substrait_rel(&ctx, substrait_plan.as_ref(),
&HashMap::new())
+ .await?;
+
+ // Compare the original and roundtrip physical plans
+ let expected = format!("{}",
displayable(physical_plan.as_ref()).indent(true));
+ let actual = format!(
+ "{}",
+ displayable(physical_plan_roundtrip.as_ref()).indent(true)
+ );
+ assert_eq!(expected, actual);
+
+ Ok(())
+}
+
+async fn create_parquet_context() -> Result<SessionContext> {
+ let ctx = SessionContext::new();
+ let explicit_options = ParquetReadOptions::default();
+
+ ctx.register_parquet("data", "tests/testdata/data.parquet",
explicit_options)
+ .await?;
+
+ Ok(ctx)
+}
+
+async fn create_all_types_context() -> Result<SessionContext> {
+ let ctx = SessionContext::new();
+
+ let testdata = datafusion::test_util::parquet_test_data();
+ ctx.register_parquet(
+ "alltypes_plain",
+ &format!("{testdata}/alltypes_plain.parquet"),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+
+ Ok(ctx)
+}
diff --git a/datafusion/substrait/tests/testdata/Readme.md
b/datafusion/substrait/tests/testdata/Readme.md
new file mode 100644
index 0000000000..c1bd48abf9
--- /dev/null
+++ b/datafusion/substrait/tests/testdata/Readme.md
@@ -0,0 +1,51 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Apache DataFusion Substrait Test Data
+
+This folder contains test data for the [substrait] crate.
+
+The substrait crate is at an early stage and many functions are not implemented
yet. Compared to the
[parquet-testing](https://github.com/apache/parquet-testing) submodule, this
folder contains only simple test data revolving around the substrait producers
and consumers for [logical
plans](https://github.com/apache/datafusion/tree/main/datafusion/substrait/src/logical_plan)
and [physical
plans](https://github.com/apache/datafusion/tree/main/datafusion/substrait/src/physical_plan).
+
+## Test Data
+
+### Example Data
+
+-
[empty.csv](https://github.com/apache/datafusion/blob/main/datafusion/substrait/tests/testdata/empty.csv):
An empty CSV file.
+-
[empty.parquet](https://github.com/apache/datafusion/blob/main/datafusion/substrait/tests/testdata/empty.parquet):
An empty Parquet file with metadata only.
+-
[data.csv](https://github.com/apache/datafusion/blob/main/datafusion/substrait/tests/testdata/data.csv):
A simple CSV file with 6 columns and 2 rows.
+-
[data.parquet](https://github.com/apache/datafusion/blob/main/datafusion/substrait/tests/testdata/data.parquet):
A simple Parquet file generated from the CSV file using `pandas`, e.g.,
+
+ ```python
+ import pandas as pd
+
+ df = pd.read_csv('data.csv')
+ df.to_parquet('data.parquet')
+ ```
+
+### Add new test data
+
+To add new test data, create a new file in this folder and reference it in the
test source file, e.g.,
+
+```rust
+let ctx = SessionContext::new();
+let explicit_options = ParquetReadOptions::default();
+
+ctx.register_parquet("data", "tests/testdata/data.parquet", explicit_options).await?;
+```
diff --git a/datafusion/substrait/tests/testdata/data.parquet
b/datafusion/substrait/tests/testdata/data.parquet
new file mode 100644
index 0000000000..f9c03394db
Binary files /dev/null and b/datafusion/substrait/tests/testdata/data.parquet
differ
diff --git a/datafusion/substrait/tests/testdata/empty.parquet
b/datafusion/substrait/tests/testdata/empty.parquet
new file mode 100644
index 0000000000..3f135e77f4
Binary files /dev/null and b/datafusion/substrait/tests/testdata/empty.parquet
differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]