mbutrovich opened a new pull request, #4267:
URL: https://github.com/apache/datafusion-comet/pull/4267

   Draft while we discuss this approach alongside #4233 and #4239.
   
   ## Which issue does this PR close?
   
   Closes #.
   
   ## Rationale for this change
   
   #4232 merged the JVM UDF bridge: a JNI path that lets native execution call 
`CometUDF` implementations on the JVM, with Arrow FFI for data exchange. Two 
follow-ups build on it:
   
   - **#4239** hand-writes five Arrow-vector `CometUDF` classes for the 
remaining Spark regex expressions (`regexp_extract`, `regexp_extract_all`, 
`regexp_instr`, `regexp_replace`, `split`) and flips the default regex engine 
from `rust` to `java`. Every new expression costs a hand-written UDF plus serde 
glue.
   - **#4233** exposes the bridge to end users: `CometUdfRegistry` maps a Spark 
UDF name to a user-authored `CometUDF` class, and `CometScalaUdf` serde routes 
registered names to native. Users re-implement their UDF as an Arrow-vector 
`CometUDF` (or opt into a `registerColumnarOnly` stub that throws if Spark 
falls back).
   
   Both approaches require a hand-written Arrow-vector implementation per 
expression. That does not scale to arbitrary `ScalaUDF`s or to Catalyst 
expressions that already emit Spark codegen.
   
   This PR adds a codegen-based dispatcher on top of #4232 that compiles a 
batch-kernel `CometUDF` directly from a bound Catalyst `Expression` via Janino. 
Any expression whose children and output type are supported routes through 
native without a hand-written `CometUDF`. The same dispatcher covers user 
`ScalaUDF`s (the case #4233 targets, without the reimplementation step) and the 
regex expressions from #4239 (as a direct comparison against the hand-written 
baselines).
   
   Operating on bound Catalyst `Expression` trees makes composition cheap in 
three places:
   
   - **Engineering**: one dispatcher and one serde entry point cover N 
expressions. Adding a new Catalyst expression typically means relaxing 
`CometBatchKernelCodegen.canHandle`, not writing a new `CometUDF` class plus 
its serde.
   - **Planning**: a UDF subtree is not an opaque boundary. Ordinary Catalyst 
expressions, registered `ScalaUDF`s, and unregistered `ScalaUDF`s live in the 
same tree and are serialized together, so Comet keeps the surrounding native 
operators in place.
   - **Execution**: an entire expression subtree (e.g. `f(g(a), h(b))` or 
`upper(udf(x))`) compiles into a single generated batch kernel. One JNI 
crossing per batch for the whole subtree, not one per nesting level; 
intermediate results stay on the JVM heap.
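   The execution point can be illustrated with a toy model. This is not Comet's API (`Expr`, `compile`, and the string-only column type are all illustrative stand-ins): it only shows the shape of the idea, i.e. a bound expression tree fused into one batch kernel so the whole subtree evaluates in a single pass rather than one crossing per operator.

   ```scala
   // Illustrative expression tree; the real dispatcher operates on
   // bound Catalyst Expressions and Arrow vectors, not these types.
   sealed trait Expr
   case class Col(i: Int) extends Expr
   case class Upper(child: Expr) extends Expr
   case class Concat(left: Expr, right: Expr) extends Expr

   // "Compile" walks the tree once and returns a single batch kernel:
   // one pass over the rows for the whole subtree, with intermediate
   // values living only on the JVM stack/heap.
   def compile(e: Expr): (Array[Array[String]], Int) => Array[String] = {
     def evalRow(e: Expr, cols: Array[Array[String]], r: Int): String = e match {
       case Col(i)        => cols(i)(r)
       case Upper(c)      => evalRow(c, cols, r).toUpperCase
       case Concat(l, rt) => evalRow(l, cols, r) + evalRow(rt, cols, r)
     }
     (cols, numRows) => Array.tabulate(numRows)(r => evalRow(e, cols, r))
   }
   ```

   In this model, `compile(Concat(Upper(Col(0)), Col(1)))` yields one function invoked once per batch, mirroring the one-JNI-crossing-per-batch claim above.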
   
   The dispatcher is opt-in per serde and gated by 
`spark.comet.exec.codegenDispatch.mode = auto | force | disabled`. Existing 
paths (#4232 rlike, #4239 regex family, #4233 user registry) remain the 
defaults until benchmarks justify flipping specific shapes.
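   The three-valued knob could be modeled as below. The config key comes from this PR's description; the enum shape and `parse` helper are assumptions, not the actual `CometConf` implementation.

   ```scala
   // Sketch of the mode knob for
   // spark.comet.exec.codegenDispatch.mode; names are illustrative.
   sealed trait DispatchMode
   case object Auto extends DispatchMode
   case object Force extends DispatchMode
   case object Disabled extends DispatchMode

   object DispatchMode {
     // Accepts exactly the three values named by the config key.
     def parse(s: String): DispatchMode = s.trim.toLowerCase match {
       case "auto"     => Auto
       case "force"    => Force
       case "disabled" => Disabled
       case other =>
         throw new IllegalArgumentException(s"invalid codegenDispatch.mode: $other")
     }
   }
   ```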
   
   Intended scope is narrower than "any expression". The primary targets are 
string expressions (where JVM-Rust differences in collation and regex engine 
semantics make a JVM path the more faithful baseline) and custom `ScalaUDF`s 
(where no Rust implementation exists). For numeric and other expression 
families with native Rust kernels, the native path is almost certainly faster 
and this dispatcher is not meant to replace it.
   
   ## What changes are included in this PR?
   
   **Codegen dispatcher**
   
   - `CometBatchKernelCodegen` - takes a bound `Expression`, runs Spark's 
`CodegenContext` once, and emits a Janino-compiled `CometUDF` that loops over 
input Arrow vectors in a generated batch kernel. Three-layer cache: JVM-wide 
compiled-class cache, thread-local UDF instance cache (inherited from #4232), 
per-partition kernel cache.
   - `CometCodegenDispatchUDF` - the `CometUDF` the bridge dispatches to. 
Carries the bound expression as serialized bytes in its first arg so the kernel 
is resolvable executor-side without a driver-side registry; works in cluster 
mode without shared state.
   - `CometInternalRow` - Arrow-vector-backed `InternalRow` view the generated 
code reads through.
   - `freshReferences` thunk on `CompiledKernel` - snapshots the codegen 
references array on first executor-thread use so kernels reused across 
partitions do not race on `ExpressionEncoder` mutable state.
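   The JVM-wide layer of the cache can be sketched as a canonical-key-to-compiled-kernel map. Everything here (key and value types, the placeholder "kernel") is a simplification of the real codegen artifacts; the point is only that concurrent lookups on the same canonicalized expression trigger at most one compile.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicInteger

   // Sketch of the JVM-wide compiled-class cache layer only; the real
   // cache stores Janino-compiled classes keyed by the bound expression.
   object KernelCache {
     private val compiles = new AtomicInteger(0)
     private val cache = new ConcurrentHashMap[String, String => String]()

     def getOrCompile(canonicalKey: String): String => String =
       cache.computeIfAbsent(
         canonicalKey,
         new java.util.function.Function[String, String => String] {
           override def apply(k: String): String => String = {
             compiles.incrementAndGet() // stands in for a Janino compile
             (s: String) => s.reverse   // placeholder "kernel"
           }
         })

     def compileCount: Int = compiles.get()
   }
   ```

   `computeIfAbsent` gives the once-per-key compile guarantee; the thread-local instance and per-partition layers sit on top of this.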
   
   **Bridge contract (shared with #4232, #4233, #4239)**
   
   - `CometUDF.evaluate(inputs, numRows)` - new `numRows` parameter mirrors 
DataFusion's `ScalarFunctionArgs.number_rows`. Needed for zero-column 
expressions (e.g. zero-arg non-deterministic `ScalaUDF`s) where no input vector 
carries batch size. JNI signature `(Ljava/lang/String;[J[JJJI)V` and native 
call site updated. All existing `CometUDF` implementations (#4232 
`RegExpLikeUDF`) updated to the new signature.
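   A hedged sketch of why `numRows` is needed: the real `CometUDF` exchanges Arrow FFI addresses, simplified here to plain arrays, and `BatchUdf`/`RandLike` are illustrative names. With zero input columns there is no vector to read the batch size from, so `numRows` is the only way to size the output.

   ```scala
   // Simplified stand-in for the revised bridge contract; the real
   // evaluate works over Arrow FFI handles, not Long arrays.
   trait BatchUdf {
     def evaluate(inputs: Array[Array[Long]], numRows: Int): Array[Long]
   }

   // A zero-column UDF (e.g. a zero-arg non-deterministic ScalaUDF):
   // with inputs empty, only numRows tells us how many rows to emit.
   object RandLike extends BatchUdf {
     def evaluate(inputs: Array[Array[Long]], numRows: Int): Array[Long] =
       Array.fill(numRows)(42L) // deterministic stand-in for a random value
   }
   ```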
   
   **Serde routing**
   
   - `scalaUdf.scala` - routes any `ScalaUDF` through the codegen dispatcher, 
no registration step required (contrast with #4233 where users register by 
name).
   - `strings.scala` - `CodegenDispatchSerdeHelpers` with 
`pickWithMode(viaCodegen, viaNonCodegen, ...)`, giving every JVM-UDF-backed 
expression a uniform `auto | force | disabled` switch. The five regex 
expressions from #4239 gain a codegen option through this picker.
   - `CometConf.COMET_CODEGEN_DISPATCH_MODE` - the mode knob.
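   The picker's behavior might look like the following. The real `pickWithMode` in `strings.scala` may have a different signature; this sketch only captures the three-way switch, with `auto` additionally gated on whether codegen supports the expression.

   ```scala
   // Hedged sketch of the uniform auto | force | disabled switch;
   // parameter names and by-name arguments are assumptions.
   def pickWithMode[A](mode: String, codegenSupported: Boolean)(
       viaCodegen: => A)(viaNonCodegen: => A): A =
     mode match {
       case "force"                    => viaCodegen
       case "auto" if codegenSupported => viaCodegen
       case _                          => viaNonCodegen
     }
   ```

   `force` routes through codegen unconditionally (useful for tests and benchmarks), while `auto` falls back whenever the expression shape is unsupported.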
   
   **Docs**
   
   - `docs/source/contributor-guide/jvm_udf_dispatch.md` - design, caching 
architecture, resolved-in-branch items, open follow-ups (dict-encoded inputs, 
observability, specialized emitters, WSCG integration, remaining 
DataFusion-contract alignment).
   
   ## How are these changes tested?
   
   - `CometCodegenDispatchSmokeSuite` - 9 `ScalaUDF` type-coverage tests with 
explicit vector-signature assertions (verifies the dispatcher sees `IntVector` 
for `IntegerType` rather than a cast-pushed `BigIntVector`, etc.), composed-UDF 
tests (3-deep and multi-column), zero-column `ScalaUDF` coverage.
   - `CometCodegenSourceSuite` - direct-compile tests for primitive-input 
getters.
   - `CometCodegenDispatchFuzzSuite` - multi-column fuzz across the supported 
type matrix.
   - `CometRegExpJvmSuite` (when #4239 is merged) - passes unchanged with the 
dispatcher in `auto` and `force`, giving a direct correctness comparison 
against the hand-written baselines.
   - `CometScalaUDFCompositionBenchmark` - four modes (Spark, Comet native 
built-ins, dispatcher `disabled`, dispatcher `force`) over three shapes: 
three-level composition, multi-column composition, aggregation over 
composition. Single-op Project shapes run roughly on par with the non-codegen 
JVM-UDF fallback, since Spark's `ScalaUDF` codegen is mature and the per-row 
work is similar on each side. Composed shapes are where the dispatcher should 
help, since the full subtree runs in one native kernel rather than bouncing 
back to Spark between UDFs. Concrete numbers are in the design doc.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

