andygrove opened a new issue, #4294:
URL: https://github.com/apache/datafusion-comet/issues/4294

   ## What is the problem the feature request solves?
   
   PR #4234 replaces the row-based input path in `CometMapInBatchExec` with a 
`CometArrowPythonRunner` that consumes columnar batches directly. The 
end-to-end speedup on Spark 4.0 is **1.92x narrow / 1.79x mixed / 1.42x wide** 
(vs the prior row-path numbers of 1.32x / 1.32x / 1.09x).
   
   The PR is _bulk-copy_, not true zero-copy. For every batch, Comet's buffers are copied via `Unsafe.copyMemory` into Spark's allocator before the batch is written to the Arrow IPC stream. The reason is that Comet's Parquet readers each construct their own independent `RootAllocator`, separate from `ArrowUtils.rootAllocator`. Arrow Java rejects cross-root transfers (`A buffer can only be associated between two allocators that share the same root`), so `TransferPair.transfer()` cannot move buffers from Comet's allocator into the Python runner's `VectorSchemaRoot`.
   
   If Comet's reader allocators shared a root with `ArrowUtils.rootAllocator`, 
the bulk-copy step in `CometColumnarPythonInput.writeNextBatchToArrowStream` 
would collapse to a pointer-swap, and the wide-row speedup should climb further 
(the wide-row case is where the copy cost is most concentrated; narrow rows are 
bottlenecked on Python fork/IPC).
   
   Related: #957, #4234.
   
   ## Describe the potential solution
   
   ### Change 1: allocator parent
   
   The five files under `common/src/main/java/org/apache/comet/parquet/` that 
today declare `new RootAllocator()` would instead allocate as children of 
`ArrowUtils.rootAllocator`:
   
   - `ColumnReader.java:55`
   - `NativeColumnReader.java:41`
   - `NativeBatchReader.java:154`
   - `ArrowConstantColumnReader.java:46`
   - `ArrowRowIndexColumnReader.java:39`
   
   Plus anywhere else `new RootAllocator()` appears in the read path.
   
   Result: every Comet buffer is in `ArrowUtils.rootAllocator`'s family. 
`makeTransferPair(target).transfer()` succeeds.
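
   A minimal sketch of the intended change, assuming Spark's `org.apache.spark.sql.util.ArrowUtils` is what `ArrowUtils.rootAllocator` refers to and is reachable from the reader code (the helper name and limits are illustrative, not actual Comet code):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.spark.sql.util.ArrowUtils;

// Hypothetical helper; in practice each reader would do this inline wherever it
// currently calls `new RootAllocator()`.
public final class ReaderAllocators {
  private ReaderAllocators() {}

  /** Per-reader allocator created as a child of Spark's shared root allocator. */
  public static BufferAllocator newReaderAllocator(String readerName) {
    // Being in the same allocator tree as ArrowUtils.rootAllocator is what lets
    // TransferPair.transfer() later move buffers into the Python runner's
    // VectorSchemaRoot without a copy.
    return ArrowUtils.rootAllocator()
        .newChildAllocator(readerName, /* initReservation= */ 0, /* maxAllocation= */ Long.MAX_VALUE);
  }
}
```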
   
   ### Change 2: trait simplification (downstream)
   
   `CometColumnarPythonInput.copyVector` collapses from a tree walk + per-buffer memcpy to a single `TransferPair.transfer()` call per column. `CometVectorIpcCopier` in `common` is no longer needed and can be deleted. About 100 lines of code go away.
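
   For illustration, a hedged sketch (invented names; the real method lives in `CometColumnarPythonInput`) of what the per-column hand-off reduces to once source and target share one allocator root:

```java
import java.util.List;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;

// Hypothetical helper; assumes the Comet columns and the Python runner's root
// have matching schemas and share the same allocator root.
public final class ZeroCopyBatchLoader {
  private ZeroCopyBatchLoader() {}

  static void loadBatch(List<FieldVector> cometColumns, VectorSchemaRoot pythonRunnerRoot) {
    // Capture the row count first: transfer() resets the source vectors.
    int rowCount = cometColumns.isEmpty() ? 0 : cometColumns.get(0).getValueCount();
    List<FieldVector> targets = pythonRunnerRoot.getFieldVectors();
    for (int i = 0; i < cometColumns.size(); i++) {
      // One ownership transfer per column replaces the recursive per-buffer
      // copy that copyVector performs today.
      cometColumns.get(i).makeTransferPair(targets.get(i)).transfer();
    }
    pythonRunnerRoot.setRowCount(rowCount);
  }
}
```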
   
   ### Coupled question: shading
   
   `comet-common` currently shades `org.apache.arrow.*` into 
`org.apache.comet.shaded.arrow.*`. This was introduced so Comet's bundled Arrow 
(currently 18.x) doesn't conflict with Spark's bundled Arrow (~12 on Spark 3.4, 
~17 on Spark 4.0). With the allocator change, Comet's reader code would need to call `ArrowUtils.rootAllocator()`, which returns a Spark-classpath (unshaded) allocator. Options:
   
   - **Un-shade `org.apache.arrow.memory.*`** in `common/pom.xml`. Memory 
classes are stable across Arrow versions, so the cross-version risk is 
contained. The vector / IPC packages can stay shaded if needed.
   - **Bridge via reflection** at the alloc site. Works but pollutes the 
perf-critical alloc path.
   
   Un-shading is the cleaner solution; the risk is bounded because we're only 
un-shading the memory subpackage, not the full Arrow tree.
   
   ### Coupled question: merge common and spark modules
   
   @andygrove notes that the original justification for `common` being a 
separate module (Java Iceberg integration) no longer applies. Merging `common` 
into `spark` would naturally force a decision on shading:
   
   - **Don't shade**: everything resolves against Spark's runtime Arrow. The allocator change above becomes trivial. Cost: Comet needs to be source-compatible with whatever Arrow API Spark ships on the oldest supported Spark minor version.
   - **Shade everything**: not viable because the merged module references 
Spark's unshaded `BasePythonRunner` etc.
   - **Partial shade**: same complexity as today, just in one module instead of 
two.
   
   If we go the un-shade route on the merged module, the bulk-copy → zero-copy transition becomes a one-commit refactor. The trade-off is that Comet's code paths that use Arrow features unique to newer versions (currently 18.x) need backports or guards for older Spark versions.
   
   This may be a bigger move than just the allocator change, but it is worth considering alongside it, since the two concerns intersect at exactly the shading boundary.
   
   ## Additional context
   
   Caveats that need attention either way:
   
   1. **Reader lifetime.** Today each reader owns its allocator and closes it on `close()`. With shared-root allocators, ownership semantics change: closing a reader still frees its child allocator, but the root persists for the executor lifetime (see the sketch after this list). Verify no reader code assumes `close()` deallocates the root.
   
   2. **Native FFI handoff.** Comet's native code allocates Arrow buffers and 
exports them via FFI to JVM Arrow `c.*` classes (the relocation excludes the C 
bridge: `org.apache.arrow.c.jni.JniWrapper` etc.). The buffers come out of the 
native allocator; the JVM side wraps them in JVM Arrow vectors via FFI. Confirm 
the wrap uses the shared root so the chain remains consistent end-to-end.
   
   3. **Cross-version Arrow API drift.** Spark 3.4 ships Arrow ~12, Spark 4.0 ships ~17. If Comet uses Arrow 18-only APIs in the reader path, those would need backports for the old-Spark-on-shared-allocator world. Most of Comet's Arrow use is via the memory and vector primitives that have been stable since Arrow 10.x; a targeted audit is needed.
   
   4. **Benchmark target.** Re-run 
`spark/src/test/resources/pyspark/benchmark_pyarrow_udf.py` after the change. 
Headline numbers to beat:
   
      | workload | bulk-copy (current) | zero-copy (target) |
      |---|---|---|
      | narrow primitives | 1.92x | TBD; minor improvement expected (Python IPC dominates) |
      | mixed with strings | 1.79x | TBD |
      | wide rows (50 cols) | 1.42x | TBD; biggest improvement expected here |
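
   Regarding item 1, a minimal sketch of the ownership model under shared-root allocators (names are illustrative; this is not Comet code):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

// Hypothetical demo of child-allocator lifetime.
public class ReaderLifetimeDemo {
  public static void main(String[] args) {
    // Stands in for the executor-wide ArrowUtils.rootAllocator.
    RootAllocator executorRoot = new RootAllocator();

    BufferAllocator readerAllocator =
        executorRoot.newChildAllocator("comet-reader-0", 0, Long.MAX_VALUE);

    // ... the reader allocates its vectors against readerAllocator ...

    // Reader.close(): only the child is closed. Buffers still owned by the child
    // at this point are reported as a leak; buffers already transferred to the
    // Python runner are no longer the reader's responsibility. The root is untouched.
    readerAllocator.close();

    // The root stays open for the executor lifetime and keeps serving other readers.
    System.out.println("root still open, allocated bytes = " + executorRoot.getAllocatedMemory());
  }
}
```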
   
   Once this lands, the user guide text added in PR #4234 can drop the "Not zero-copy; bulk per-buffer memcpy" limitation paragraph.

