zhuqi-lucas opened a new issue, #21290:
URL: https://github.com/apache/datafusion/issues/21290

   ## Problem
   
   After upgrading from DataFusion 51 to 52, we hit several hard errors when 
reading Parquet files that DF 51 handled without issue. The root cause is that 
**DF 52 removed `SchemaAdapter`** ([PR 
#18998](https://github.com/apache/datafusion/pull/18998)), which previously 
handled schema mismatches transparently via `map_batch()`.
   
   In real-world data lakes, Parquet files accumulated over years often carry 
unavoidable schema inconsistencies:
   
   1. **Nullability mismatch**: Files declare columns as `REQUIRED` 
(non-nullable) but actual data contains nulls — common with schema evolution or 
different writer implementations
   2. **List/Struct inner field differences**: e.g. `List(Field("conditions", 
Int32, false))` vs `List(Field("element", Int32, true))` — different Parquet 
writers use different field naming and nullability conventions
   3. **Type evolution**: Older files store columns as `Utf8`, newer table 
schema expects `Date32`
   
   These are not bugs — they are a natural reality of long-lived data lakes 
with multiple writers. DF 51 handled all of these gracefully and silently.
   
   ## How DF 51 handled this (worked perfectly)
   
   `SchemaAdapter::map_batch()` ran **after** the Parquet reader produced each 
`RecordBatch`:
   - Rebuilt batches using the table schema, **bypassing nullability validation 
entirely**
   - Called `arrow::compute::cast_with_options()` on each column for type 
coercion (e.g. `Utf8 → Date32`)
   - Transparently handled missing columns, renamed fields, and nullability 
differences
   
   This was a **safe, battle-tested approach** that made DataFusion robust for 
real-world Parquet files.
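   A simplified sketch of that idea (this is not the removed DataFusion 
implementation, and it ignores missing/reordered columns that the real 
`SchemaAdapter` also handled — just the cast-and-rebuild core):

   ```rust
   use std::sync::Arc;
   use arrow::array::{ArrayRef, Int32Array, RecordBatch};
   use arrow::compute::cast;
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use arrow::error::ArrowError;

   /// Cast each column to the table schema's type, then rebuild the batch
   /// under the table schema (whose nullable fields accept the file's data).
   fn map_batch(batch: RecordBatch, table_schema: SchemaRef) -> Result<RecordBatch, ArrowError> {
       let columns = batch
           .columns()
           .iter()
           .zip(table_schema.fields())
           .map(|(col, field)| {
               if col.data_type() == field.data_type() {
                   Ok(Arc::clone(col))
               } else {
                   cast(col, field.data_type()) // e.g. Utf8 -> Date32
               }
           })
           .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
       RecordBatch::try_new(table_schema, columns)
   }

   fn main() -> Result<(), ArrowError> {
       // File: non-nullable Int32. Table: nullable Int64.
       let file_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
       let batch = RecordBatch::try_new(
           file_schema,
           vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
       )?;
       let table_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, true)]));
       let mapped = map_batch(batch, table_schema)?;
       assert_eq!(mapped.schema().field(0).data_type(), &DataType::Int64);
       Ok(())
   }
   ```

   Because the rebuilt batch carries the (nullable) table schema, 
`RecordBatch::try_new`'s nullability validation passes even when the file 
declared the column `REQUIRED`.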
   
   ## What breaks in DF 52
   
   DF 52 replaced `SchemaAdapter` with `PhysicalExprAdapterFactory`, which only 
adapts filter/projection **expressions** — not batch data. Now:
   
   1. **`RecordBatch::try_new_with_options`** (arrow-rs) strictly validates 
`!field.is_nullable() && null_count > 0` and rejects batches. This happens 
**inside the Parquet reader stream**, before any user code can intercept.
   
   2. **`ArrowReaderMetadata::try_new`** (parquet-rs) rejects schema overrides 
where nullability differs — even for safe widening (non-nullable → nullable):
      ```
      Incompatible supplied Arrow schema: nullability mismatch for field X: 
expected true but found false
      ```
   
   3. **No escape hatch**: There's no option to skip validation in 
`RecordBatch`, `ArrowReaderMetadata`, or `ParquetRecordBatchStreamBuilder`. We 
cannot work around this from the outside.
   
   ## Concrete errors we hit
   
   ```
   # Non-nullable column with null data
   Arrow error: Invalid argument error: Column 'conditions' is declared as 
   non-nullable but contains null values
   
   # Schema override rejected when trying to force nullable
   Parquet error: Arrow: Incompatible supplied Arrow schema: 
   nullability mismatch for field location: expected true but found false
   ```
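   The first error reproduces directly against arrow-rs, with no Parquet 
reader involved, which is why no DataFusion-level workaround can intercept it:

   ```rust
   use std::sync::Arc;
   use arrow::array::{ArrayRef, Int32Array, RecordBatch};
   use arrow::datatypes::{DataType, Field, Schema};

   fn main() {
       // Field declared non-nullable, but the data contains a null.
       let required = Arc::new(Schema::new(vec![Field::new(
           "conditions", DataType::Int32, false,
       )]));
       let data: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
       // Rejected: "Column 'conditions' is declared as non-nullable
       // but contains null values"
       assert!(RecordBatch::try_new(required, vec![data.clone()]).is_err());

       // Widening the field to nullable makes the same data valid.
       let nullable = Arc::new(Schema::new(vec![Field::new(
           "conditions", DataType::Int32, true,
       )]));
       assert!(RecordBatch::try_new(nullable, vec![data]).is_ok());
   }
   ```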
   
   ## Workarounds attempted (all failed or insufficient)
   
   | Approach | Result |
   |----------|--------|
   | Force nullable in `TableSchema` passed to `ParquetSource` | Parquet reader 
still uses file's physical schema internally → `RecordBatch` validation fails 
before our code runs |
   | Override schema via `ArrowReaderOptions::with_schema()` | 
`ArrowReaderMetadata::try_new` rejects nullable widening |
   | Wrap stream to fix batches post-read | Error occurs inside stream, batch 
never emitted |
   | Port `SchemaAdapter::map_batch()` logic into the opener | Partially works, 
but breaks on multiple layers — `ProjectionExprs`, `ArrowReaderMetadata`, 
`RecordBatch` all validate independently |
   
   ## Proposal: restore DF 51's automatic schema adaptation as an opt-in feature
   
   We believe we are not the only downstream project affected by this breaking 
change. Any data lake holding historical Parquet files from multiple writers, 
or with schema evolution, will hit similar issues.
   
   **Could DataFusion provide an opt-in option to restore DF 51's 
`SchemaAdapter` behavior?** Specifically:
   
   1. **A `ParquetSource` or `ParquetOpener` option** (e.g. 
`with_lenient_schema(true)` or `with_schema_adapter(true)`) that, when enabled:
      - Allows nullable widening in `ArrowReaderMetadata::try_new` 
(non-nullable file → nullable table schema is always safe)
      - Applies `arrow::compute::cast()` for type mismatches (like the old 
`map_batch`)
      - Rebuilds `RecordBatch` with the table schema after reading, skipping 
strict nullability validation
      
   2. **This should be opt-in** (default off) to preserve the current strict 
behavior for users who want it, while giving data lake users a migration path.
   
   3. Alternatively, relaxing the `ArrowReaderMetadata::try_new` check in 
arrow-rs to **allow nullable widening** (non-nullable → nullable) would solve 
the most common case. This is always safe — non-nullable data is trivially 
valid in a nullable column. The current check treats widening and narrowing as 
equally invalid, which seems overly strict.
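   The asymmetric rule we are proposing is simple to state as code. A 
hypothetical helper (`nullability_compatible` is not an existing arrow-rs or 
DataFusion API — it only illustrates the check item 3 asks for):

   ```rust
   use arrow::datatypes::{DataType, Field};

   /// A file field can safely be read under a table field when the types
   /// match and nullability only widens: non-nullable file data is trivially
   /// valid in a nullable column, while the reverse could admit invalid nulls.
   fn nullability_compatible(file_field: &Field, table_field: &Field) -> bool {
       file_field.data_type() == table_field.data_type()
           && (table_field.is_nullable() || !file_field.is_nullable())
   }

   fn main() {
       let required = Field::new("x", DataType::Int32, false);
       let nullable = Field::new("x", DataType::Int32, true);
       assert!(nullability_compatible(&required, &nullable));  // widening: ok
       assert!(!nullability_compatible(&nullable, &required)); // narrowing: rejected
       assert!(nullability_compatible(&required, &required));  // exact match: ok
   }
   ```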
   
   ### Impact on complex downstream systems
   
   Our system is a large-scale data platform built on DataFusion with extensive 
customizations:
   - **Custom physical operators** (sort-preserving joins, cooperative 
execution, reverse scan, OneOf routing)
   - **Custom `ParquetFileReaderFactory`** with caching and instrumented I/O
   - **Custom `ExecutionPlanFactory`** that wraps 
`ParquetSource`/`ReverseParquetSource` with adapted schemas
   - **Materialized Views** with their own table schemas that propagate through 
`ProjectionExec` output
   
   The removal of `SchemaAdapter` affects **every layer** that touches Parquet 
schema — not just the reader, but also projections, partition column injection, 
expression evaluation, and output schema validation. Because validation happens 
independently at multiple levels (`ArrowReaderMetadata`, `RecordBatch`, 
`ProjectionExprs`), fixing it in one place often exposes failures in another. 
This made the DF 51 → 52 upgrade extremely difficult and error-prone for us.
   
   An upstream-supported compatibility strategy would be far more reliable than 
downstream workarounds, since the schema adaptation needs to be integrated at 
the core reader level where arrow-rs and DataFusion interact.
   
   Without this, upgrading from DF 51 to DF 52 requires either:
   - Rewriting all historical Parquet files (impractical for petabyte-scale 
lakes)
   - Forking both DataFusion **and** arrow-rs to patch validation at multiple 
layers (what we're currently doing — fragile and hard to maintain)
   - Staying on DF 51 indefinitely
   
   None of these are sustainable. We'd love to hear if others have encountered 
similar issues and what the community's recommended approach is.
   
   ## Environment
   
   - DataFusion 51 → 52 upgrade
   - arrow-rs / parquet 57.x
   - Production data lake with ~5 years of Parquet files from multiple 
writers/pipelines


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

