adriangb commented on code in PR #3047:
URL: https://github.com/apache/datafusion-comet/pull/3047#discussion_r2665779336
##########
native/core/src/parquet/schema_adapter.rs:
##########
@@ -16,19 +16,241 @@
// under the License.
//! Custom schema adapter that uses Spark-compatible conversions
+//!
+//! This module provides both:
+//! - The deprecated `SchemaAdapter` approach (for backwards compatibility)
+//! - The new `PhysicalExprAdapter` approach (recommended, works at planning time)
use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
-use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
-use datafusion::common::ColumnStatistics;
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::{ColumnStatistics, Result as DataFusionResult};
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;
+use datafusion_comet_spark_expr::{Cast, SparkCastOptions};
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+ replace_columns_with_literals,
+};
use std::collections::HashMap;
use std::sync::Arc;
+// ============================================================================
+// New PhysicalExprAdapter Implementation (Recommended)
+// ============================================================================
+
+/// Factory for creating Spark-compatible physical expression adapters.
+///
+/// This factory creates adapters that rewrite expressions at planning time
+/// to inject Spark-compatible casts where needed.
+#[derive(Clone, Debug)]
+pub struct SparkPhysicalExprAdapterFactory {
+ /// Spark-specific parquet options for type conversions
+ parquet_options: SparkParquetOptions,
+    /// Default values for columns that may be missing from the physical schema.
+ /// The key is the column index in the logical schema.
+ default_values: Option<HashMap<usize, ScalarValue>>,
+}
+
+impl SparkPhysicalExprAdapterFactory {
+ /// Create a new factory with the given options.
+ pub fn new(
+ parquet_options: SparkParquetOptions,
+ default_values: Option<HashMap<usize, ScalarValue>>,
+ ) -> Self {
+ Self {
+ parquet_options,
+ default_values,
+ }
+ }
+}
+
+impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
+ fn create(
+ &self,
+ logical_file_schema: SchemaRef,
+ physical_file_schema: SchemaRef,
+ ) -> Arc<dyn PhysicalExprAdapter> {
+ let default_factory = DefaultPhysicalExprAdapterFactory;
+ let default_adapter = default_factory.create(
+ Arc::clone(&logical_file_schema),
+ Arc::clone(&physical_file_schema),
+ );
+
+ Arc::new(SparkPhysicalExprAdapter {
+ logical_file_schema,
+ physical_file_schema,
+ parquet_options: self.parquet_options.clone(),
+ default_values: self.default_values.clone(),
+ default_adapter,
+ })
+ }
+}
+
+/// Spark-compatible physical expression adapter.
+///
+/// This adapter rewrites expressions at planning time to:
+/// 1. Replace references to missing columns with default values or nulls
+/// 2. Replace standard DataFusion cast expressions with Spark-compatible casts
+/// 3. Handle case-insensitive column matching
+#[derive(Debug)]
+struct SparkPhysicalExprAdapter {
+ /// The logical schema expected by the query
+ logical_file_schema: SchemaRef,
+ /// The physical schema of the actual file being read
+ physical_file_schema: SchemaRef,
+ /// Spark-specific options for type conversions
+ parquet_options: SparkParquetOptions,
+ /// Default values for missing columns (keyed by logical schema index)
+ default_values: Option<HashMap<usize, ScalarValue>>,
+ /// The default DataFusion adapter to delegate standard handling to
+ default_adapter: Arc<dyn PhysicalExprAdapter>,
+}
+
+impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
+ // Step 1: Handle default values for missing columns
+ let expr = self.replace_missing_with_defaults(expr)?;
+
+ // Step 2: Delegate to default adapter for standard handling
+        // This handles: missing columns → nulls, type mismatches → CastColumnExpr
+ let expr = self.default_adapter.rewrite(expr)?;
+
+        // Step 3: Replace CastColumnExpr with Spark-compatible Cast expressions
+ expr.transform(|e| self.replace_with_spark_cast(e)).data()
+ }
+}
+
+impl SparkPhysicalExprAdapter {
+    /// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
+ fn replace_with_spark_cast(
+ &self,
+ expr: Arc<dyn PhysicalExpr>,
+ ) -> DataFusionResult<Transformed<Arc<dyn PhysicalExpr>>> {
+ // Check for CastColumnExpr and replace with spark_expr::Cast
+ // CastColumnExpr is in datafusion_physical_expr::expressions
+ if let Some(cast) = expr
+ .as_any()
+            .downcast_ref::<datafusion::physical_expr::expressions::CastColumnExpr>()
+ {
+ let child = cast.expr().clone();
+ let target_type = cast.target_field().data_type().clone();
+
+ // Create Spark-compatible cast options
+ let mut cast_options = SparkCastOptions::new(
+ self.parquet_options.eval_mode,
+ &self.parquet_options.timezone,
+ self.parquet_options.allow_incompat,
+ );
+            cast_options.allow_cast_unsigned_ints = self.parquet_options.allow_cast_unsigned_ints;
+ cast_options.is_adapting_schema = true;
+
+            let spark_cast = Arc::new(Cast::new(child, target_type, cast_options));
+
+ return Ok(Transformed::yes(spark_cast as Arc<dyn PhysicalExpr>));
+ }
+
+ Ok(Transformed::no(expr))
+ }
+
+ /// Replace references to missing columns with default values.
+ fn replace_missing_with_defaults(
+ &self,
+ expr: Arc<dyn PhysicalExpr>,
+ ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
+ let Some(defaults) = &self.default_values else {
+ return Ok(expr);
+ };
+
+ if defaults.is_empty() {
+ return Ok(expr);
+ }
+
+        // Convert index-based defaults to name-based for replace_columns_with_literals
+ let name_based: HashMap<&str, &ScalarValue> = defaults
+ .iter()
+ .filter_map(|(idx, val)| {
+ self.logical_file_schema
+ .fields()
+ .get(*idx)
+ .map(|f| (f.name().as_str(), val))
+ })
+ .collect();
+
+ if name_based.is_empty() {
+ return Ok(expr);
+ }
+
+ replace_columns_with_literals(expr, &name_based)
+ }
+}
+
+/// Adapt a batch to match the target schema using expression evaluation.
+///
+/// This function is useful for cases like Iceberg scanning where batches
+/// are read directly and need to be adapted to the expected schema.
+pub fn adapt_batch_with_expressions(
+ batch: RecordBatch,
+ target_schema: &SchemaRef,
+ parquet_options: &SparkParquetOptions,
+) -> DataFusionResult<RecordBatch> {
+ let file_schema = batch.schema();
+
+ // If schemas match, no adaptation needed
+ if file_schema.as_ref() == target_schema.as_ref() {
+ return Ok(batch);
+ }
+
+ // Create adapter
+    let factory = SparkPhysicalExprAdapterFactory::new(parquet_options.clone(), None);
+    let adapter = factory.create(Arc::clone(target_schema), Arc::clone(&file_schema));
+
+ // Create column projection expressions for target schema
+ let projection_exprs: Vec<Arc<dyn PhysicalExpr>> = target_schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(i, _field)| {
+            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema(
+ target_schema.field(i).name(),
+ target_schema.as_ref(),
+ )?);
+ adapter.rewrite(col_expr)
+ })
+ .collect::<DataFusionResult<Vec<_>>>()?;
+
+ // Evaluate expressions against batch
+ let columns: Vec<ArrayRef> = projection_exprs
+ .iter()
+ .map(|expr| {
+ expr.evaluate(&batch)?
+ .into_array(batch.num_rows())
+ .map_err(|e| e.into())
+ })
+ .collect::<DataFusionResult<Vec<_>>>()?;
+
+    RecordBatch::try_new(Arc::clone(target_schema), columns).map_err(|e| e.into())
+}
+
+// ============================================================================
+// Legacy SchemaAdapter Implementation (Deprecated)
Review Comment:
Maybe just delete it?
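    If we do delete the legacy path, callers would only need the new factory. A rough sketch of the wiring, assuming the API added in this diff (`parquet_options`, `logical_schema`, `physical_schema`, and `predicate_expr` are placeholders for whatever the scan setup already has in scope):
```rust
// Sketch only: the bindings above are assumed, not part of this PR.
let factory = SparkPhysicalExprAdapterFactory::new(parquet_options.clone(), None);
let adapter = factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));
// Expressions built against the logical schema are rewritten once at planning time
// (missing columns -> defaults/nulls, mismatched types -> Spark-compatible casts),
// then evaluated against file batches as usual.
let rewritten: Arc<dyn PhysicalExpr> = adapter.rewrite(predicate_expr)?;
```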
##########
native/core/src/parquet/schema_adapter.rs:
##########
+ // Create column projection expressions for target schema
+ let projection_exprs: Vec<Arc<dyn PhysicalExpr>> = target_schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(i, _field)| {
+            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema(
+ target_schema.field(i).name(),
+ target_schema.as_ref(),
+ )?);
+ adapter.rewrite(col_expr)
+ })
Review Comment:
```suggestion
.map(|(i, field)| {
let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new(
field.name(),
i,
));
adapter.rewrite(col_expr)
})
```
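    Using `Column::new(field.name(), i)` also sidesteps the fallible `Column::new_with_schema` lookup (and the `?` inside the closure): the index from `enumerate()` already identifies the column in the target schema, and `adapter.rewrite` remaps the column against the physical file schema anyway.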