mbutrovich commented on PR #4393:
URL: 
https://github.com/apache/datafusion-comet/pull/4393#issuecomment-4557800295

   ## Branch status
   
   JVM and native input now use the canonical Arrow C Stream Interface: 
`Data.exportArrayStream` on the JVM, `ArrowArrayStreamReader` on the native 
side. The bespoke `CometBatchIterator` JNI machinery and the `arrow_ffi_safe` 
flag are gone. Closes #3770. Net diff vs main: ~1070 insertions, ~660 deletions 
across 35 files.
   
   JVM-side input is unified behind three `ArrowReader` subclasses, replacing 
the per-shape conversion paths that funneled into `CometBatchIterator`:
   - `RowArrowReader` for `InternalRow` input
   - `SparkColumnarArrowReader` for non-Comet `ColumnarBatch` input
   - `ColumnarBatchArrowReader` for Comet `ColumnarBatch` input
   
   `CometExecRDD` / `CometExecIterator` / `operators.scala` route input slots 
from the protobuf (`findShuffleScanIndices`) instead of a conf flag, so JVM and 
native agree on which slot is `ShuffleScan` vs `Scan`. Allocator/reader/stream 
lifecycle is bound to `TaskContext` with rollback on partial-setup failure.
   
   Native side: `ScanExec.input_source: 
Option<Arc<Mutex<ArrowArrayStreamReader>>>`. The `arrow_ffi_safe`-gated 
deep-copy branch is removed; `copy_or_unpack_array` is preserved as the 
boundary that strips dictionary encoding before downstream DataFusion operators 
see it.
   
   ## Bucket status
   
   | Bucket | Status |
   |---|---|
   | B1 NullType end-to-end | Landed |
   | B2 TimeType fallback via `DataTypeSupport` | Landed |
   | B3b Iceberg silent corruption | Resolved (consumer-owned reader lifecycle) 
|
   | B4 Stale window-suite assertion | Landed |
   | B6 Subquery under `CometLocalTableScan` | Expected transitive on B1, 
pending CI |
   | B3a Nested-type nullability residual | Deferred |
   | B5 Plan-shape / explain / metrics | Deferred, needs skip-list discussion |
   | B7 Long tail | Deferred, individual fixes |
   
   ## Recent fixes since the last CI sweep
   
   ### Dictionary-encoded `ColumnarBatch` input
   
   Native `HashAggregate`'s row converter emits `Dictionary<Int32, T>` columns 
for string group keys. `CometColumnarShuffle` round-trips them, so the next 
stage's input is a `CometDictionaryVector`. `ColumnarBatchArrowReader` builds 
its stable VSR from the logical (non-dict) Spark schema; the unload/load step 
then trips inside `VectorLoader.loadBuffers` with `no more buffers for field 
...: Utf8. Expected 3` before any data reaches the C Stream. Fix: decode 
dictionary source columns via `DictionaryEncoder.decode` before 
`VectorUnloader`. Native still unpacks downstream via `copy_or_unpack_array`, 
so end-to-end semantics are unchanged. Caught by `CometAggregateSuite "multiple 
column distinct count"`.
   
   ### Shaded `ArrayStreamExporter` `IllegalAccessError`
   
   `org.apache.arrow.c.ArrayStreamExporter$ExportedArrayStreamPrivateData` is 
already excluded from relocation because Arrow's JNI looks it up by literal 
classname (`jni_wrapper.cc:341`). The outer `ArrayStreamExporter` was being 
shaded, which split outer and inner across packages and broke package-private 
constructor access. Fix: also exclude the outer from relocation. It's `final 
class` package-private, an implementation detail, no public-API clash risk. 
Caught by `ParquetEncryptionITCase`.
   
   ## In-flight
   
   Latest commit pushed; CI sweep running. Will update once it settles.
   
   ## Deferred (won't block this PR)
   
   - **B3a, nested struct nullability residual.** Reproducer: 
`DataFrameSetOperationsSuite "SPARK-35756: unionByName support struct having 
same col names but different sequence"`. `unionByName` reorders the right side 
via `named_struct(...)` (children become nullable); the left passes through 
(children stay non-null); DataFusion strict-validates Union/Project schemas. 
Only reproduces under the `spark.plugins=...CometPlugin` path; no 
`CometTestBase` reproducer yet.
   - **B5, plan-shape / explain / metrics assertions** in upstream Spark SQL. 
~40 tests grep for Spark-only nodes (`WholeStageCodegen`, `FilterExec`, 
`BroadcastHashJoinExec`) or codegen-stage IDs Comet operators don't emit. Needs 
an upstream-test skip list or per-test 
`spark.comet.exec.localTableScan.enabled=false`.
   - **B7, long tail.** ULP-level float math (asinh / acosh / cosh / tan / cot 
/ cbrt / pow / atan2), `bit_length`/`octet_length` on `BinaryType`, `null IN 
()` returning `false` instead of `null`, `RuntimeNullChecksV2Writes` 
error-class wrapper depth, `to_binary`/`unhex` error class, `collect_set` 
ordering. Each is a separate fix.
   
   ## Splittable into separate PRs
   
   Once CI is clean these can peel off independently from the `localTableScan` 
default flip:
   
   1. **NullType end-to-end** (B1): `Utils.scala`, `CometShuffleExchangeExec`, 
`native/shuffle/src/spark_unsafe/row.rs`, three test suites.
   2. **TimeType fallback via `DataTypeSupport`** (B2): self-contained 
type-check addition on `CometLocalTableScanExec`.
   3. **Stale window-suite assertion fix** (B4): one-line test fix.
   4. **Arrow C Stream Interface input + close of #3770**: the bulk of this PR. 
Doesn't strictly require the default flip but unblocks it.
   5. **Default flip for `localTableScan.enabled = true`**: lands last, after 
B3a residuals are resolved or accepted.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to