This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c97fdeb0067e [SPARK-53900][CONNECT] Fix unintentional `Thread.wait(0)` 
under rare conditions in `ExecuteGrpcResponseSender`
c97fdeb0067e is described below

commit c97fdeb0067e4867a90baba806856fc33fc2f805
Author: vicennial <[email protected]>
AuthorDate: Tue Oct 14 12:04:13 2025 -0400

    [SPARK-53900][CONNECT] Fix unintentional `Thread.wait(0)` under rare 
conditions in `ExecuteGrpcResponseSender`
    
    ### What changes were proposed in this pull request?
    
    Sets a lower bound of `1` to values passed into `Thread#wait` to avoid an 
unintentional indefinite wait.
    
    ### Why are the changes needed?
    
    A bug in `ExecuteGrpcResponseSender` causes RPC streams to hang 
indefinitely when the configured deadline passes. The bug was introduced in 
[PR](https://github.com/apache/spark/pull/49003/files#diff-d4629281431427e41afd6d3db6630bcfdbfdbf77ba74cf7e48a988c1b66c13f1L244-L253])
 during migration from System.currentTimeMillis() to System.nanoTime(), where 
an integer division error converts sub-millisecond timeout values to 0, 
triggering Java's wait(0) behavior (infinite wait).
    
    #### Root Cause
    
    `executionObserver.responseLock.wait(timeoutNs / NANOS_PER_MILLIS)  // ← 
BUG`
    The Problem: When `deadlineTimeNs < System.nanoTime()` (deadline has 
passed):
    
    - Math.max(1, negative_value) clamps to 1 nanosecond
    - Math.min(progressInterval_ns, 1) remains 1 nanosecond
    - Integer division: 1 / 1,000,000 = 0 milliseconds
    - wait(0) in Java means wait indefinitely until notified
    - No notification arrives (execution already completed), thread hangs 
forever
    
    While one the loop conditions guards against `deadlineTimeNs < 
System.nanoTime()`, it isn’t sufficient as the deadline can elapse while inside 
the loop (the time is freshly fetched in the latter timeout calculation). The 
probability of occurrence can be exacerbated by GC pauses.
    
    #### Conditions Required for Bug to Trigger
    
    The bug manifests when all of the following conditions are met:
    
    - Reattachable execution enabled (CONNECT_EXECUTE_REATTACHABLE_ENABLED = 
true)
    - Execution completes prior to the deadline within the inner loop
    - (all responses sent before deadline)
    - Deadline passes within the inner loop
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52609 from vicennial/fixWait.
    
    Authored-by: vicennial <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
    (cherry picked from commit ff0f1ab95238b27af93c755b600b82c0769fb8d0)
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala       | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 3a707495ff3f..785b254d7af0 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -263,7 +263,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
               timeoutNs = Math.min(progressTimeout * NANOS_PER_MILLIS, 
timeoutNs)
             }
             logTrace(s"Wait for response to become available with 
timeout=$timeoutNs ns.")
-            executionObserver.responseLock.wait(timeoutNs / NANOS_PER_MILLIS)
+            executionObserver.responseLock.wait(Math.max(1, timeoutNs / 
NANOS_PER_MILLIS))
             enqueueProgressMessage(force = true)
             logTrace(s"Reacquired executionObserver lock after waiting.")
             sleepEnd = System.nanoTime()
@@ -384,7 +384,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
           val timeoutNs = Math.max(1, deadlineTimeNs - System.nanoTime())
           var sleepStart = System.nanoTime()
           logTrace(s"Wait for grpcCallObserver to become ready with 
timeout=$timeoutNs ns.")
-          grpcCallObserverReadySignal.wait(timeoutNs / NANOS_PER_MILLIS)
+          grpcCallObserverReadySignal.wait(Math.max(1, timeoutNs / 
NANOS_PER_MILLIS))
           logTrace(s"Reacquired grpcCallObserverReadySignal lock after 
waiting.")
           sleepEnd = System.nanoTime()
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to