This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/comet-parquet-exec by this push:
     new e3672f7e [comet-parquet-exec] Add unit test for reading a struct field from Parquet (#1075)
e3672f7e is described below

commit e3672f7e45908e48435c7b91709880393e939c33
Author: Andy Grove <[email protected]>
AuthorDate: Wed Dec 4 12:17:02 2024 -0700

    [comet-parquet-exec] Add unit test for reading a struct field from Parquet (#1075)
    
    * implement basic native code for casting struct to struct
    
    * add another test
    
    * rustdoc
    
    * add scala side
    
    * code cleanup
    
    * clippy
    
    * clippy
    
    * add scala test
    
    * improve test
    
    * simple struct case passes
    
    * save progress
    
    * copy schema adapter code from DataFusion
    
    * more tests pass
    
    * save progress
    
    * remove debug println
    
    * remove debug println
---
 native/core/src/execution/datafusion/mod.rs        |   1 +
 native/core/src/execution/datafusion/planner.rs    |   8 +-
 .../src/execution/datafusion/schema_adapter.rs     | 278 +++++++++++++++++++++
 native/spark-expr/src/cast.rs                      |   1 +
 .../apache/comet/CometSparkSessionExtensions.scala |   3 +-
 .../scala/org/apache/comet/DataTypeSupport.scala   |   1 +
 .../org/apache/comet/serde/QueryPlanSerde.scala    |   2 +-
 .../org/apache/comet/CometExpressionSuite.scala    |  97 +++++++
 8 files changed, 387 insertions(+), 4 deletions(-)

diff --git a/native/core/src/execution/datafusion/mod.rs b/native/core/src/execution/datafusion/mod.rs
index 6f81ee91..fb9c8829 100644
--- a/native/core/src/execution/datafusion/mod.rs
+++ b/native/core/src/execution/datafusion/mod.rs
@@ -20,5 +20,6 @@
 pub mod expressions;
 mod operators;
 pub mod planner;
+mod schema_adapter;
 pub mod shuffle_writer;
 mod util;
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index babc6869..c5147d77 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -84,6 +84,7 @@ use datafusion::{
 };
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 
+use crate::execution::datafusion::schema_adapter::CometSchemaAdapterFactory;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::FileScanConfig;
@@ -1094,8 +1095,11 @@ impl PhysicalPlanner {
                 table_parquet_options.global.pushdown_filters = true;
                 table_parquet_options.global.reorder_filters = true;
 
-                let mut builder = ParquetExecBuilder::new(file_scan_config)
-                    .with_table_parquet_options(table_parquet_options);
+                    let mut builder = ParquetExecBuilder::new(file_scan_config)
+                        .with_table_parquet_options(table_parquet_options)
+                        .with_schema_adapter_factory(
+                            Arc::new(CometSchemaAdapterFactory::default()),
+                        );
 
                 if let Some(filter) = test_data_filters {
                     builder = builder.with_predicate(filter);
diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs
new file mode 100644
index 00000000..16d4b9d6
--- /dev/null
+++ b/native/core/src/execution/datafusion/schema_adapter.rs
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Custom schema adapter that uses Spark-compatible casts
+
+use arrow::compute::can_cast_types;
+use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
+use arrow_schema::{DataType, Schema, SchemaRef};
+use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use datafusion_comet_spark_expr::{spark_cast, EvalMode};
+use datafusion_common::plan_err;
+use datafusion_expr::ColumnarValue;
+use std::sync::Arc;
+
+#[derive(Clone, Debug, Default)]
+pub struct CometSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for CometSchemaAdapterFactory {
+    /// Create a new factory for mapping batches from a file schema to a table
+    /// schema.
+    ///
+    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
+    /// the same schema for both the projected table schema and the table
+    /// schema.
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(CometSchemaAdapter {
+            projected_table_schema,
+            table_schema,
+        })
+    }
+}
+
+/// This SchemaAdapter requires both the table schema and the projected table
+/// schema. See  [`SchemaMapping`] for more details
+#[derive(Clone, Debug)]
+pub struct CometSchemaAdapter {
+    /// The schema for the table, projected to include only the fields being output (projected) by the
+    /// associated ParquetExec
+    projected_table_schema: SchemaRef,
+    /// The entire table schema for the table we're using this to adapt.
+    ///
+    /// This is used to evaluate any filters pushed down into the scan
+    /// which may refer to columns that are not referred to anywhere
+    /// else in the plan.
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for CometSchemaAdapter {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.projected_table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    /// Creates a `SchemaMapping` for casting or mapping the columns from the
+    /// file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to
+    /// the expected `table_schema`, the method will attempt to cast the array
+    /// data from the file schema to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapping`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if let Some((table_idx, table_field)) =
+                self.projected_table_schema.fields().find(file_field.name())
+            {
+                // workaround for struct casting
+                match (file_field.data_type(), table_field.data_type()) {
+                    // TODO need to use Comet cast logic to determine which casts are supported,
+                    // but for now just add a hack to support casting between struct types
+                    (DataType::Struct(_), DataType::Struct(_)) => {
+                        field_mappings[table_idx] = Some(projection.len());
+                        projection.push(file_idx);
+                    }
+                    _ => {
+                        if can_cast_types(file_field.data_type(), table_field.data_type()) {
+                            field_mappings[table_idx] = Some(projection.len());
+                            projection.push(file_idx);
+                        } else {
+                            return plan_err!(
+                                "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+                                file_field.name(),
+                                file_field.data_type(),
+                                table_field.data_type()
+                            );
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok((
+            Arc::new(SchemaMapping {
+                projected_table_schema: self.projected_table_schema.clone(),
+                field_mappings,
+                table_schema: self.table_schema.clone(),
+            }),
+            projection,
+        ))
+    }
+}
+
+// TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast
+// instead of arrow cast - can we reduce the amount of code copied here and make
+// the DataFusion version more extensible?
+
+/// The SchemaMapping struct holds a mapping from the file schema to the table
+/// schema and any necessary type conversions.
+///
+/// Note, because `map_batch` and `map_partial_batch` functions have different
+/// needs, this struct holds two schemas:
+///
+/// 1. The projected **table** schema
+/// 2. The full table schema
+///
+/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
+/// has the projected schema, since that's the schema which is supposed to come
+/// out of the execution of this query. Thus `map_batch` uses
+/// `projected_table_schema` as it can only operate on the projected fields.
+///
+/// [`map_partial_batch`]  is used to create a RecordBatch with a schema that
+/// can be used for Parquet predicate pushdown, meaning that it may contain
+/// fields which are not in the projected schema (as the fields that parquet
+/// pushdown filters operate can be completely distinct from the fields that are
+/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses
+/// `table_schema` to create the resulting RecordBatch (as it could be operating
+/// on any fields in the schema).
+///
+/// [`map_batch`]: Self::map_batch
+/// [`map_partial_batch`]: Self::map_partial_batch
+#[derive(Debug)]
+pub struct SchemaMapping {
+    /// The schema of the table. This is the expected schema after conversion
+    /// and it should match the schema of the query result.
+    projected_table_schema: SchemaRef,
+    /// Mapping from field index in `projected_table_schema` to index in
+    /// projected file_schema.
+    ///
+    /// They are Options instead of just plain `usize`s because the table could
+    /// have fields that don't exist in the file.
+    field_mappings: Vec<Option<usize>>,
+    /// The entire table schema, as opposed to the projected_table_schema (which
+    /// only contains the columns that we are projecting out of this query).
+    /// This contains all fields in the table, regardless of if they will be
+    /// projected out or not.
+    table_schema: SchemaRef,
+}
+
+impl SchemaMapper for SchemaMapping {
+    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+    /// conversions. The produced RecordBatch has a schema that contains only the projected
+    /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+    /// in the projected, it would be better to use `map_partial_batch`
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+        let batch_cols = batch.columns().to_vec();
+
+        let cols = self
+            .projected_table_schema
+            // go through each field in the projected schema
+            .fields()
+            .iter()
+            // and zip it with the index that maps fields from the projected table schema to the
+            // projected file schema in `batch`
+            .zip(&self.field_mappings)
+            // and for each one...
+            .map(|(field, file_idx)| {
+                file_idx.map_or_else(
+                    // If this field only exists in the table, and not in the file, then we know
+                    // that it's null, so just return that.
+                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // However, if it does exist in both, then try to cast it to the correct output
+                    // type
+                    |batch_idx| {
+                        spark_cast(
+                            ColumnarValue::Array(Arc::clone(&batch_cols[batch_idx])),
+                            field.data_type(),
+                            // TODO need to pass in configs here
+                            EvalMode::Legacy,
+                            "UTC",
+                            false,
+                        )?
+                        .into_array(batch_rows)
+                    },
+                )
+            })
+            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
+
+        // Necessary to handle empty batches
+        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+        let schema = self.projected_table_schema.clone();
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+        Ok(record_batch)
+    }
+
+    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+    /// contains the fields that exist in both the file schema and table schema.
+    ///
+    /// Unlike `map_batch` this method also preserves the columns that
+    /// may not appear in the final output (`projected_table_schema`) but may
+    /// appear in push down predicates
+    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let batch_cols = batch.columns().to_vec();
+        let schema = batch.schema();
+
+        // for each field in the batch's schema (which is based on a file, not a table)...
+        let (cols, fields) = schema
+            .fields()
+            .iter()
+            .zip(batch_cols.iter())
+            .flat_map(|(field, batch_col)| {
+                self.table_schema
+                    // try to get the same field from the table schema that we have stored in self
+                    .field_with_name(field.name())
+                    // and if we don't have it, that's fine, ignore it. This may occur when we've
+                    // created an external table whose fields are a subset of the fields in this
+                    // file, then tried to read data from the file into this table. If that is the
+                    // case here, it's fine to ignore because we don't care about this field
+                    // anyways
+                    .ok()
+                    // but if we do have it,
+                    .map(|table_field| {
+                        // try to cast it into the correct output type. we don't want to ignore this
+                        // error, though, so it's propagated.
+                        spark_cast(
+                            ColumnarValue::Array(Arc::clone(batch_col)),
+                            table_field.data_type(),
+                            // TODO need to pass in configs here
+                            EvalMode::Legacy,
+                            "UTC",
+                            false,
+                        )?.into_array(batch_col.len())
+                        // and if that works, return the field and column.
+                        .map(|new_col| (new_col, table_field.clone()))
+                    })
+            })
+            .collect::<Result<Vec<_>, _>>()?
+            .into_iter()
+            .unzip::<_, _, Vec<_>, Vec<_>>();
+
+        // Necessary to handle empty batches
+        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+        let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, 
&options)?;
+        Ok(record_batch)
+    }
+}
diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs
index 13263a59..8ef9ca29 100644
--- a/native/spark-expr/src/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -623,6 +623,7 @@ fn cast_array(
 ) -> DataFusionResult<ArrayRef> {
     let array = array_with_timezone(array, timezone.clone(), Some(to_type))?;
     let from_type = array.data_type().clone();
+
     let array = match &from_type {
         DataType::Dictionary(key_type, value_type)
             if key_type.as_ref() == &DataType::Int32
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 1028d046..32668f0d 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -977,7 +977,8 @@ class CometSparkSessionExtensions
         if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
           val info = new ExtendedExplainInfo()
           if (info.extensionInfo(newPlan).nonEmpty) {
-            logWarning(
+            // scalastyle:off println
+            println(
               "Comet cannot execute some parts of this plan natively " +
                 s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " +
                 "to disable this logging):\n" +
diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index c49a2c46..09c062b8 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -39,6 +39,7 @@ trait DataTypeSupport {
         BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
       true
     case t: DataType if t.typeName == "timestamp_ntz" => true
+    case _: StructType => true
     case _ => false
   }
 
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 0cd4b3d9..7473e932 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -62,7 +62,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     logWarning(s"Comet native execution is disabled due to: $reason")
   }
 
-  def supportedDataType(dt: DataType, allowStruct: Boolean = false): Boolean = dt match {
+  def supportedDataType(dt: DataType, allowStruct: Boolean = true): Boolean = dt match {
     case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
         _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType |
         _: DateType | _: BooleanType | _: NullType =>
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 0d00867d..85ac6138 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2200,6 +2200,103 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("get_struct_field with DataFusion ParquetExec - simple case") {
+    withTempPath { dir =>
+      // create input file with Comet disabled
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        val df = spark
+          .range(5)
+          // Add both a null struct and null inner value
+          .select(when(col("id") > 1, struct(when(col("id") > 2, col("id")).alias("id")))
+            .alias("nested1"))
+
+        df.write.parquet(dir.toString())
+      }
+
+      Seq("parquet").foreach { v1List =>
+        withSQLConf(
+          SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+          CometConf.COMET_ENABLED.key -> "true",
+          CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
+
+          val df = spark.read.parquet(dir.toString())
+          checkSparkAnswerAndOperator(df.select("nested1.id"))
+        }
+      }
+    }
+  }
+
+  test("get_struct_field with DataFusion ParquetExec - select subset of struct") {
+    withTempPath { dir =>
+      // create input file with Comet disabled
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        val df = spark
+          .range(5)
+          // Add both a null struct and null inner value
+          .select(
+            when(
+              col("id") > 1,
+              struct(
+                when(col("id") > 2, col("id")).alias("id"),
+                when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id")))
+                  .as("nested2")))
+              .alias("nested1"))
+
+        df.write.parquet(dir.toString())
+      }
+
+      Seq("parquet").foreach { v1List =>
+        withSQLConf(
+          SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+          CometConf.COMET_ENABLED.key -> "true",
+          CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
+
+          val df = spark.read.parquet(dir.toString())
+
+          checkSparkAnswerAndOperator(df.select("nested1.id"))
+
+          checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id"))
+
+          // unsupported cast from Int64 to Struct([Field { name: "id", data_type: Int64, ...
+          // checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
+        }
+      }
+    }
+  }
+
+  // TODO this is not using DataFusion's ParquetExec for some reason
+  ignore("get_struct_field with DataFusion ParquetExec - read entire struct") {
+    withTempPath { dir =>
+      // create input file with Comet disabled
+      withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+        val df = spark
+          .range(5)
+          // Add both a null struct and null inner value
+          .select(
+            when(
+              col("id") > 1,
+              struct(
+                when(col("id") > 2, col("id")).alias("id"),
+                when(col("id") > 2, struct(when(col("id") > 3, col("id")).alias("id")))
+                  .as("nested2")))
+              .alias("nested1"))
+
+        df.write.parquet(dir.toString())
+      }
+
+      Seq("parquet").foreach { v1List =>
+        withSQLConf(
+          SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+          CometConf.COMET_ENABLED.key -> "true",
+          CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
+
+          val df = spark.read.parquet(dir.toString())
+          checkSparkAnswerAndOperator(df.select("nested1"))
+        }
+      }
+    }
+  }
+
   test("CreateArray") {
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>

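A minimal standalone sketch of the mapping rule that CometSchemaAdapter::map_schema (added above) applies when matching file columns to table columns: struct-to-struct pairs are accepted unconditionally (the runtime cast handles them), while every other type pair is gated by Arrow's can_cast_types. The helper name build_projection and the example schemas below are illustrative only and are not part of this commit.

use arrow::compute::can_cast_types;
use arrow_schema::{DataType, Field, Schema};

// Hypothetical helper mirroring the projection logic in map_schema:
// field_mappings[i] holds the position in `projection` that feeds table
// column i (if the file provides it); `projection` lists file column
// indexes to read, in output order.
fn build_projection(file_schema: &Schema, table_schema: &Schema) -> (Vec<Option<usize>>, Vec<usize>) {
    let mut projection = Vec::with_capacity(file_schema.fields().len());
    let mut field_mappings = vec![None; table_schema.fields().len()];
    for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
        if let Some((table_idx, table_field)) = table_schema.fields().find(file_field.name()) {
            // Struct-to-struct is passed through; everything else must be
            // castable according to Arrow.
            let compatible = matches!(
                (file_field.data_type(), table_field.data_type()),
                (DataType::Struct(_), DataType::Struct(_))
            ) || can_cast_types(file_field.data_type(), table_field.data_type());
            if compatible {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
    }
    (field_mappings, projection)
}

fn main() {
    // File has (id: Int32, name: Utf8); the table only projects id as Int64.
    let file_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    let table_schema = Schema::new(vec![Field::new("id", DataType::Int64, true)]);
    let (field_mappings, projection) = build_projection(&file_schema, &table_schema);
    assert_eq!(projection, vec![0]);           // only file column 0 is read
    assert_eq!(field_mappings, vec![Some(0)]); // table column 0 comes from projected column 0
}
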

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
