schenksj opened a new pull request, #3932:
URL: https://github.com/apache/datafusion-comet/pull/3932

   ## Summary
   
   Adds native Delta Lake read support to Comet using `delta-kernel-rs` for log 
replay, matching all optimizations in the existing Iceberg native scan path. 
Delta tables (`spark.sql("SELECT ... FROM delta.\`/path\`")`) now execute 
through `CometDeltaNativeScanExec` → protobuf `DeltaScan` → Rust planner → 
Comet's tuned `ParquetSource`, preserving every Comet Parquet-read optimization 
(parallel I/O, range merging, page-index filtering, schema adapter for Spark 
semantics).
   
   ## Design
   
   ### Architecture
   
   ```
   Driver (Scala)                          Executor (Rust)
   ─────────────────                       ─────────────────
   CometScanRule                           OpStruct::DeltaScan match arm
     └─ detect DeltaParquetFileFormat        └─ deserialize DeltaScanCommon
     └─ stripDeltaDvWrappers                 └─ apply column mapping to 
data_schema
     └─ nativeDeltaScan validation           └─ rewrite filters 
(ColumnMappingFilterRewriter)
                                             └─ build PartitionedFiles from 
tasks
   CometExecRule                             └─ split by DV presence
     └─ CometDeltaNativeScan.convert()       └─ init_datasource_exec 
(ParquetSource)
        └─ JNI: Native.planDeltaScan()       └─ wrap with DeltaDvFilterExec (if 
DVs)
           └─ delta-kernel-rs log replay
           └─ DV materialization
           └─ column mapping extraction
        └─ partition pruning (static)
        └─ serialize DeltaScanCommon proto
   
   CometDeltaNativeScanExec
     └─ doPrepare() (DPP subqueries)
     └─ serializedPartitionData (lazy)
        └─ apply DPP filters
        └─ per-file split-mode serialization
     └─ DeltaPlanDataInjector (LRU-cached)
   ```
   
   ### Key Design Decisions
   
   1. **Kernel on driver, ParquetSource on executors** — `delta-kernel-rs` 
handles log replay + file enumeration once on the driver via JNI. Data reads go 
through Comet's existing `ParquetSource` (not kernel's `ArrowReader`), 
inheriting all Comet optimizations.
   
   2. **Arrow version isolation** — kernel pins arrow-57 / object_store-0.12 
internally; Comet uses arrow-58 / object_store-0.13. Only plain Rust types 
(`String`, `HashMap`, `Vec<u64>`) cross the boundary. Both arrow versions 
coexist in the dep tree without conflict.
   
   3. **Detection by class name** — `DeltaReflection` uses string-based class 
name matching (no compile-time dep on `spark-delta`), same pattern as Iceberg's 
`SparkBatchQueryScan` detection.
   
   4. **DV handling via plan-tree rewrite** — Delta's `PreprocessTableWithDVs` 
Catalyst strategy injects synthetic `__delta_internal_is_row_deleted` columns. 
`stripDeltaDvWrappers` undoes this at scan-rule time, and 
`CometDeltaDvConfigRule` disables the incompatible `useMetadataRowIndex` 
strategy automatically.
   
   ## Capabilities
   
   ### Phases Implemented
   
   | Phase | Feature | Status |
   |-------|---------|--------|
   | 0 | Dependency spike (delta_kernel + object_store + roaring) | ✅ |
   | 1 | Read-only happy path (unpartitioned, no DV, no column mapping) | ✅ |
   | 2 | Predicate pushdown (Catalyst → kernel predicate translation, 
stats-based file pruning) | ✅ |
   | 3 | Deletion vectors (inline + on-disk, materialized on driver, applied 
via DeltaDvFilterExec) | ✅ |
   | 4 | Column mapping (mode=id and mode=name, schema evolution with missing 
columns) | ✅ |
   | 5 | Split-mode serialization, per-file parallelism, partition pruning | ✅ |
   | 5b | Dynamic Partition Pruning (DPP via doPrepare + deferred task 
filtering) | ✅ |
   | 6 | Reader-feature gate (unsupported features → tagged fallback, not 
silent wrong results) | ✅ |
   
   ### Supported Delta Features
   
   - Partitioned and unpartitioned tables
   - Schema evolution (mergeSchema, missing columns → null)
   - Time travel (VERSION AS OF, TIMESTAMP AS OF)
   - Column mapping modes: none, id, name (including rename after write)
   - Deletion vectors (inline bitmaps + on-disk UUID files)
   - Stats-based file pruning via kernel predicates
   - Data filter pushdown into ParquetSource
   - Dynamic partition pruning through joins
   - Multi-column partitioning with typed columns (int, long, date, string, 
etc.)
   - Complex types (array, map, struct, deeply nested)
   - Cloud storage (S3/S3A, Azure ABFSS, GCS, local filesystem)
   - Protocol feature gating (rowTracking, typeWidening → graceful fallback)
   
   ## Iceberg Parity
   
   Every optimization in Comet's Iceberg path has a Delta equivalent:
   
   | # | Feature | Iceberg | Delta |
   |---|---------|---------|-------|
   | 1 | Split-mode serialization | ✅ lazy val + IcebergPlanDataInjector | ✅ 
lazy val + DeltaPlanDataInjector |
   | 2 | DPP support | ✅ doPrepare() + SubqueryAdaptiveBroadcastExec | ✅ 
doPrepare() + applyDppFilters() |
   | 3 | LRU cache in PlanDataInjector | ✅ 16-entry synchronized LinkedHashMap 
| ✅ identical pattern |
   | 4 | ImmutableSQLMetric | ✅ prevents accumulator merge overwrites | ✅ 
identical pattern |
   | 5 | Planning metrics | ✅ Iceberg V2 custom metrics | ✅ total_files, 
dv_files |
   | 6 | Runtime metrics | ✅ output_rows, num_splits | ✅ output_rows, 
num_splits |
   | 7 | doExecuteColumnar() | ✅ explicit CometExecRDD | ✅ identical pattern |
   | 8 | convertBlock() | ✅ preserves @transient fields | ✅ identical pattern |
   | 9 | Filesystem scheme validation | ✅ 9 schemes | ✅ same 9 schemes |
   | 10 | Schema adapter | ✅ SparkPhysicalExprAdapterFactory | ✅ same adapter |
   | 11 | Delete handling | ✅ iceberg-rust ArrowReader MOR | ✅ 
DeltaDvFilterExec per-batch masking |
   | 12 | Config gating | ✅ COMET_ICEBERG_NATIVE_ENABLED | ✅ 
COMET_DELTA_NATIVE_ENABLED |
   | 13 | Feature fallback | ✅ format version check | ✅ kernel 
unsupported_features gate |
   | 14 | Cloud credentials | ✅ Hadoop→Iceberg key mapping | ✅ Hadoop→kernel 
dual-key lookup |
   
   ### Intentional Differences (by design, not gaps)
   
   - **Rust execution**: Iceberg uses dedicated `IcebergScanExec` with 
iceberg-rust `ArrowReader`; Delta reuses `init_datasource_exec` → Comet's 
`ParquetSource` (gets parallel I/O and range merging for free)
   - **Proto dedup pools**: Iceberg has 8 deduplication pools for repeated 
schemas/partitions; Delta tasks are simpler and don't need pools
   - **Scan rule validation depth**: Iceberg validates 11+ conditions via 
reflection; Delta delegates most to kernel's built-in validation
   
   ## New Files
   
   | File | Purpose |
   |------|---------|
   | `native/core/src/delta/mod.rs` | Module root, quarantine documentation |
   | `native/core/src/delta/scan.rs` | `plan_delta_scan_with_predicate()` — 
kernel log replay |
   | `native/core/src/delta/engine.rs` | `DeltaStorageConfig` + 
`create_engine()` (S3/Azure/local) |
   | `native/core/src/delta/jni.rs` | 
`Java_org_apache_comet_Native_planDeltaScan` JNI entry |
   | `native/core/src/delta/predicate.rs` | Catalyst → kernel predicate 
translator |
   | `native/core/src/delta/error.rs` | `DeltaError` enum |
   | `native/core/src/execution/operators/delta_dv_filter.rs` | 
`DeltaDvFilterExec` — per-batch DV row masking |
   | `spark/.../CometDeltaNativeScanExec.scala` | Split-mode exec with DPP, 
metrics, lazy serialization |
   | `spark/.../CometDeltaNativeScan.scala` | Serde: JNI call, partition 
pruning, proto construction |
   | `spark/.../DeltaReflection.scala` | Class-name detection, table 
root/version extraction |
   | `spark/.../CometDeltaDvConfigRule` | Auto-configures 
useMetadataRowIndex=false |
   
   ## Configuration
   
   | Config | Default | Description |
   |--------|---------|-------------|
   | `spark.comet.scan.deltaNative.enabled` | `false` | Enable native Delta 
scan |
   | `spark.comet.scan.deltaNative.dataFileConcurrencyLimit` | `1` | Concurrent 
file reads per task (2-8 suggested) |
   | `spark.comet.scan.deltaNative.fallbackOnUnsupportedFeature` | `true` | 
Fallback to Spark on unsupported reader features |
   
   ## Test Plan
   
   - [x] **CometDeltaNativeSuite** (26 tests) — core reads, projections, 
filters, partitioning, schema evolution, time travel, complex types, primitive 
coverage
   - [x] **CometDeltaColumnMappingSuite** (5 tests) — column mapping (name/id), 
deletion vectors, DV + column mapping, column mapping + schema evolution
   - [x] **CometDeltaAdvancedSuite** (11 tests) — joins, aggregations, unions, 
window functions, DPP, DPP file pruning, planning metrics, filesystem scheme 
validation
   - [x] **CometFuzzDeltaSuite** — property-based testing with random schemas
   - [x] **DeltaReadFromS3Suite** — MinIO-based S3 integration tests
   - [x] All 82 tests passing (Spark 3.5)
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to