mbutrovich commented on code in PR #4306:
URL: https://github.com/apache/datafusion-comet/pull/4306#discussion_r3229095226


##########
common/src/main/java/org/apache/comet/udf/CometUdfBridge.java:
##########
@@ -48,13 +50,45 @@ public class CometUdfBridge {
    * @param inputSchemaPtrs addresses of pre-allocated FFI_ArrowSchema structs (one per input)
    * @param outArrayPtr address of pre-allocated FFI_ArrowArray for the result
    * @param outSchemaPtr address of pre-allocated FFI_ArrowSchema for the result
+   * @param numRows row count of the current batch. Mirrors DataFusion's {@code
+   *     ScalarFunctionArgs.number_rows}; the only batch-size signal a zero-input UDF (e.g. a
+   *     zero-arg non-deterministic ScalaUDF) ever sees.
+   * @param taskContext propagated Spark {@link TaskContext} from the driving Spark task thread, or
+   *     {@code null} outside a Spark task. Installed as the thread-local for the duration of the
+   *     call when the current thread has none, so partition-sensitive built-ins ({@code Rand},
+   *     {@code Uuid}, {@code MonotonicallyIncreasingID}) work from Tokio workers. Cleared in {@code
+   *     finally} to avoid leaking across worker reuse.
    */
   public static void evaluate(
       String udfClassName,
       long[] inputArrayPtrs,
       long[] inputSchemaPtrs,
       long outArrayPtr,
-      long outSchemaPtr) {
+      long outSchemaPtr,
+      int numRows,
+      TaskContext taskContext) {
+    boolean installedTaskContext = false;
+    if (taskContext != null && TaskContext.get() == null) {

Review Comment:
   Thanks for the review @andygrove! The check means "skip the install if the
   current thread already has a TaskContext." In normal operation that skip path
   is unreachable on Tokio workers: `TaskContext.taskContext` is a plain
   `ThreadLocal` (not `InheritableThreadLocal`), so Tokio workers start with
   `null`, and we unset in `finally`. The check is there so we don't stomp a
   Spark task thread's own context if the bridge ever gets called directly from
   one (unit tests, future paths).
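
   To illustrate the `ThreadLocal` vs `InheritableThreadLocal` distinction, here
   is a minimal standalone sketch. It does not use Spark: the class, field, and
   method names are hypothetical, and a Java-spawned child thread stands in for
   a worker, which is an assumption of the sketch and not how Tokio workers are
   actually created.

```java
class ThreadLocalInheritanceSketch {
    // Hypothetical stand-ins; neither is Spark's actual TaskContext holder.
    static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    static final ThreadLocal<String> INHERITABLE = new InheritableThreadLocal<>();

    // Sets both thread-locals on the calling thread, then reports what a
    // freshly created child thread observes: {plain value, inheritable value}.
    static String[] readFromChild(String value) {
        PLAIN.set(value);
        INHERITABLE.set(value);
        String[] seen = new String[2];
        Thread child = new Thread(() -> {
            seen[0] = PLAIN.get();        // plain ThreadLocal is not copied to the child
            seen[1] = INHERITABLE.get();  // inheritable value is copied at thread creation
        });
        child.start();
        try {
            child.join(); // join makes the child's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Clean up so the calling thread does not keep the values.
            PLAIN.remove();
            INHERITABLE.remove();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = readFromChild("ctx");
        System.out.println("plain in child: " + seen[0]);
        System.out.println("inheritable in child: " + seen[1]);
    }
}
```

   With a plain `ThreadLocal` the child sees `null`, which is why a thread that
   never had a context installed starts out empty.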
   
   You're right that if a stale value did somehow end up on a worker, the
   current code would silently use it instead of the propagated one. Safer to
   switch to save-and-restore: always install the propagated `TaskContext`,
   capture the prior value, and restore it (or unset if there was none) in
   `finally`. I'll push that change. Basically, treat the propagated context as
   the ground truth for this task.
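
   Roughly, the save-and-restore shape could look like the sketch below. This is
   not the code in the PR: it uses a plain `ThreadLocal<String>` as a stand-in
   for Spark's `TaskContext`, and `SaveRestoreSketch`, `CONTEXT`, and
   `withContext` are hypothetical names. It also assumes a `null` propagated
   value means "outside a Spark task", in which case the thread-local is left
   untouched.

```java
import java.util.function.Supplier;

class SaveRestoreSketch {
    // Hypothetical stand-in for the TaskContext thread-local.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Runs body with the propagated context installed, then puts back
    // whatever the thread had before (or clears it if it had nothing).
    static <T> T withContext(String propagated, Supplier<T> body) {
        String prior = CONTEXT.get(); // capture the prior value, stale or not
        if (propagated != null) {
            CONTEXT.set(propagated);  // always install the propagated value
        }
        try {
            return body.get();
        } finally {
            if (propagated != null) {
                if (prior != null) {
                    CONTEXT.set(prior); // restore the caller's own context
                } else {
                    CONTEXT.remove();   // nothing to restore: unset
                }
            }
        }
    }

    public static void main(String[] args) {
        // Simulate a stale value left on a reused worker thread.
        CONTEXT.set("stale");
        String seen = withContext("task-7", CONTEXT::get);
        System.out.println("inside call: " + seen);
        System.out.println("after call: " + CONTEXT.get());
        CONTEXT.remove();
    }
}
```

   The body always observes the propagated value, even if something else was
   already set on the thread, and the thread ends up in the state it started in.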



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
