This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fb81e74b761 Fix another race in getWorkerFor. (#18918)
fb81e74b761 is described below

commit fb81e74b761c724dcdb0b799181bedb54efd4bd5
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Jan 14 23:49:35 2026 -0800

    Fix another race in getWorkerFor. (#18918)
    
    This patch covers another case beyond the one fixed in #18910.
    There was still a race when multiple workers were running and trying
    to contact each other: if worker A tried to contact worker B before
    worker B had been set up, getWorkerFor failed with an error.
    
    The fix is to have newWorker in the base class return null rather than
    throw an error, and to update getWorkerFor to retry when it gets null.
---
 .../java/org/apache/druid/msq/test/MSQTestWorkerClient.java   | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
index 859194bf1ff..10b7db40c60 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
@@ -61,11 +61,13 @@ public class MSQTestWorkerClient implements WorkerClient
 
   protected Worker getWorkerFor(String workerTaskId)
   {
-    final WorkerRunRef workerRunRef = inMemoryWorkers.computeIfAbsent(workerTaskId, this::newWorker);
     final Stopwatch stopwatch = Stopwatch.createStarted();
 
-    // Wait for the worker to exist
-    while (!workerRunRef.hasWorker()) {
+    // Wait for the worker to exist. It may not have been created or started up yet, especially if this is
+    // a worker trying to contact another worker.
+    WorkerRunRef workerRunRef;
+    while ((workerRunRef = inMemoryWorkers.computeIfAbsent(workerTaskId, this::newWorker)) == null
+           || !workerRunRef.hasWorker()) {
       if (stopwatch.millisElapsed() > WORKER_WAIT_TIMEOUT_MS) {
         throw new ISE(
             "Timed out after [%,d]ms waiting for worker[%s] to be registered",
@@ -88,7 +90,8 @@ public class MSQTestWorkerClient implements WorkerClient
 
   protected WorkerRunRef newWorker(String workerId)
   {
-    throw new RuntimeException("Not implemented!");
+    // Return null so getWorkerFor waits for inMemoryWorkers to be populated
+    return null;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to