This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 500b73f996 Substrait support for ParquetExec round trip for simple 
select (#10949)
500b73f996 is described below

commit 500b73f996d871e4f7c1ec508413f8a2e312756f
Author: Xin Li <[email protected]>
AuthorDate: Tue Jun 18 21:27:15 2024 +0800

    Substrait support for ParquetExec round trip for simple select (#10949)
    
    * physical plan support no statistics
    
    * implement simple physical plan substrait
    
    * fix ci
    
    * fix misc
    
    * Add README.md for substrait test data
---
 datafusion/substrait/src/physical_plan/consumer.rs |  87 ++++++++++++++++---
 datafusion/substrait/src/physical_plan/producer.rs |  93 ++++++++++++++++++++-
 .../tests/cases/roundtrip_physical_plan.rs         |  92 +++++++++++++++++++-
 datafusion/substrait/tests/testdata/Readme.md      |  51 +++++++++++
 datafusion/substrait/tests/testdata/data.parquet   | Bin 0 -> 4342 bytes
 datafusion/substrait/tests/testdata/empty.parquet  | Bin 0 -> 976 bytes
 6 files changed, 308 insertions(+), 15 deletions(-)

diff --git a/datafusion/substrait/src/physical_plan/consumer.rs 
b/datafusion/substrait/src/physical_plan/consumer.rs
index 39b38c94ec..5a8b888ef1 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -18,8 +18,8 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use datafusion::arrow::datatypes::Schema;
-use datafusion::common::not_impl_err;
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::common::{not_impl_err, substrait_err};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
@@ -30,7 +30,9 @@ use datafusion::prelude::SessionContext;
 use async_recursion::async_recursion;
 use chrono::DateTime;
 use object_store::ObjectMeta;
+use substrait::proto::r#type::{Kind, Nullability};
 use substrait::proto::read_rel::local_files::file_or_files::PathType;
+use substrait::proto::Type;
 use substrait::proto::{
     expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel,
 };
@@ -42,17 +44,42 @@ pub async fn from_substrait_rel(
     rel: &Rel,
     _extensions: &HashMap<u32, &String>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
+    let mut base_config;
+
     match &rel.rel_type {
         Some(RelType::Read(read)) => {
             if read.filter.is_some() || read.best_effort_filter.is_some() {
                 return not_impl_err!("Read with filter is not supported");
             }
-            if read.base_schema.is_some() {
-                return not_impl_err!("Read with schema is not supported");
-            }
+
             if read.advanced_extension.is_some() {
                 return not_impl_err!("Read with AdvancedExtension is not 
supported");
             }
+
+            let Some(schema) = read.base_schema.as_ref() else {
+                return substrait_err!("Missing base schema in the read");
+            };
+
+            let Some(r#struct) = schema.r#struct.as_ref() else {
+                return substrait_err!("Missing struct in the schema");
+            };
+
+            match schema
+                .names
+                .iter()
+                .zip(r#struct.types.iter())
+                .map(|(name, r#type)| to_field(name, r#type))
+                .collect::<Result<Vec<Field>>>()
+            {
+                Ok(fields) => {
+                    base_config = FileScanConfig::new(
+                        ObjectStoreUrl::local_filesystem(),
+                        Arc::new(Schema::new(fields)),
+                    );
+                }
+                Err(e) => return Err(e),
+            };
+
             match &read.as_ref().read_type {
                 Some(ReadType::LocalFiles(files)) => {
                     let mut file_groups = vec![];
@@ -104,11 +131,7 @@ pub async fn from_substrait_rel(
                         file_groups[part_index].push(partitioned_file)
                     }
 
-                    let mut base_config = FileScanConfig::new(
-                        ObjectStoreUrl::local_filesystem(),
-                        Arc::new(Schema::empty()),
-                    )
-                    .with_file_groups(file_groups);
+                    base_config = base_config.with_file_groups(file_groups);
 
                     if let Some(MaskExpression { select, .. }) = 
&read.projection {
                         if let Some(projection) = &select.as_ref() {
@@ -132,3 +155,47 @@ pub async fn from_substrait_rel(
         _ => not_impl_err!("Unsupported RelType: {:?}", rel.rel_type),
     }
 }
+
+fn to_field(name: &String, r#type: &Type) -> Result<Field> {
+    let Some(kind) = r#type.kind.as_ref() else {
+        return substrait_err!("Missing kind in the type with name {}", name);
+    };
+
+    let mut nullable = false;
+    let data_type = match kind {
+        Kind::Bool(boolean) => {
+            nullable = is_nullable(boolean.nullability);
+            Ok(DataType::Boolean)
+        }
+        Kind::I64(i64) => {
+            nullable = is_nullable(i64.nullability);
+            Ok(DataType::Int64)
+        }
+        Kind::Fp64(fp64) => {
+            nullable = is_nullable(fp64.nullability);
+            Ok(DataType::Float64)
+        }
+        Kind::String(string) => {
+            nullable = is_nullable(string.nullability);
+            Ok(DataType::Utf8)
+        }
+        _ => substrait_err!(
+            "Unsupported kind: {:?} in the type with name {}",
+            kind,
+            name
+        ),
+    }?;
+
+    Ok(Field::new(name, data_type, nullable))
+}
+
+fn is_nullable(nullability: i32) -> bool {
+    let Ok(nullability) = Nullability::try_from(nullability) else {
+        return true;
+    };
+
+    match nullability {
+        Nullability::Nullable | Nullability::Unspecified => true,
+        Nullability::Required => false,
+    }
+}
diff --git a/datafusion/substrait/src/physical_plan/producer.rs 
b/datafusion/substrait/src/physical_plan/producer.rs
index ad87d7afb0..57fe68c4a7 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -15,12 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::arrow::datatypes::DataType;
 use datafusion::datasource::physical_plan::ParquetExec;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::{displayable, ExecutionPlan};
 use std::collections::HashMap;
+use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
 use substrait::proto::expression::MaskExpression;
-use substrait::proto::extensions;
+use substrait::proto::r#type::{
+    Boolean, Fp64, Kind, Nullability, String as SubstraitString, Struct, I64,
+};
 use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions;
 use substrait::proto::read_rel::local_files::file_or_files::{FileFormat, 
PathType};
 use substrait::proto::read_rel::local_files::FileOrFiles;
@@ -29,6 +33,7 @@ use substrait::proto::read_rel::ReadType;
 use substrait::proto::rel::RelType;
 use substrait::proto::ReadRel;
 use substrait::proto::Rel;
+use substrait::proto::{extensions, NamedStruct, Type};
 
 /// Convert DataFusion ExecutionPlan to Substrait Rel
 pub fn to_substrait_rel(
@@ -55,15 +60,56 @@ pub fn to_substrait_rel(
             }
         }
 
+        let mut names = vec![];
+        let mut types = vec![];
+
+        for field in base_config.file_schema.fields.iter() {
+            match to_substrait_type(field.data_type(), field.is_nullable()) {
+                Ok(t) => {
+                    names.push(field.name().clone());
+                    types.push(t);
+                }
+                Err(e) => return Err(e),
+            }
+        }
+
+        let type_info = Struct {
+            types,
+            // FIXME: duckdb doesn't set this field, keep it as default 
variant 0.
+            // 
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127
+            type_variation_reference: 0,
+            nullability: Nullability::Required.into(),
+        };
+
+        let mut select_struct = None;
+        if let Some(projection) = base_config.projection.as_ref() {
+            let struct_items = projection
+                .iter()
+                .map(|index| StructItem {
+                    field: *index as i32,
+                    // FIXME: duckdb sets this to None, but it's not clear why.
+                    // 
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
+                    child: None,
+                })
+                .collect();
+
+            select_struct = Some(StructSelect { struct_items });
+        }
+
         Ok(Box::new(Rel {
             rel_type: Some(RelType::Read(Box::new(ReadRel {
                 common: None,
-                base_schema: None,
+                base_schema: Some(NamedStruct {
+                    names,
+                    r#struct: Some(type_info),
+                }),
                 filter: None,
                 best_effort_filter: None,
                 projection: Some(MaskExpression {
-                    select: None,
-                    maintain_singular_struct: false,
+                    select: select_struct,
+                    // FIXME: duckdb set this to true, but it's not clear why.
+                    // 
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186.
+                    maintain_singular_struct: true,
                 }),
                 advanced_extension: None,
                 read_type: Some(ReadType::LocalFiles(LocalFiles {
@@ -79,3 +125,42 @@ pub fn to_substrait_rel(
         )))
     }
 }
+
+// see 
https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L954-L1094.
+fn to_substrait_type(data_type: &DataType, nullable: bool) -> Result<Type> {
+    let nullability = if nullable {
+        Nullability::Nullable.into()
+    } else {
+        Nullability::Required.into()
+    };
+
+    match data_type {
+        DataType::Boolean => Ok(Type {
+            kind: Some(Kind::Bool(Boolean {
+                type_variation_reference: 0,
+                nullability,
+            })),
+        }),
+        DataType::Int64 => Ok(Type {
+            kind: Some(Kind::I64(I64 {
+                type_variation_reference: 0,
+                nullability,
+            })),
+        }),
+        DataType::Float64 => Ok(Type {
+            kind: Some(Kind::Fp64(Fp64 {
+                type_variation_reference: 0,
+                nullability,
+            })),
+        }),
+        DataType::Utf8 => Ok(Type {
+            kind: Some(Kind::String(SubstraitString {
+                type_variation_reference: 0,
+                nullability,
+            })),
+        }),
+        _ => Err(DataFusionError::Substrait(format!(
+            "Logical type {data_type} not implemented as substrait type"
+        ))),
+    }
+}
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index 4014670a7c..57fb3e2ee7 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -19,12 +19,13 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use datafusion::arrow::datatypes::Schema;
+use datafusion::dataframe::DataFrame;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::error::Result;
 use datafusion::physical_plan::{displayable, ExecutionPlan};
-use datafusion::prelude::SessionContext;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
 use datafusion_substrait::physical_plan::{consumer, producer};
 
 use substrait::proto::extensions;
@@ -71,3 +72,92 @@ async fn parquet_exec() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn simple_select() -> Result<()> {
+    roundtrip("SELECT a, b FROM data").await
+}
+
+#[tokio::test]
+#[ignore = "This test is failing because the translation of the substrait plan 
to the physical plan is not implemented yet"]
+async fn simple_select_alltypes() -> Result<()> {
+    roundtrip_alltypes("SELECT bool_col, int_col FROM alltypes_plain").await
+}
+
+#[tokio::test]
+async fn wildcard_select() -> Result<()> {
+    roundtrip("SELECT * FROM data").await
+}
+
+#[tokio::test]
+#[ignore = "This test is failing because the translation of the substrait plan 
to the physical plan is not implemented yet"]
+async fn wildcard_select_alltypes() -> Result<()> {
+    roundtrip_alltypes("SELECT * FROM alltypes_plain").await
+}
+
+async fn roundtrip(sql: &str) -> Result<()> {
+    let ctx = create_parquet_context().await?;
+    let df = ctx.sql(sql).await?;
+
+    roundtrip_parquet(df).await?;
+
+    Ok(())
+}
+
+async fn roundtrip_alltypes(sql: &str) -> Result<()> {
+    let ctx = create_all_types_context().await?;
+    let df = ctx.sql(sql).await?;
+
+    roundtrip_parquet(df).await?;
+
+    Ok(())
+}
+
+async fn roundtrip_parquet(df: DataFrame) -> Result<()> {
+    let physical_plan = df.create_physical_plan().await?;
+
+    // Convert the plan into a substrait (protobuf) Rel
+    let mut extension_info = (vec![], HashMap::new());
+    let substrait_plan =
+        producer::to_substrait_rel(physical_plan.as_ref(), &mut 
extension_info)?;
+
+    // Convert the substrait Rel back into a physical plan
+    let ctx = create_parquet_context().await?;
+    let physical_plan_roundtrip =
+        consumer::from_substrait_rel(&ctx, substrait_plan.as_ref(), 
&HashMap::new())
+            .await?;
+
+    // Compare the original and roundtrip physical plans
+    let expected = format!("{}", 
displayable(physical_plan.as_ref()).indent(true));
+    let actual = format!(
+        "{}",
+        displayable(physical_plan_roundtrip.as_ref()).indent(true)
+    );
+    assert_eq!(expected, actual);
+
+    Ok(())
+}
+
+async fn create_parquet_context() -> Result<SessionContext> {
+    let ctx = SessionContext::new();
+    let explicit_options = ParquetReadOptions::default();
+
+    ctx.register_parquet("data", "tests/testdata/data.parquet", 
explicit_options)
+        .await?;
+
+    Ok(ctx)
+}
+
+async fn create_all_types_context() -> Result<SessionContext> {
+    let ctx = SessionContext::new();
+
+    let testdata = datafusion::test_util::parquet_test_data();
+    ctx.register_parquet(
+        "alltypes_plain",
+        &format!("{testdata}/alltypes_plain.parquet"),
+        ParquetReadOptions::default(),
+    )
+    .await?;
+
+    Ok(ctx)
+}
diff --git a/datafusion/substrait/tests/testdata/Readme.md 
b/datafusion/substrait/tests/testdata/Readme.md
new file mode 100644
index 0000000000..c1bd48abf9
--- /dev/null
+++ b/datafusion/substrait/tests/testdata/Readme.md
@@ -0,0 +1,51 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# Apache DataFusion Substrait Test Data
+
+This folder contains test data for the [substrait] crate.
+
+The substrait crate is at an early stage and many functions are not 
implemented yet. Compared to the 
[parquet-testing](https://github.com/apache/parquet-testing) submodule, this 
folder contains only simple test data revolving around the substrait producers 
and consumers for [logical 
plans](https://github.com/apache/datafusion/tree/main/datafusion/substrait/src/logical_plan)
 and [physical 
plans](https://github.com/apache/datafusion/tree/main/datafusion/substrait/src/physical_plan).
+
+## Test Data
+
+### Example Data
+
+- 
[empty.csv](https://github.com/apache/datafusion/blob/main/datafusion/substrait/tests/testdata/empty.csv):
 An empty CSV file.
+- 
[empty.parquet](https://github.com/apache/datafusion/blob/main/datafusion/substrait/tests/testdata/empty.parquet):
 An empty Parquet file with metadata only.
+- 
[data.csv](https://github.com/apache/datafusion/blob/main/datafusion/substrait/tests/testdata/data.csv):
 A simple CSV file with 6 columns and 2 rows.
+- 
[data.parquet](https://github.com/apache/datafusion/blob/main/datafusion/substrait/tests/testdata/data.parquet):
 A simple Parquet file generated from the CSV file using `pandas`, e.g.,
+
+  ```python
+  import pandas as pd
+
+  df = pd.read_csv('data.csv')
+  df.to_parquet('data.parquet')
+  ```
+
+### Add new test data
+
+To add new test data, create a new file in this folder and reference it in the 
test source file, e.g.,
+
+```rust
+let ctx = SessionContext::new();
+let explicit_options = ParquetReadOptions::default();
+
+ctx.register_parquet("data", "tests/testdata/data.parquet", explicit_options).await?;
+```
diff --git a/datafusion/substrait/tests/testdata/data.parquet 
b/datafusion/substrait/tests/testdata/data.parquet
new file mode 100644
index 0000000000..f9c03394db
Binary files /dev/null and b/datafusion/substrait/tests/testdata/data.parquet 
differ
diff --git a/datafusion/substrait/tests/testdata/empty.parquet 
b/datafusion/substrait/tests/testdata/empty.parquet
new file mode 100644
index 0000000000..3f135e77f4
Binary files /dev/null and b/datafusion/substrait/tests/testdata/empty.parquet 
differ


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to