andygrove opened a new pull request, #22341:
URL: https://github.com/apache/datafusion/pull/22341

   ## Which issue does this PR close?
   
   - Closes #22339.
   
   ## Rationale for this change
   
   Issue #22339 asks for a Spark-compatible Parquet reader in the 
`datafusion-spark` crate, based on functionality that already exists in 
[apache/datafusion-comet](https://github.com/apache/datafusion-comet/blob/main/native/core/src/parquet/schema_adapter.rs).
 The schema adapter is the central piece of that reader — it rewrites physical 
expressions at planning time so that column references against the logical 
(query) schema resolve correctly to the physical (file) schema while preserving 
Spark's vectorized-reader semantics.
   
   This PR ports that schema adapter into `datafusion-spark` so DataFusion 
users can read Parquet files with Spark semantics by plugging a 
`SparkPhysicalExprAdapterFactory` into a `FileScanConfig`.
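
To make the idea of resolving logical column references against a file schema concrete, here is a minimal, dependency-free sketch. It is not the adapter from this PR: `resolve_columns` and the plain string schemas are hypothetical stand-ins for DataFusion's expression rewriting over Arrow schemas, and the sketch ignores types, nesting, and field ids entirely.

```rust
/// Hypothetical helper (not part of `datafusion-spark`): for each column in
/// the logical (query) schema, find the index of the same-named column in
/// the physical (file) schema. `None` means the file lacks the column, so a
/// reader would have to fill it with nulls.
fn resolve_columns(logical: &[&str], physical: &[&str]) -> Vec<Option<usize>> {
    logical
        .iter()
        .map(|name| physical.iter().position(|p| p == name))
        .collect()
}
```

For a query schema `["id", "name", "added_later"]` read from a file whose columns are stored as `["name", "id"]`, the helper maps `id` to file index 1, `name` to file index 0, and reports `added_later` as missing.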
   
   ## What changes are included in this PR?
   
   A new `parquet` feature on `datafusion-spark` (off by default), exposing:
   
   - `SparkPhysicalExprAdapterFactory` / `SparkPhysicalExprAdapter` — 
implements `PhysicalExprAdapterFactory` for use via 
`FileScanConfigBuilder::with_expr_adapter`.
   - `SparkParquetOptions` with all the version-sensitive flags 
(`allow_type_promotion`, `return_null_struct_if_all_fields_missing`, 
`case_sensitive`, `use_field_id`, `ignore_missing_field_id`, etc.) and an 
`EvalMode` enum.
   - `spark_parquet_convert` — Spark-compatible nested struct/list/map 
adaptation, INT96 timezone reinterpret, FixedSizeBinary(16) → UUID rendering.
   - `SparkCastColumnExpr` — `PhysicalExpr` for column-level type adaptation 
(timestamp micros → millis, nested field-name relabel, fallback to 
`spark_parquet_convert`).
   - `RejectOnNonEmpty` — defers type-promotion rejection to runtime so empty 
Parquet files still pass (SPARK-26709).
   - `ParquetSchemaError` — the four Parquet-relevant error variants from 
Comet's `SparkError` (`SchemaConvert`, `MissingFieldIds`, 
`DuplicateFieldByFieldId`, `DuplicateFieldCaseInsensitive`).
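
The deferred rejection described for `RejectOnNonEmpty` can be illustrated with a small sketch. This is only an assumption about the general shape of the idea, not the PR's implementation: `DeferredRejection` and `check_batch` are hypothetical names, and a row count stands in for a real record batch.

```rust
/// Hypothetical stand-in for a deferred schema check: it remembers why a
/// column's type conversion is disallowed, but only raises that error once
/// a batch with at least one row is seen.
struct DeferredRejection {
    reason: String,
}

impl DeferredRejection {
    /// Returns `Ok(())` for an empty batch and the stored error otherwise.
    fn check_batch(&self, num_rows: usize) -> Result<(), String> {
        if num_rows == 0 {
            Ok(())
        } else {
            Err(self.reason.clone())
        }
    }
}
```

Because the check runs per batch rather than at planning time, a file that produces no rows never triggers the error, which is the behavior SPARK-26709 asks for.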
   
   The full set of Spark vectorized-reader rejection rules is ported, including:
   
   - BINARY column rejection rules (no `int → string`, no `binary → decimal` 
without `DecimalLogicalTypeAnnotation`).
   - Decimal-to-decimal narrowing (`isDecimalTypeMatched`).
   - Integer-to-decimal narrowing (`canReadAsDecimal`).
   - Configurable type-promotion rejection (Spark 3.x rejects INT32→INT64 etc.; 
Spark 4.x allows them).
   - Same-Spark-version rejections (long → narrower int, float → int, etc.).
   - Scalar/complex type-shape mismatch (SPARK-45604).
   - Field-id and case-insensitive matching with duplicate detection.
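
As an illustration of the last item, case-insensitive matching with duplicate detection might look roughly like the sketch below. The names `find_column` and `MatchError` are hypothetical and exist only for this example; the PR's own error type is `ParquetSchemaError`, whose exact fields are not shown here.

```rust
/// Hypothetical error type for this sketch only.
#[derive(Debug, PartialEq)]
enum MatchError {
    /// More than one file column matches the requested name when case is ignored.
    Duplicate { requested: String, matches: Vec<String> },
}

/// Finds the index of `requested` among `file_columns`.
/// With `case_sensitive == false`, names are compared after lowercasing and
/// an ambiguous match is reported as an error instead of silently picking one.
fn find_column(
    requested: &str,
    file_columns: &[&str],
    case_sensitive: bool,
) -> Result<Option<usize>, MatchError> {
    if case_sensitive {
        return Ok(file_columns.iter().position(|c| *c == requested));
    }
    let wanted = requested.to_lowercase();
    // Collect every file column whose lowercased name equals the request.
    let hits: Vec<usize> = file_columns
        .iter()
        .enumerate()
        .filter(|(_, c)| c.to_lowercase() == wanted)
        .map(|(i, _)| i)
        .collect();
    match hits.as_slice() {
        [] => Ok(None),
        [only] => Ok(Some(*only)),
        _ => Err(MatchError::Duplicate {
            requested: requested.to_string(),
            matches: hits.iter().map(|&i| file_columns[i].to_string()).collect(),
        }),
    }
}
```

Returning an error for ambiguous matches, rather than taking the first hit, keeps the result independent of column order in the file.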
   
   ### Per-Spark-version behavior
   
   Version differences are documented in `parquet/mod.rs` and configured via 
`SparkParquetOptions` flags:
   
   - `allow_type_promotion`: Spark 3.x rejects INT32→INT64, FLOAT→DOUBLE, and 
INT32→DOUBLE; Spark 4.x allows them.
   - `return_null_struct_if_all_fields_missing`: behavior changes with 
SPARK-53535 (Spark 4.1+).
   - `eval_mode`: Spark 4.0 made `Ansi` the default.
   
   TimestampNTZ is also supported in the schema mapping.
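
The effect of `allow_type_promotion` can be sketched as follows. This is a simplified model under stated assumptions, not the PR's code: `Physical`, `Options`, and `can_read` are hypothetical names, only four physical types are modeled, and the only widenings considered are the three listed above.

```rust
/// Parquet physical types used in this sketch (a small subset).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Physical {
    Int32,
    Int64,
    Float,
    Double,
}

/// Hypothetical options struct holding only the flag discussed above.
struct Options {
    allow_type_promotion: bool,
}

/// Returns true if a file column of type `from` may be read as `to`.
/// Identical types always pass; the three widenings named in this PR
/// description pass only when `allow_type_promotion` is set. Everything
/// else, including narrowing, is rejected regardless of the flag.
fn can_read(from: Physical, to: Physical, opts: &Options) -> bool {
    use Physical::*;
    if from == to {
        return true;
    }
    let is_widening = matches!(
        (from, to),
        (Int32, Int64) | (Float, Double) | (Int32, Double)
    );
    is_widening && opts.allow_type_promotion
}
```

With `allow_type_promotion: false` (the Spark 3.x setting), reading INT32 as INT64 is rejected; with `true` (Spark 4.x), it is accepted, while INT64 read as INT32 is rejected in both cases.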
   
   ### Simplified vs Comet (deliberately)
   
   - **Primitive scalar casts** stay as DataFusion's `CastExpr` rather than 
Comet's full Spark-compatible `Cast` `PhysicalExpr`. Spark's rejection rules 
still apply at the schema-adapter level; only the cast kernel itself is 
DataFusion's. Adding a Spark-specific `Cast` `PhysicalExpr` to 
`datafusion-spark` is a separate task.
   - **Error taxonomy**: only the four Parquet-relevant `SparkError` variants 
are ported. Comet's full `SparkError` enum exists for its JNI bridge to the 
Spark JVM, which is not relevant here.
   - **HDFS / object-store cache / JSON error serialization**: not ported 
(Comet-specific concerns).
   
   ## Are these changes tested?
   
   Yes — 27 new tests, all passing (5 unit + 22 integration tests that 
round-trip Parquet through `DataSourceExec`). The integration tests cover every 
rejection rule that has a corresponding test in Comet's `schema_adapter.rs`:
   
   - BINARY-as-non-string/binary, string-as-int, binary-as-decimal
   - Int32-as-narrow-decimal (rejected) / Int32-as-wide-decimal (allowed)
   - Int64-as-narrow-decimal, decimal precision/int-precision narrowing
   - Decimal widening (allowed sanity check)
   - INT64 → narrower int / float, float → int, double → float, int → float
   - INT32 / INT64 → date / timestamp without annotations
   - Date → Timestamp(LTZ), Timestamp → Date
   - Empty file with disallowed widening (SPARK-26709 — passes)
   - Non-empty file with disallowed widening (rejected at runtime)
   - Unsigned-int round-trip (Iceberg compatibility)
   - Case-insensitive duplicate field detection
   
   Total `cargo test -p datafusion-spark --features "core parquet" --lib`: 
**253 passed, 0 failed**.
   
   ## Are there any user-facing changes?
   
   Yes — new public API behind the `parquet` feature on `datafusion-spark`:
   
   ```rust
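   use std::sync::Arc;
   
   // Assumed import path for the trait; it may differ by DataFusion version.
   use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
   use datafusion_spark::parquet::{
       EvalMode, SparkParquetOptions, SparkPhysicalExprAdapterFactory,
   };
   
   let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
   options.allow_type_promotion = false; // Spark 3.x behavior: reject widening reads
   
   let factory: Arc<dyn PhysicalExprAdapterFactory> =
       Arc::new(SparkPhysicalExprAdapterFactory::new(options, None));
   
   // Pass `factory` to FileScanConfigBuilder::with_expr_adapter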
   ```
   
   No changes to existing APIs; the new module is gated behind the optional 
`parquet` feature.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

