This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch comet-parquet-exec
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/comet-parquet-exec by this push:
new e3672f7e [comet-parquet-exec] Add unit test for reading a struct field from Parquet (#1075)
e3672f7e is described below
commit e3672f7e45908e48435c7b91709880393e939c33
Author: Andy Grove <[email protected]>
AuthorDate: Wed Dec 4 12:17:02 2024 -0700
[comet-parquet-exec] Add unit test for reading a struct field from Parquet (#1075)
* implement basic native code for casting struct to struct
* add another test
* rustdoc
* add scala side
* code cleanup
* clippy
* clippy
* add scala test
* improve test
* simple struct case passes
* save progress
* copy schema adapter code from DataFusion
* more tests pass
* save progress
* remove debug println
* remove debug println
---
native/core/src/execution/datafusion/mod.rs | 1 +
native/core/src/execution/datafusion/planner.rs | 8 +-
.../src/execution/datafusion/schema_adapter.rs | 278 +++++++++++++++++++++
native/spark-expr/src/cast.rs | 1 +
.../apache/comet/CometSparkSessionExtensions.scala | 3 +-
.../scala/org/apache/comet/DataTypeSupport.scala | 1 +
.../org/apache/comet/serde/QueryPlanSerde.scala | 2 +-
.../org/apache/comet/CometExpressionSuite.scala | 97 +++++++
8 files changed, 387 insertions(+), 4 deletions(-)
diff --git a/native/core/src/execution/datafusion/mod.rs b/native/core/src/execution/datafusion/mod.rs
index 6f81ee91..fb9c8829 100644
--- a/native/core/src/execution/datafusion/mod.rs
+++ b/native/core/src/execution/datafusion/mod.rs
@@ -20,5 +20,6 @@
pub mod expressions;
mod operators;
pub mod planner;
+mod schema_adapter;
pub mod shuffle_writer;
mod util;
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index babc6869..c5147d77 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -84,6 +84,7 @@ use datafusion::{
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
+use crate::execution::datafusion::schema_adapter::CometSchemaAdapterFactory;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::FileScanConfig;
@@ -1094,8 +1095,11 @@ impl PhysicalPlanner {
table_parquet_options.global.pushdown_filters = true;
table_parquet_options.global.reorder_filters = true;
- let mut builder = ParquetExecBuilder::new(file_scan_config)
- .with_table_parquet_options(table_parquet_options);
+ let mut builder = ParquetExecBuilder::new(file_scan_config)
+ .with_table_parquet_options(table_parquet_options)
+ .with_schema_adapter_factory(
+ Arc::new(CometSchemaAdapterFactory::default()),
+ );
if let Some(filter) = test_data_filters {
builder = builder.with_predicate(filter);
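
For readers unfamiliar with the extension point used in the hunk above, here is a standalone sketch (not part of this patch) of attaching a schema adapter factory to a ParquetExecBuilder. The schema, file path, and file size are placeholders, the build_scan function name is invented for illustration, and DefaultSchemaAdapterFactory stands in for the CometSchemaAdapterFactory introduced in the new file below.

    // Sketch only, not part of the commit: build a ParquetExec with a custom
    // schema adapter factory attached, mirroring the planner change above.
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::listing::PartitionedFile;
    use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
    use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;
    use datafusion::execution::object_store::ObjectStoreUrl;

    fn build_scan() -> ParquetExec {
        // Table schema the scan should produce (placeholder).
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));

        // Placeholder file path and size; Comet's planner derives these from the
        // scan configuration it receives.
        let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
            .with_file(PartitionedFile::new("data/part-0.parquet".to_string(), 1024));

        // The schema adapter factory is the hook the hunk above uses; Comet swaps in
        // CometSchemaAdapterFactory so type mismatches between file and table schema
        // are resolved with Spark-compatible casts instead of plain Arrow casts.
        ParquetExecBuilder::new(file_scan_config)
            .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory::default()))
            .build()
    }
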
diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs
new file mode 100644
index 00000000..16d4b9d6
--- /dev/null
+++ b/native/core/src/execution/datafusion/schema_adapter.rs
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Custom schema adapter that uses Spark-compatible casts
+
+use arrow::compute::can_cast_types;
+use arrow_array::{new_null_array, Array, RecordBatch, RecordBatchOptions};
+use arrow_schema::{DataType, Schema, SchemaRef};
+use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use datafusion_comet_spark_expr::{spark_cast, EvalMode};
+use datafusion_common::plan_err;
+use datafusion_expr::ColumnarValue;
+use std::sync::Arc;
+
+#[derive(Clone, Debug, Default)]
+pub struct CometSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for CometSchemaAdapterFactory {
+ /// Create a new factory for mapping batches from a file schema to a table
+ /// schema.
+ ///
+ /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
+ /// the same schema for both the projected table schema and the table
+ /// schema.
+ fn create(
+ &self,
+ projected_table_schema: SchemaRef,
+ table_schema: SchemaRef,
+ ) -> Box<dyn SchemaAdapter> {
+ Box::new(CometSchemaAdapter {
+ projected_table_schema,
+ table_schema,
+ })
+ }
+}
+
+/// This SchemaAdapter requires both the table schema and the projected table
+/// schema. See [`SchemaMapping`] for more details
+#[derive(Clone, Debug)]
+pub struct CometSchemaAdapter {
+ /// The schema for the table, projected to include only the fields being output (projected) by the
+ /// associated ParquetExec
+ projected_table_schema: SchemaRef,
+ /// The entire table schema for the table we're using this to adapt.
+ ///
+ /// This is used to evaluate any filters pushed down into the scan
+ /// which may refer to columns that are not referred to anywhere
+ /// else in the plan.
+ table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for CometSchemaAdapter {
+ /// Map a column index in the table schema to a column index in a particular
+ /// file schema
+ ///
+ /// Panics if index is not in range for the table schema
+ fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+ let field = self.projected_table_schema.field(index);
+ Some(file_schema.fields.find(field.name())?.0)
+ }
+
+ /// Creates a `SchemaMapping` for casting or mapping the columns from the
+ /// file schema to the table schema.
+ ///
+ /// If the provided `file_schema` contains columns of a different type to
+ /// the expected `table_schema`, the method will attempt to cast the array
+ /// data from the file schema to the table schema where possible.
+ ///
+ /// Returns a [`SchemaMapping`] that can be applied to the output batch
+ /// along with an ordered list of columns to project from the file
+ fn map_schema(
+ &self,
+ file_schema: &Schema,
+ ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+ let mut projection = Vec::with_capacity(file_schema.fields().len());
+ let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
+
+ for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+ if let Some((table_idx, table_field)) =
+ self.projected_table_schema.fields().find(file_field.name())
+ {
+ // workaround for struct casting
+ match (file_field.data_type(), table_field.data_type()) {
+ // TODO need to use Comet cast logic to determine which casts are supported,
+ // but for now just add a hack to support casting between struct types
+ (DataType::Struct(_), DataType::Struct(_)) => {
+ field_mappings[table_idx] = Some(projection.len());
+ projection.push(file_idx);
+ }
+ _ => {
+ if can_cast_types(file_field.data_type(), table_field.data_type()) {
+ field_mappings[table_idx] = Some(projection.len());
+ projection.push(file_idx);
+ } else {
+ return plan_err!(
+ "Cannot cast file schema field {} of type {:?}
to table schema field of type {:?}",
+ file_field.name(),
+ file_field.data_type(),
+ table_field.data_type()
+ );
+ }
+ }
+ }
+ }
+ }
+
+ Ok((
+ Arc::new(SchemaMapping {
+ projected_table_schema: self.projected_table_schema.clone(),
+ field_mappings,
+ table_schema: self.table_schema.clone(),
+ }),
+ projection,
+ ))
+ }
+}
+
+// TODO SchemaMapping is mostly copied from DataFusion but calls spark_cast
+// instead of arrow cast - can we reduce the amount of code copied here and make
+// the DataFusion version more extensible?
+
+/// The SchemaMapping struct holds a mapping from the file schema to the table
+/// schema and any necessary type conversions.
+///
+/// Note, because `map_batch` and `map_partial_batch` functions have different
+/// needs, this struct holds two schemas:
+///
+/// 1. The projected **table** schema
+/// 2. The full table schema
+///
+/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
+/// has the projected schema, since that's the schema which is supposed to come
+/// out of the execution of this query. Thus `map_batch` uses
+/// `projected_table_schema` as it can only operate on the projected fields.
+///
+/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
+/// can be used for Parquet predicate pushdown, meaning that it may contain
+/// fields which are not in the projected schema (as the fields that parquet
+/// pushdown filters operate on can be completely distinct from the fields that are
+/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses
+/// `table_schema` to create the resulting RecordBatch (as it could be operating
+/// on any fields in the schema).
+///
+/// [`map_batch`]: Self::map_batch
+/// [`map_partial_batch`]: Self::map_partial_batch
+#[derive(Debug)]
+pub struct SchemaMapping {
+ /// The schema of the table. This is the expected schema after conversion
+ /// and it should match the schema of the query result.
+ projected_table_schema: SchemaRef,
+ /// Mapping from field index in `projected_table_schema` to index in
+ /// projected file_schema.
+ ///
+ /// They are Options instead of just plain `usize`s because the table could
+ /// have fields that don't exist in the file.
+ field_mappings: Vec<Option<usize>>,
+ /// The entire table schema, as opposed to the projected_table_schema (which
+ /// only contains the columns that we are projecting out of this query).
+ /// This contains all fields in the table, regardless of if they will be
+ /// projected out or not.
+ table_schema: SchemaRef,
+}
+
+impl SchemaMapper for SchemaMapping {
+ /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
+ /// conversions. The produced RecordBatch has a schema that contains only the projected
+ /// columns, so if one needs a RecordBatch with a schema that references columns which are not
+ /// in the projected schema, it would be better to use `map_partial_batch`
+ fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+ let batch_rows = batch.num_rows();
+ let batch_cols = batch.columns().to_vec();
+
+ let cols = self
+ .projected_table_schema
+ // go through each field in the projected schema
+ .fields()
+ .iter()
+ // and zip it with the index that maps fields from the projected table schema to the
+ // projected file schema in `batch`
+ .zip(&self.field_mappings)
+ // and for each one...
+ .map(|(field, file_idx)| {
+ file_idx.map_or_else(
+ // If this field only exists in the table, and not in the file, then we know
+ // that it's null, so just return that.
+ || Ok(new_null_array(field.data_type(), batch_rows)),
+ // However, if it does exist in both, then try to cast it to the correct output
+ // type
+ |batch_idx| {
+ spark_cast(
+ ColumnarValue::Array(Arc::clone(&batch_cols[batch_idx])),
+ field.data_type(),
+ // TODO need to pass in configs here
+ EvalMode::Legacy,
+ "UTC",
+ false,
+ )?
+ .into_array(batch_rows)
+ },
+ )
+ })
+ .collect::<datafusion_common::Result<Vec<_>, _>>()?;
+
+ // Necessary to handle empty batches
+ let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+ let schema = self.projected_table_schema.clone();
+ let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+ Ok(record_batch)
+ }
+
+ /// Adapts a [`RecordBatch`]'s schema into one that has all the correct output types and only
+ /// contains the fields that exist in both the file schema and table schema.
+ ///
+ /// Unlike `map_batch` this method also preserves the columns that
+ /// may not appear in the final output (`projected_table_schema`) but may
+ /// appear in push down predicates
+ fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+ let batch_cols = batch.columns().to_vec();
+ let schema = batch.schema();
+
+ // for each field in the batch's schema (which is based on a file, not a table)...
+ let (cols, fields) = schema
+ .fields()
+ .iter()
+ .zip(batch_cols.iter())
+ .flat_map(|(field, batch_col)| {
+ self.table_schema
+ // try to get the same field from the table schema that we have stored in self
+ .field_with_name(field.name())
+ // and if we don't have it, that's fine, ignore it. This may occur when we've
+ // created an external table whose fields are a subset of the fields in this
+ // file, then tried to read data from the file into this table. If that is the
+ // case here, it's fine to ignore because we don't care about this field
+ // anyways
+ .ok()
+ // but if we do have it,
+ .map(|table_field| {
+ // try to cast it into the correct output type. we don't want to ignore this
+ // error, though, so it's propagated.
+ spark_cast(
+ ColumnarValue::Array(Arc::clone(batch_col)),
+ table_field.data_type(),
+ // TODO need to pass in configs here
+ EvalMode::Legacy,
+ "UTC",
+ false,
+ )?.into_array(batch_col.len())
+ // and if that works, return the field and column.
+ .map(|new_col| (new_col, table_field.clone()))
+ })
+ })
+ .collect::<Result<Vec<_>, _>>()?
+ .into_iter()
+ .unzip::<_, _, Vec<_>, Vec<_>>();
+
+ // Necessary to handle empty batches
+ let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+ let schema = Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()));
+ let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+ Ok(record_batch)
+ }
+}
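
Stepping back from the diff for a moment: the per-column policy that `SchemaMapping::map_batch` implements above boils down to "null-fill fields that exist only in the table schema, otherwise cast the file column to the table type". Below is a minimal illustrative sketch of that policy. It is not part of the patch: it uses Arrow's `cast` as a stand-in for `spark_cast`, and the `adapt_column` helper is hypothetical.

    // Illustrative sketch only: the real code routes casts through spark_cast with
    // Comet's EvalMode/timezone options; plain Arrow cast is used here to stay self-contained.
    use std::sync::Arc;

    use arrow::array::{new_null_array, Array, ArrayRef, Int32Array};
    use arrow::compute::{can_cast_types, cast};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::{ArrowError, Result};
    use arrow::record_batch::RecordBatch;

    // Hypothetical helper mirroring the map_or_else in map_batch: a missing file
    // column becomes an all-null array, a present one is cast to the table type.
    fn adapt_column(file_col: Option<&ArrayRef>, target: &DataType, num_rows: usize) -> Result<ArrayRef> {
        match file_col {
            None => Ok(new_null_array(target, num_rows)),
            Some(col) if can_cast_types(col.data_type(), target) => cast(col, target),
            Some(col) => Err(ArrowError::CastError(format!(
                "cannot cast {:?} to {:?}",
                col.data_type(),
                target
            ))),
        }
    }

    fn main() -> Result<()> {
        // A file column of Int32 widened to the table's Int64 type.
        let file_col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let widened = adapt_column(Some(&file_col), &DataType::Int64, 3)?;

        // A column present only in the table schema is filled with nulls.
        let missing = adapt_column(None, &DataType::Utf8, 3)?;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(schema, vec![widened, missing])?;
        assert_eq!(batch.num_rows(), 3);
        Ok(())
    }
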
diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs
index 13263a59..8ef9ca29 100644
--- a/native/spark-expr/src/cast.rs
+++ b/native/spark-expr/src/cast.rs
@@ -623,6 +623,7 @@ fn cast_array(
) -> DataFusionResult<ArrayRef> {
let array = array_with_timezone(array, timezone.clone(), Some(to_type))?;
let from_type = array.data_type().clone();
+
let array = match &from_type {
DataType::Dictionary(key_type, value_type)
if key_type.as_ref() == &DataType::Int32
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 1028d046..32668f0d 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -977,7 +977,8 @@ class CometSparkSessionExtensions
if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
val info = new ExtendedExplainInfo()
if (info.extensionInfo(newPlan).nonEmpty) {
- logWarning(
+ // scalastyle:off println
+ println(
"Comet cannot execute some parts of this plan natively " +
s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false "
+
"to disable this logging):\n" +
diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
index c49a2c46..09c062b8 100644
--- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
+++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala
@@ -39,6 +39,7 @@ trait DataTypeSupport {
BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
true
case t: DataType if t.typeName == "timestamp_ntz" => true
+ case _: StructType => true
case _ => false
}
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 0cd4b3d9..7473e932 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -62,7 +62,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
logWarning(s"Comet native execution is disabled due to: $reason")
}
- def supportedDataType(dt: DataType, allowStruct: Boolean = false): Boolean = dt match {
+ def supportedDataType(dt: DataType, allowStruct: Boolean = true): Boolean = dt match {
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType |
_: DateType | _: BooleanType | _: NullType =>
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 0d00867d..85ac6138 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2200,6 +2200,103 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}
+ test("get_struct_field with DataFusion ParquetExec - simple case") {
+ withTempPath { dir =>
+ // create input file with Comet disabled
+ withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+ val df = spark
+ .range(5)
+ // Add both a null struct and null inner value
+ .select(when(col("id") > 1, struct(when(col("id") > 2, col("id")).alias("id")))
+ .alias("nested1"))
+
+ df.write.parquet(dir.toString())
+ }
+
+ Seq("parquet").foreach { v1List =>
+ withSQLConf(
+ SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
+
+ val df = spark.read.parquet(dir.toString())
+ checkSparkAnswerAndOperator(df.select("nested1.id"))
+ }
+ }
+ }
+ }
+
+ test("get_struct_field with DataFusion ParquetExec - select subset of
struct") {
+ withTempPath { dir =>
+ // create input file with Comet disabled
+ withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+ val df = spark
+ .range(5)
+ // Add both a null struct and null inner value
+ .select(
+ when(
+ col("id") > 1,
+ struct(
+ when(col("id") > 2, col("id")).alias("id"),
+ when(col("id") > 2, struct(when(col("id") > 3,
col("id")).alias("id")))
+ .as("nested2")))
+ .alias("nested1"))
+
+ df.write.parquet(dir.toString())
+ }
+
+ Seq("parquet").foreach { v1List =>
+ withSQLConf(
+ SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
+
+ val df = spark.read.parquet(dir.toString())
+
+ checkSparkAnswerAndOperator(df.select("nested1.id"))
+
+ checkSparkAnswerAndOperator(df.select("nested1.id", "nested1.nested2.id"))
+
+ // unsupported cast from Int64 to Struct([Field { name: "id", data_type: Int64, ...
+ // checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
+ }
+ }
+ }
+ }
+
+ // TODO this is not using DataFusion's ParquetExec for some reason
+ ignore("get_struct_field with DataFusion ParquetExec - read entire struct") {
+ withTempPath { dir =>
+ // create input file with Comet disabled
+ withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
+ val df = spark
+ .range(5)
+ // Add both a null struct and null inner value
+ .select(
+ when(
+ col("id") > 1,
+ struct(
+ when(col("id") > 2, col("id")).alias("id"),
+ when(col("id") > 2, struct(when(col("id") > 3,
col("id")).alias("id")))
+ .as("nested2")))
+ .alias("nested1"))
+
+ df.write.parquet(dir.toString())
+ }
+
+ Seq("parquet").foreach { v1List =>
+ withSQLConf(
+ SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+ CometConf.COMET_ENABLED.key -> "true",
+ CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
+
+ val df = spark.read.parquet(dir.toString())
+ checkSparkAnswerAndOperator(df.select("nested1"))
+ }
+ }
+ }
+ }
+
test("CreateArray") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]