adriangb commented on code in PR #3052:
URL: https://github.com/apache/datafusion-comet/pull/3052#discussion_r2696454304
##########
native/core/src/parquet/schema_adapter.rs:
##########
@@ -16,19 +16,240 @@
// under the License.
//! Custom schema adapter that uses Spark-compatible conversions
+//!
+//! This module provides both:
+//! - The deprecated `SchemaAdapter` approach (for backwards compatibility)
+//! - The new `PhysicalExprAdapter` approach (recommended, works at planning time)
+
+#![allow(deprecated)]
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
-use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
-use datafusion::common::ColumnStatistics;
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::{ColumnStatistics, Result as DataFusionResult};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;
+use datafusion_comet_spark_expr::{Cast, SparkCastOptions};
use datafusion_physical_expr_adapter::{
+    replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
+    PhysicalExprAdapterFactory,
+};
use std::collections::HashMap;
use std::sync::Arc;
+// ============================================================================
+// New PhysicalExprAdapter Implementation (Recommended)
+// ============================================================================
+
+/// Factory for creating Spark-compatible physical expression adapters.
+///
+/// This factory creates adapters that rewrite expressions at planning time
+/// to inject Spark-compatible casts where needed.
+#[derive(Clone, Debug)]
+pub struct SparkPhysicalExprAdapterFactory {
+    /// Spark-specific parquet options for type conversions
+    parquet_options: SparkParquetOptions,
+    /// Default values for columns that may be missing from the physical schema.
+    /// The key is the column index in the logical schema.
+    default_values: Option<HashMap<usize, ScalarValue>>,
+}
+
+impl SparkPhysicalExprAdapterFactory {
+    /// Create a new factory with the given options.
+    pub fn new(
+        parquet_options: SparkParquetOptions,
+        default_values: Option<HashMap<usize, ScalarValue>>,
+    ) -> Self {
+        Self {
+            parquet_options,
+            default_values,
+        }
+    }
+}
+
+impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
+    fn create(
+        &self,
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        let default_factory = DefaultPhysicalExprAdapterFactory;
+        let default_adapter = default_factory.create(
+            Arc::clone(&logical_file_schema),
+            Arc::clone(&physical_file_schema),
+        );
+
+        Arc::new(SparkPhysicalExprAdapter {
+            logical_file_schema,
+            physical_file_schema,
+            parquet_options: self.parquet_options.clone(),
+            default_values: self.default_values.clone(),
+            default_adapter,
+        })
+    }
+}
+
+/// Spark-compatible physical expression adapter.
+///
+/// This adapter rewrites expressions at planning time to:
+/// 1. Replace references to missing columns with default values or nulls
+/// 2. Replace standard DataFusion cast expressions with Spark-compatible casts
+/// 3. Handle case-insensitive column matching
+#[derive(Debug)]
+struct SparkPhysicalExprAdapter {
+    /// The logical schema expected by the query
+    logical_file_schema: SchemaRef,
+    #[allow(dead_code)]
+    /// The physical schema of the actual file being read
+    physical_file_schema: SchemaRef,
+    /// Spark-specific options for type conversions
+    parquet_options: SparkParquetOptions,
+    /// Default values for missing columns (keyed by logical schema index)
+    default_values: Option<HashMap<usize, ScalarValue>>,
+    /// The default DataFusion adapter to delegate standard handling to
+    default_adapter: Arc<dyn PhysicalExprAdapter>,
+}
+
+impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
+        // Step 1: Handle default values for missing columns
+        let expr = self.replace_missing_with_defaults(expr)?;
+
+        // Step 2: Delegate to default adapter for standard handling
+        // This handles: missing columns → nulls, type mismatches → CastColumnExpr
+        let expr = self.default_adapter.rewrite(expr)?;
+
+        // Step 3: Replace CastColumnExpr with Spark-compatible Cast expressions
+        expr.transform(|e| self.replace_with_spark_cast(e)).data()
+    }
+}
+
+impl SparkPhysicalExprAdapter {
+    /// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
+    fn replace_with_spark_cast(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> DataFusionResult<Transformed<Arc<dyn PhysicalExpr>>> {
+        // Check for CastColumnExpr and replace with spark_expr::Cast
+        // CastColumnExpr is in datafusion_physical_expr::expressions
+        if let Some(cast) = expr
+            .as_any()
+            .downcast_ref::<datafusion::physical_expr::expressions::CastColumnExpr>()
+        {
Review Comment:
Note from call: we are trading a `CastColumnExpr` for a `Cast`. The latter doesn't have the ability to handle struct casts, so we should check whether we are casting from struct -> struct and, if so, not replace it with the Spark-compatible cast.
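
A minimal sketch of that guard, gating on arrow's `DataType` (the helper name `can_use_spark_cast` is hypothetical, not part of this PR):

```rust
use arrow::datatypes::DataType;

// Hypothetical guard: only swap in Spark's `Cast` when the cast is not
// struct -> struct, which `Cast` cannot handle; leave those to CastColumnExpr.
fn can_use_spark_cast(from: &DataType, to: &DataType) -> bool {
    !matches!((from, to), (DataType::Struct(_), DataType::Struct(_)))
}
```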
I think this is another example of why we need to unify `CastColumnExpr` and `Cast`: ideally you'd be able to cast from `struct<c1: int, c2: int>[]` -> `struct<c1: text>[]`, applying your Spark casting rules to the `c1: int` -> `c1: text` cast while having DataFusion handle the list -> struct -> literal part.
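
To illustrate that unification idea (all names here are hypothetical, and matching fields by name is an assumption), a struct -> struct cast could be decomposed into per-field leaf casts, so Spark semantics apply at the leaves while the container handling stays with DataFusion:

```rust
use arrow::datatypes::{DataType, Field};

// Hypothetical decomposition: pair each target struct field with its source
// field so the leaf casts (e.g. `c1: int` -> `c1: text`) can be rewritten
// with Spark semantics while the list/struct wrapping stays with DataFusion.
fn leaf_casts(from: &DataType, to: &DataType) -> Vec<(Field, Field)> {
    match (from, to) {
        (DataType::Struct(from_fields), DataType::Struct(to_fields)) => to_fields
            .iter()
            .filter_map(|target| {
                from_fields
                    .iter()
                    .find(|source| source.name() == target.name())
                    .map(|source| (source.as_ref().clone(), target.as_ref().clone()))
            })
            .collect(),
        // A non-struct cast is itself the leaf.
        _ => vec![],
    }
}
```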