mbutrovich commented on code in PR #3536:
URL: https://github.com/apache/datafusion-comet/pull/3536#discussion_r2835128784


##########
native/core/src/parquet/schema_adapter.rs:
##########
@@ -15,276 +15,482 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Custom schema adapter that uses Spark-compatible conversions
-
+use crate::parquet::cast_column::CometCastColumnExpr;
 use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
-use arrow::array::{RecordBatch, RecordBatchOptions};
-use arrow::datatypes::{Schema, SchemaRef};
-use datafusion::common::ColumnStatistics;
-use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use arrow::array::{ArrayRef, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::Result as DataFusionResult;
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ColumnarValue;
 use datafusion::scalar::ScalarValue;
+use datafusion_comet_spark_expr::{Cast, SparkCastOptions};
+use datafusion_physical_expr_adapter::{
+    replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
+    PhysicalExprAdapterFactory,
+};
 use std::collections::HashMap;
 use std::sync::Arc;
 
-/// An implementation of DataFusion's `SchemaAdapterFactory` that uses a Spark-compatible
-/// `cast` implementation.
+/// Factory for creating Spark-compatible physical expression adapters.
+///
+/// This factory creates adapters that rewrite expressions at planning time
+/// to inject Spark-compatible casts where needed.
 #[derive(Clone, Debug)]
-pub struct SparkSchemaAdapterFactory {
-    /// Spark cast options
+pub struct SparkPhysicalExprAdapterFactory {
+    /// Spark-specific parquet options for type conversions
     parquet_options: SparkParquetOptions,
-    default_values: Option<HashMap<usize, ScalarValue>>,
+    /// Default values for columns that may be missing from the physical schema.
+    /// The key is the Column (containing name and index).
+    default_values: Option<HashMap<Column, ScalarValue>>,
 }
 
-impl SparkSchemaAdapterFactory {
+impl SparkPhysicalExprAdapterFactory {
+    /// Create a new factory with the given options.
     pub fn new(
-        options: SparkParquetOptions,
-        default_values: Option<HashMap<usize, ScalarValue>>,
+        parquet_options: SparkParquetOptions,
+        default_values: Option<HashMap<Column, ScalarValue>>,
     ) -> Self {
         Self {
-            parquet_options: options,
+            parquet_options,
             default_values,
         }
     }
 }
 
-impl SchemaAdapterFactory for SparkSchemaAdapterFactory {
-    /// Create a new factory for mapping batches from a file schema to a table
-    /// schema.
-    ///
-    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
-    /// the same schema for both the projected table schema and the table
-    /// schema.
+/// Remap physical schema field names to match logical schema field names using
+/// case-insensitive matching. This allows the DefaultPhysicalExprAdapter (which
+/// uses exact name matching) to correctly find columns when the parquet file has

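To make the new doc comments concrete: the planning-time rewrite can be pictured as a `TreeNode` transform that wraps any column whose file (physical) type differs from the table (logical) type in a Spark-compatible cast. The sketch below is illustrative only, not the PR's actual code; `spark_cast` is a hypothetical stand-in for the cast expression the adapter constructs, and the sketch assumes a column resolves to the same index in both schemas.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, SchemaRef};
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::Result as DataFusionResult;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::PhysicalExpr;

/// Illustrative sketch: decide casts once at planning time by rewriting the
/// expression tree, rather than converting arrays per batch at scan time.
fn rewrite_with_spark_casts(
    expr: Arc<dyn PhysicalExpr>,
    logical: &SchemaRef,
    physical: &SchemaRef,
    // Hypothetical constructor for a Spark-compatible cast expression.
    spark_cast: impl Fn(Arc<dyn PhysicalExpr>, &DataType) -> Arc<dyn PhysicalExpr>,
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
    expr.transform(|e| {
        if let Some(col) = e.as_any().downcast_ref::<Column>() {
            // Assumption: the column has the same index in both schemas.
            let table_type = logical.field(col.index()).data_type();
            let file_type = physical.field(col.index()).data_type();
            if table_type != file_type {
                // Read the column at its file type, then cast to the table type.
                return Ok(Transformed::yes(spark_cast(e, table_type)));
            }
        }
        Ok(Transformed::no(e))
    })
    .data()
}
```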
Review Comment:
Spark has some configs around case sensitivity for column names (`spark.sql.caseSensitive`), I think. Do we need an issue to respect that, or is it something we cannot respect?
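
For reference, `spark.sql.caseSensitive` defaults to false. If the remap should honor it, one shape this could take is sketched below; `case_sensitive` is an assumed flag here, not necessarily something `SparkParquetOptions` actually exposes. The idea is to match exactly first and fall back to case-insensitive matching only when the flag is off.

```rust
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};

/// Sketch only: rename physical (file) fields to their logical counterparts,
/// falling back to case-insensitive matching only when `case_sensitive`
/// (mirroring `spark.sql.caseSensitive`) is false.
fn remap_field_names(physical: &SchemaRef, logical: &SchemaRef, case_sensitive: bool) -> SchemaRef {
    let fields: Vec<_> = physical
        .fields()
        .iter()
        .map(|pf| {
            let matched = logical.fields().iter().find(|lf| {
                lf.name() == pf.name()
                    || (!case_sensitive && lf.name().eq_ignore_ascii_case(pf.name()))
            });
            match matched {
                // Adopt the logical spelling so exact-name lookups succeed later.
                Some(lf) => Arc::new(pf.as_ref().clone().with_name(lf.name().clone())),
                None => Arc::clone(pf),
            }
        })
        .collect();
    Arc::new(Schema::new_with_metadata(fields, physical.metadata().clone()))
}
```

Case-insensitive mode would also need a policy for files containing two fields that differ only by case; surfacing an error is likely safer than silently taking the first match.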


