This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fb81e74b761 Fix another race in getWorkerFor. (#18918)
fb81e74b761 is described below
commit fb81e74b761c724dcdb0b799181bedb54efd4bd5
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Jan 14 23:49:35 2026 -0800
Fix another race in getWorkerFor. (#18918)
This patch covers another case, beyond the one covered in #18910.
A race remained when multiple workers were running and all trying
to contact each other: worker A could try to contact worker B before
worker B had been set up.
The fix is to have the base-class newWorker return null rather than
throw an error, and to update getWorkerFor to retry on null.
---
.../java/org/apache/druid/msq/test/MSQTestWorkerClient.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
index 859194bf1ff..10b7db40c60 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
@@ -61,11 +61,13 @@ public class MSQTestWorkerClient implements WorkerClient
protected Worker getWorkerFor(String workerTaskId)
{
- final WorkerRunRef workerRunRef =
inMemoryWorkers.computeIfAbsent(workerTaskId, this::newWorker);
final Stopwatch stopwatch = Stopwatch.createStarted();
- // Wait for the worker to exist
- while (!workerRunRef.hasWorker()) {
+ // Wait for the worker to exist. It may not have been created or started
up yet, especially if this is
+ // a worker trying to contact another worker.
+ WorkerRunRef workerRunRef;
+ while ((workerRunRef = inMemoryWorkers.computeIfAbsent(workerTaskId,
this::newWorker)) == null
+ || !workerRunRef.hasWorker()) {
if (stopwatch.millisElapsed() > WORKER_WAIT_TIMEOUT_MS) {
throw new ISE(
"Timed out after [%,d]ms waiting for worker[%s] to be registered",
@@ -88,7 +90,8 @@ public class MSQTestWorkerClient implements WorkerClient
protected WorkerRunRef newWorker(String workerId)
{
- throw new RuntimeException("Not implemented!");
+ // Return null so getWorkerFor waits for inMemoryWorkers to be populated
+ return null;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]