mbutrovich opened a new pull request, #4267:
URL: https://github.com/apache/datafusion-comet/pull/4267
Draft while we discuss #4233 and #4239.

## Which issue does this PR close?

Closes #.

## Rationale for this change

#4232 merged the JVM UDF bridge: a JNI path that lets native execution call `CometUDF` implementations on the JVM, with Arrow FFI for data exchange. Two follow-ups build on it:

- **#4239** hand-writes five Arrow-vector `CometUDF` classes for the remaining Spark regex expressions (`regexp_extract`, `regexp_extract_all`, `regexp_instr`, `regexp_replace`, `split`) and flips the default regex engine from `rust` to `java`. Every new expression costs a hand-written UDF plus serde glue.
- **#4233** exposes the bridge to end users: `CometUdfRegistry` maps a Spark UDF name to a user-authored `CometUDF` class, and `CometScalaUdf` serde routes registered names to native. Users re-implement their UDF as an Arrow-vector `CometUDF` (or opt into a `registerColumnarOnly` stub that throws if Spark falls back).

Both approaches require a hand-written Arrow-vector implementation per expression. That does not scale to arbitrary `ScalaUDF`s or to Catalyst expressions that already emit Spark codegen.

This PR adds a codegen-based dispatcher on top of #4232 that compiles a batch-kernel `CometUDF` directly from a bound Catalyst `Expression` via Janino. Any expression whose children and output type are supported routes through native without a hand-written `CometUDF`. The same dispatcher covers user `ScalaUDF`s (the case #4233 targets, without the reimplementation step) and the regex expressions from #4239 (as a direct comparison against the hand-written baselines).

Operating on bound Catalyst `Expression` trees makes composition cheap in three places:

- **Engineering**: one dispatcher and one serde entry point cover N expressions. Adding a new Catalyst expression typically means relaxing `CometBatchKernelCodegen.canHandle`, not writing a new `CometUDF` class plus its serde.
- **Planning**: a UDF subtree is not an opaque boundary.
  Ordinary Catalyst expressions, registered `ScalaUDF`s, and unregistered `ScalaUDF`s live in the same tree and are serialized together, so Comet keeps the surrounding native operators in place.
- **Execution**: an entire expression subtree (e.g. `f(g(a), h(b))` or `upper(udf(x))`) compiles into a single generated batch kernel. One JNI crossing per batch for the whole subtree, not one per nesting level; intermediate results stay on the JVM heap.

The dispatcher is opt-in per serde and gated by `spark.comet.exec.codegenDispatch.mode = auto | force | disabled`. Existing paths (#4232 rlike, #4239 regex family, #4233 user registry) remain the defaults until benchmarks justify flipping specific shapes.

Intended scope is narrower than "any expression". The primary targets are string expressions (where JVM-Rust differences in collation and regex engine semantics make a JVM path the more faithful baseline) and custom `ScalaUDF`s (where no Rust implementation exists). For numeric and other expression families with native Rust kernels, the native path is almost certainly faster and this dispatcher is not meant to replace it.

## What changes are included in this PR?

**Codegen dispatcher**

- `CometBatchKernelCodegen` - takes a bound `Expression`, runs Spark's `CodegenContext` once, and emits a Janino-compiled `CometUDF` that loops over input Arrow vectors in a generated batch kernel. Three-layer cache: JVM-wide compiled-class cache, thread-local UDF instance cache (inherited from #4232), per-partition kernel cache.
- `CometCodegenDispatchUDF` - the `CometUDF` the bridge dispatches to. Carries the bound expression as serialized bytes in its first arg so the kernel is resolvable executor-side without a driver-side registry; works in cluster mode without shared state.
- `CometInternalRow` - Arrow-vector-backed `InternalRow` view the generated code reads through.
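To make the kernel shape concrete, here is a hand-written Java sketch of the kind of batch loop the codegen aims at (Janino compiles Java source). Plain `int[]` arrays stand in for Arrow vectors, `RowView` stands in for `CometInternalRow`, and the fused body for `f(g(a), h(b))` is hard-coded where the real kernel is generated from the bound expression; none of these names are Comet's actual API.

```java
// Hand-written stand-in for a Janino-generated batch kernel. The whole
// subtree f(g(a), h(b)) with g = x * 2, h = x + 1, f = x + y is fused
// into one loop, so a batch costs one crossing, not one per nesting level.
public class BatchKernelSketch {
    // Mutable row cursor over columnar inputs (stand-in for CometInternalRow).
    static final class RowView {
        private final int[][] cols;
        private int row;
        RowView(int[][] cols) { this.cols = cols; }
        void setRow(int r) { row = r; }
        int getInt(int col) { return cols[col][row]; }
    }

    // Stand-in for CometUDF.evaluate(inputs, numRows): one pass per batch.
    public static int[] evaluate(int[][] inputs, int numRows) {
        RowView view = new RowView(inputs);
        int[] out = new int[numRows];
        for (int i = 0; i < numRows; i++) {
            view.setRow(i);
            int g = view.getInt(0) * 2; // g(a)
            int h = view.getInt(1) + 1; // h(b)
            out[i] = g + h;             // f(g(a), h(b)): intermediates stay local
        }
        return out;
    }
}
```

`evaluate(new int[][]{{1, 2}, {10, 20}}, 2)` returns `{13, 25}`: the three-function subtree runs in a single loop, which is the one-crossing-per-batch property described above.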
- `freshReferences` thunk on `CompiledKernel` - snapshots the codegen references array on first executor-thread use so kernels reused across partitions do not race on `ExpressionEncoder` mutable state.

**Bridge contract (shared with #4232, #4233, #4239)**

- `CometUDF.evaluate(inputs, numRows)` - new `numRows` parameter mirrors DataFusion's `ScalarFunctionArgs.number_rows`. Needed for zero-column expressions (e.g. zero-arg non-deterministic `ScalaUDF`s) where no input vector carries batch size. JNI signature `(Ljava/lang/String;[J[JJJI)V` and native call site updated. All existing `CometUDF` implementations (#4232 `RegExpLikeUDF`) updated to the new signature.

**Serde routing**

- `scalaUdf.scala` - routes any `ScalaUDF` through the codegen dispatcher, no registration step required (contrast with #4233, where users register by name).
- `strings.scala` - `CodegenDispatchSerdeHelpers` with `pickWithMode(viaCodegen, viaNonCodegen, ...)`, giving every JVM-UDF-backed expression a uniform `auto | force | disabled` switch. The five regex expressions from #4239 gain a codegen option through this picker.
- `CometConf.COMET_CODEGEN_DISPATCH_MODE` - the mode knob.

**Docs**

- `docs/source/contributor-guide/jvm_udf_dispatch.md` - design, caching architecture, resolved-in-branch items, open follow-ups (dict-encoded inputs, observability, specialized emitters, WSCG integration, remaining DataFusion-contract alignment).

## How are these changes tested?

- `CometCodegenDispatchSmokeSuite` - 9 `ScalaUDF` type-coverage tests with explicit vector-signature assertions (verifies the dispatcher sees `IntVector` for `IntegerType` rather than a cast-pushed `BigIntVector`, etc.), composed-UDF tests (3-deep and multi-column), zero-column `ScalaUDF` coverage.
- `CometCodegenSourceSuite` - direct-compile tests for primitive-input getters.
- `CometCodegenDispatchFuzzSuite` - multi-column fuzz across the supported type matrix.
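As a sketch of the uniform switch that `pickWithMode` gives every JVM-UDF-backed expression: the enum, the `canHandle` flag, and the signature below are illustrative assumptions, not the actual helper, which lives in `strings.scala` and reads `CometConf.COMET_CODEGEN_DISPATCH_MODE`.

```java
import java.util.function.Supplier;

// Illustrative auto | force | disabled picker for serde routing.
public class DispatchModeSketch {
    public enum Mode { AUTO, FORCE, DISABLED }

    public static <T> T pickWithMode(Mode mode,
                                     boolean canHandle,
                                     Supplier<T> viaCodegen,
                                     Supplier<T> viaNonCodegen) {
        switch (mode) {
            case FORCE:    return viaCodegen.get();     // always codegen, no fallback
            case DISABLED: return viaNonCodegen.get();  // existing non-codegen path
            default:       // AUTO: codegen only when the dispatcher can handle it
                return canHandle ? viaCodegen.get() : viaNonCodegen.get();
        }
    }
}
```

Passing the two paths as suppliers keeps the unchosen serde path from being built at all, which matches the opt-in, per-serde gating described above.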
- `CometRegExpJvmSuite` (when #4239 is merged) - passes unchanged with the dispatcher in `auto` and `force`, giving a direct correctness comparison against the hand-written baselines.
- `CometScalaUDFCompositionBenchmark` - four modes (Spark, Comet native built-ins, dispatcher `disabled`, dispatcher `force`) over three shapes: three-level composition, multi-column composition, aggregation over composition. Single-op Project shapes run roughly on par with the non-codegen JVM-UDF fallback, since Spark's `ScalaUDF` codegen is mature and the per-row work is similar on each side. Composed shapes are where the dispatcher should help, since the full subtree runs in one native kernel rather than bouncing back to Spark between UDFs. Concrete numbers are in the design doc.
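The cache layering described under **Codegen dispatcher** could look roughly like this two-layer Java sketch: a JVM-wide cache of compiled kernels keyed by a canonical form of the expression, plus a thread-local instance layer so executor threads never share mutable kernel state. The per-partition layer is omitted, and all keys, types, and names here are simplified assumptions, not Comet's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

// Simplified sketch of the dispatcher's compiled-kernel caching.
public class KernelCacheSketch {
    // A "kernel": (columnar inputs, numRows) -> output column.
    interface Kernel extends BiFunction<int[][], Integer, int[]> {}

    // Exposed only so the sketch can demonstrate compile-once behavior.
    static final AtomicInteger compileCount = new AtomicInteger();

    // Stand-in for a Janino compilation of a bound expression.
    private static Kernel compile(String exprKey) {
        compileCount.incrementAndGet();
        return (inputs, numRows) -> java.util.Arrays.copyOf(inputs[0], numRows);
    }

    // Layer 1: JVM-wide, so each distinct expression shape compiles at most once.
    private static final ConcurrentHashMap<String, Kernel> COMPILED = new ConcurrentHashMap<>();
    static Kernel kernelFor(String exprKey) {
        return COMPILED.computeIfAbsent(exprKey, KernelCacheSketch::compile);
    }

    // Layer 2: thread-local instances keyed the same way, so threads
    // never observe each other's mutable kernel state.
    private static final ThreadLocal<Map<String, Kernel>> LOCAL =
        ThreadLocal.withInitial(HashMap::new);
    static Kernel threadLocalKernelFor(String exprKey) {
        return LOCAL.get().computeIfAbsent(exprKey, KernelCacheSketch::kernelFor);
    }
}
```

Two lookups of the same key return the same compiled kernel and trigger exactly one compilation, which is the property the three-layer design trades memory for.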
