andygrove opened a new pull request, #22341: URL: https://github.com/apache/datafusion/pull/22341
## Which issue does this PR close?

- Closes #22339.

## Rationale for this change

Issue #22339 asks for a Spark-compatible Parquet reader in the `datafusion-spark` crate, based on functionality that already exists in [apache/datafusion-comet](https://github.com/apache/datafusion-comet/blob/main/native/core/src/parquet/schema_adapter.rs). The schema adapter is the central piece of that reader: it rewrites physical expressions at planning time so that column references against the logical (query) schema resolve correctly to the physical (file) schema while preserving Spark's vectorized-reader semantics.

This PR ports that schema adapter into `datafusion-spark` so DataFusion users can read Parquet files with Spark semantics by plugging a `SparkPhysicalExprAdapterFactory` into a `FileScanConfig`.

## What changes are included in this PR?

A new `parquet` feature on `datafusion-spark` (off by default), exposing:

- `SparkPhysicalExprAdapterFactory` / `SparkPhysicalExprAdapter`: implements `PhysicalExprAdapterFactory` for use via `FileScanConfigBuilder::with_expr_adapter`.
- `SparkParquetOptions` with all the version-sensitive flags (`allow_type_promotion`, `return_null_struct_if_all_fields_missing`, `case_sensitive`, `use_field_id`, `ignore_missing_field_id`, etc.) and an `EvalMode` enum.
- `spark_parquet_convert`: Spark-compatible nested struct/list/map adaptation, INT96 timezone reinterpretation, FixedSizeBinary(16) → UUID rendering.
- `SparkCastColumnExpr`: a `PhysicalExpr` for column-level type adaptation (timestamp micros → millis, nested field-name relabel, fallback to `spark_parquet_convert`).
- `RejectOnNonEmpty`: defers type-promotion rejection to runtime so empty Parquet files still pass (SPARK-26709).
- `ParquetSchemaError`: the four Parquet-relevant error variants from Comet's `SparkError` (`SchemaConvert`, `MissingFieldIds`, `DuplicateFieldByFieldId`, `DuplicateFieldCaseInsensitive`).
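To illustrate the idea behind `RejectOnNonEmpty` from the list above, here is a minimal, std-only sketch of deferring a rejection until data is actually seen. It is not the code in this PR: `Batch` and `DeferredReject` are hypothetical names invented for the example, and the real expression operates on Arrow record batches rather than a bare row count.

```rust
/// Hypothetical stand-in for a record batch; only the row count matters here.
pub struct Batch {
    pub num_rows: usize,
}

/// Hypothetical deferred rejection: it carries the error that planning would
/// otherwise have raised for a disallowed type promotion.
pub struct DeferredReject {
    pub message: String,
}

impl DeferredReject {
    /// Returns the stored error only when the batch contains rows, so a file
    /// with no rows never triggers it.
    pub fn evaluate(&self, batch: &Batch) -> Result<(), String> {
        if batch.num_rows == 0 {
            Ok(())
        } else {
            Err(self.message.clone())
        }
    }
}
```

Under these assumptions, evaluating against a zero-row batch returns `Ok(())`, while any batch with rows surfaces the stored error. That mirrors the described behavior of empty files passing and non-empty files being rejected at runtime.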
The full set of Spark vectorized-reader rejection rules is ported, including:

- BINARY column rejection rules (no `int → string`, no `binary → decimal` without `DecimalLogicalTypeAnnotation`).
- Decimal-to-decimal narrowing (`isDecimalTypeMatched`).
- Integer-to-decimal narrowing (`canReadAsDecimal`).
- Configurable type-promotion rejection (Spark 3.x rejects INT32→INT64 etc.; Spark 4.x allows them).
- Same-Spark-version rejections (long → narrower int, float → int, etc.).
- Scalar/complex type-shape mismatch (SPARK-45604).
- Field-id and case-insensitive matching with duplicate detection.

### Per-Spark-version behavior

Documented in `parquet/mod.rs`. Configured via `SparkParquetOptions` flags:

- `allow_type_promotion`: Spark 3.x rejects INT32→INT64 / FLOAT→DOUBLE / INT32→DOUBLE; Spark 4.x allows them.
- `return_null_struct_if_all_fields_missing`: flips at SPARK-53535 (Spark 4.1+).
- `eval_mode`: Spark 4.0 made `Ansi` the default.
- TimestampNTZ is supported in the schema mapping.

### Simplified vs Comet (deliberately)

- **Primitive scalar casts** stay as DataFusion's `CastExpr` rather than Comet's full Spark-compatible `Cast` `PhysicalExpr`. Spark's rejection rules still apply at the schema-adapter level; only the cast kernel itself is DataFusion's. Adding a Spark-specific `Cast` `PhysicalExpr` to `datafusion-spark` is a separate task.
- **Error taxonomy**: only the four Parquet-relevant `SparkError` variants are ported. Comet's full `SparkError` enum exists for its JNI bridge to the Spark JVM, which is not relevant here.
- **HDFS / object-store cache / JSON error serialization**: not ported (Comet-specific concerns).

## Are these changes tested?

Yes: 27 new tests, all passing (5 unit + 22 integration tests that round-trip Parquet through `DataSourceExec`).
The integration tests cover every rejection rule that has a corresponding test in Comet's `schema_adapter.rs`:

- BINARY-as-non-string/binary, string-as-int, binary-as-decimal
- Int32-as-narrow-decimal (rejected) / Int32-as-wide-decimal (allowed)
- Int64-as-narrow-decimal, decimal precision/int-precision narrowing
- Decimal widening (allowed sanity check)
- INT64 → narrower int / float, float → int, double → float, int → float
- INT32 / INT64 → date / timestamp without annotations
- Date → Timestamp(LTZ), Timestamp → Date
- Empty file with disallowed widening (SPARK-26709, passes)
- Non-empty file with disallowed widening (rejected at runtime)
- Unsigned-int round-trip (Iceberg compatibility)
- Case-insensitive duplicate field detection

Total `cargo test -p datafusion-spark --features "core parquet" --lib`: **253 passed, 0 failed**.

## Are there any user-facing changes?

Yes: new public API behind the `parquet` feature on `datafusion-spark`:

```rust
use std::sync::Arc;

// Assumed import path for the trait; adjust to your DataFusion version.
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_spark::parquet::{
    EvalMode, SparkParquetOptions, SparkPhysicalExprAdapterFactory,
};

let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
options.allow_type_promotion = false; // Spark 3.x

let factory: Arc<dyn PhysicalExprAdapterFactory> =
    Arc::new(SparkPhysicalExprAdapterFactory::new(options, None));
// Plug into FileScanConfigBuilder::with_expr_adapter
```

No changes to existing APIs; the new module is gated behind the optional `parquet` feature.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
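As a rough illustration of the version-dependent promotion rule described under "Per-Spark-version behavior", the sketch below gates the three widening pairs named there (INT32→INT64, FLOAT→DOUBLE, INT32→DOUBLE) on a boolean flag. It uses only the standard library. `PhysicalType` and `check_read` are hypothetical names for this example, not the PR's API, and the real adapter covers many more types and rules.

```rust
/// Hypothetical, reduced set of Parquet physical types for the example.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhysicalType {
    Int32,
    Int64,
    Float,
    Double,
}

/// True for the three widening pairs named in the PR description.
fn is_widening(file: PhysicalType, requested: PhysicalType) -> bool {
    use PhysicalType::*;
    matches!(
        (file, requested),
        (Int32, Int64) | (Float, Double) | (Int32, Double)
    )
}

/// Decide whether a file column may be read as the requested type.
/// `allow_type_promotion = false` models Spark 3.x, `true` models Spark 4.x.
pub fn check_read(
    file: PhysicalType,
    requested: PhysicalType,
    allow_type_promotion: bool,
) -> Result<(), String> {
    if file == requested {
        return Ok(());
    }
    if is_widening(file, requested) && allow_type_promotion {
        return Ok(());
    }
    // Everything else in this reduced model (narrowing, float to int, ...)
    // is rejected regardless of the flag.
    Err(format!("cannot read {:?} column as {:?}", file, requested))
}
```

In this model, `check_read(PhysicalType::Int32, PhysicalType::Int64, false)` is an error while the same call with `true` succeeds, and a narrowing read such as INT64 as INT32 is rejected under either setting.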

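The case-insensitive matching with duplicate detection mentioned in this PR can be pictured with a small std-only sketch. The helper name `resolve_case_insensitive` and its `String` error are assumptions made for illustration; the ported adapter reports this condition through `ParquetSchemaError::DuplicateFieldCaseInsensitive` and works on Arrow schemas rather than string slices.

```rust
/// Hypothetical helper: find the file field matching `requested`, ignoring case.
///
/// Returns `Ok(Some(index))` for exactly one match, `Ok(None)` when the field
/// is missing from the file, and `Err` when several file fields differ only
/// by case and the lookup is therefore ambiguous.
pub fn resolve_case_insensitive(
    file_fields: &[&str],
    requested: &str,
) -> Result<Option<usize>, String> {
    let wanted = requested.to_lowercase();
    // Collect the index of every file field whose lowercased name matches.
    let hits: Vec<usize> = file_fields
        .iter()
        .enumerate()
        .filter(|(_, name)| name.to_lowercase() == wanted)
        .map(|(i, _)| i)
        .collect();
    match hits.as_slice() {
        [] => Ok(None),
        [only] => Ok(Some(*only)),
        _ => Err(format!(
            "found duplicate field(s) for \"{}\" in case-insensitive mode",
            requested
        )),
    }
}
```

With file fields `["id", "Name"]`, a request for `name` resolves to index 1; with `["a", "A"]`, a request for `a` is ambiguous and returns an error.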