mbutrovich commented on code in PR #4306:
URL: https://github.com/apache/datafusion-comet/pull/4306#discussion_r3229095226
##########
common/src/main/java/org/apache/comet/udf/CometUdfBridge.java:
##########
@@ -48,13 +50,45 @@ public class CometUdfBridge {
* @param inputSchemaPtrs addresses of pre-allocated FFI_ArrowSchema structs (one per input)
* @param outArrayPtr address of pre-allocated FFI_ArrowArray for the result
* @param outSchemaPtr address of pre-allocated FFI_ArrowSchema for the result
+ * @param numRows row count of the current batch. Mirrors DataFusion's {@code
+ * ScalarFunctionArgs.number_rows}; the only batch-size signal a zero-input UDF (e.g. a
+ * zero-arg non-deterministic ScalaUDF) ever sees.
+ * @param taskContext propagated Spark {@link TaskContext} from the driving Spark task thread, or
+ * {@code null} outside a Spark task. Installed as the thread-local for the duration of the
+ * call when the current thread has none, so partition-sensitive built-ins ({@code Rand},
+ * {@code Uuid}, {@code MonotonicallyIncreasingID}) work from Tokio workers. Cleared in {@code
+ * finally} to avoid leaking across worker reuse.
*/
public static void evaluate(
String udfClassName,
long[] inputArrayPtrs,
long[] inputSchemaPtrs,
long outArrayPtr,
- long outSchemaPtr) {
+ long outSchemaPtr,
+ int numRows,
+ TaskContext taskContext) {
+ boolean installedTaskContext = false;
+ if (taskContext != null && TaskContext.get() == null) {
Review Comment:
Thanks for the review @andygrove! The check means "skip installation if the
worker thread already has a TaskContext." In normal operation that case never
arises on Tokio workers: `TaskContext.taskContext` is a plain `ThreadLocal`
(not an `InheritableThreadLocal`), so Tokio workers start with `null`, and we
unset in `finally`. The check exists so we don't overwrite a Spark task
thread's own context if the bridge is ever called directly from one (unit
tests, future code paths).
You're right that if a stale value did somehow end up on a worker, the
current code would silently use it instead of the propagated one. It's safer
to switch to save-and-restore: always install the propagated `TaskContext`,
capture the prior value, and restore it (or unset if there was none) in
`finally`. I'll push that change. In short, treat the propagated context as
the ground truth for this task.
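
For reference, a minimal sketch of the save-and-restore shape, not the actual
change. It uses a hypothetical `CONTEXT` thread-local holding a `String` as a
stand-in for Spark's `TaskContext`, so it runs without Spark on the classpath.
The names `SaveRestoreSketch` and `withContext` are illustrative only, and
skipping installation when the propagated value is `null` is an assumption.

```java
import java.util.function.Supplier;

public class SaveRestoreSketch {
  // Hypothetical stand-in for the thread-local that backs TaskContext.get().
  static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

  // Runs body with the propagated context installed, then puts the thread
  // back exactly as it was found.
  static <T> T withContext(String propagated, Supplier<T> body) {
    if (propagated == null) {
      // Assumption: nothing to install outside a task, so leave the thread alone.
      return body.get();
    }
    String prior = CONTEXT.get(); // capture whatever was there, stale or not
    CONTEXT.set(propagated);      // always install the propagated value
    try {
      return body.get();
    } finally {
      if (prior != null) {
        CONTEXT.set(prior);       // restore the caller's own context
      } else {
        CONTEXT.remove();         // nothing before, so unset
      }
    }
  }

  public static void main(String[] args) {
    // Clean worker thread: the body sees the propagated value, then it is unset.
    System.out.println(withContext("task-1", CONTEXT::get));
    System.out.println(CONTEXT.get());

    // Thread with a pre-existing value: the propagated one wins during the
    // call, and the prior value is restored afterwards.
    CONTEXT.set("prior");
    System.out.println(withContext("task-2", CONTEXT::get));
    System.out.println(CONTEXT.get());
    CONTEXT.remove();
  }
}
```

In the bridge itself, the stand-in calls would presumably map to
`TaskContext.get()`, `TaskContext.setTaskContext(...)`, and
`TaskContext.unset()`.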
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]