This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a8283975a26 Fixing deadlock in MSQ worker retry logic. (#18254)
a8283975a26 is described below

commit a8283975a26cc234c0cfaa0e324efd8ab713a909
Author: Karan Kumar <[email protected]>
AuthorDate: Sat Jul 19 21:28:35 2025 +0530

    Fixing deadlock in MSQ worker retry logic. (#18254)
    
    * Fixing deadlock in MSQ worker retry logic.
    
    * Fixing deadlock in MSQ worker retry logic.
    
    * Better logging.
    
    * Better logging.
    
    * Minor rename.
    
    * Fixing static checks.
    
    * Removing extra spaces.
    
    * Fixing imports.
    
    * Addressing feedback
    
    * Better tests.
    
    * Review adjustments.
---
 .../msq/dart/controller/DartControllerContext.java |   4 +-
 .../msq/dart/controller/DartWorkerManager.java     |   3 +-
 .../apache/druid/msq/exec/ControllerContext.java   |  10 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  57 ++-
 .../druid/msq/exec/RetryCapableWorkerManager.java  |   4 +-
 .../org/apache/druid/msq/exec/WorkerManager.java   |   7 +-
 .../apache/druid/msq/exec/WorkerSketchFetcher.java |   3 +-
 .../msq/indexing/IndexerControllerContext.java     |   7 +-
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  |  98 +++-
 .../msq/dart/controller/DartWorkerManagerTest.java |   6 +-
 .../org/apache/druid/msq/exec/MSQTasksTest.java    |   3 +-
 .../indexing/MSQWorkerTaskLauncherRetryTests.java  | 503 +++++++++++++++++++++
 .../msq/indexing/MSQWorkerTaskLauncherTest.java    |  16 +-
 .../druid/msq/test/MSQTestControllerContext.java   |   5 +-
 .../apache/druid/msq/util/MSQFaultUtilsTest.java   |   5 +
 .../apache/druid/indexing/overlord/TaskQueue.java  |   3 +-
 16 files changed, 659 insertions(+), 75 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index a44089c5651..cdd317c5466 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -35,7 +35,6 @@ import org.apache.druid.msq.exec.ControllerMemoryParameters;
 import org.apache.druid.msq.exec.MSQMetriceEventBuilder;
 import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.exec.SegmentSource;
-import org.apache.druid.msq.exec.WorkerFailureListener;
 import org.apache.druid.msq.exec.WorkerManager;
 import org.apache.druid.msq.indexing.IndexerControllerContext;
 import org.apache.druid.msq.indexing.MSQSpec;
@@ -203,8 +202,7 @@ public class DartControllerContext implements 
ControllerContext
   public WorkerManager newWorkerManager(
       String queryId,
       MSQSpec querySpec,
-      ControllerQueryKernelConfig queryKernelConfig,
-      WorkerFailureListener workerFailureListener
+      ControllerQueryKernelConfig queryKernelConfig
   )
   {
     // We're ignoring WorkerFailureListener. Dart worker failures are routed 
into the controller by
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index c49e0b98aed..5ce31286a5f 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.dart.worker.DartWorkerClient;
 import org.apache.druid.msq.exec.ControllerContext;
 import org.apache.druid.msq.exec.WorkerClient;
+import org.apache.druid.msq.exec.WorkerFailureListener;
 import org.apache.druid.msq.exec.WorkerManager;
 import org.apache.druid.msq.exec.WorkerStats;
 import org.apache.druid.msq.indexing.WorkerCount;
@@ -85,7 +86,7 @@ public class DartWorkerManager implements WorkerManager
   }
 
   @Override
-  public ListenableFuture<?> start()
+  public ListenableFuture<?> start(WorkerFailureListener workerFailureListener)
   {
     if (!state.compareAndSet(State.NEW, State.STARTED)) {
       throw new ISE("Cannot start from state[%s]", state.get());
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index a25c944cf63..7c0f7c2300a 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -99,16 +99,14 @@ public interface ControllerContext
   /**
    * Provides services about workers: starting, canceling, obtaining status.
    *
-   * @param queryId               query ID
-   * @param querySpec             query spec
-   * @param queryKernelConfig     config from {@link 
#queryKernelConfig(MSQSpec)}
-   * @param workerFailureListener listener that receives callbacks when 
workers fail
+   * @param queryId           query ID
+   * @param querySpec         query spec
+   * @param queryKernelConfig config from {@link #queryKernelConfig(MSQSpec)}
    */
   WorkerManager newWorkerManager(
       String queryId,
       MSQSpec querySpec,
-      ControllerQueryKernelConfig queryKernelConfig,
-      WorkerFailureListener workerFailureListener
+      ControllerQueryKernelConfig queryKernelConfig
   );
 
   /**
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 0851fa7be7b..7bd6d79b557 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -213,6 +213,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -281,8 +282,8 @@ public class ControllerImpl implements Controller
 
   private WorkerSketchFetcher workerSketchFetcher;
 
-  // WorkerNumber -> WorkOrders which need to be retried and our determined by 
the controller.
-  // Map is always populated in the main controller thread by addToRetryQueue, 
and pruned in retryFailedTasks.
+  // WorkerNumber -> WorkOrders which need to be retried and are determined by 
the controller.
+  // Map is always populated in the main controller thread by addToRetryQueue 
and pruned in retryFailedTasks.
   private final Map<Integer, Set<WorkOrder>> workOrdersToRetry = new 
HashMap<>();
 
   // Time at which the query started.
@@ -300,6 +301,8 @@ public class ControllerImpl implements Controller
   @Nullable
   private MSQSegmentReport segmentReport;
 
+  private final AtomicLong mainThreadId = new AtomicLong();
+
   public ControllerImpl(
       final LegacyMSQSpec querySpec,
       final ResultsContext resultsContext,
@@ -391,6 +394,8 @@ public class ControllerImpl implements Controller
     final TaskState taskStateForReport;
     final MSQErrorReport errorForReport;
 
+    mainThreadId.set(Thread.currentThread().getId());
+
     try {
       // Planning-related: convert the native query from MSQSpec into a 
multi-stage QueryDefinition.
       this.queryStartTime = DateTimes.nowUtc();
@@ -427,7 +432,7 @@ public class ControllerImpl implements Controller
       exceptionEncountered = e;
     }
 
-    // Fetch final counters in separate try, in case runQueryUntilDone threw 
an exception.
+    // Fetch final counters in a separate try, in case runQueryUntilDone threw 
an exception.
     try {
       countersSnapshot = getFinalCountersSnapshot(queryKernel);
     }
@@ -744,16 +749,7 @@ public class ControllerImpl implements Controller
     workerManager = context.newWorkerManager(
         context.queryId(),
         querySpec,
-        queryKernelConfig,
-        (failedTask, fault) -> {
-          if (queryKernelConfig.isFaultTolerant() && 
ControllerQueryKernel.isRetriableFault(fault)) {
-            addToKernelManipulationQueue(kernel -> {
-              addToRetryQueue(kernel, failedTask.getWorkerNumber(), fault);
-            });
-          } else {
-            throw new MSQException(fault);
-          }
-        }
+        queryKernelConfig
     );
 
     if (queryKernelConfig.isFaultTolerant() && !(workerManager instanceof 
RetryCapableWorkerManager)) {
@@ -786,6 +782,22 @@ public class ControllerImpl implements Controller
     return queryDef;
   }
 
+  private WorkerFailureListener getWorkerFailureListener(ControllerQueryKernel 
controllerQueryKernel)
+  {
+    return (failedTask, fault) -> {
+      throwIfNonRetriableFault(fault);
+      if (Thread.currentThread().getId() == mainThreadId.get()) {
+        // this is called from the main controller thread, so we can directly 
access the kernel.
+        addToRetryQueue(controllerQueryKernel, failedTask.getWorkerNumber(), 
fault);
+      } else {
+        // since this is called from the task launcher thread, we need to add 
it to the kernel manipulation queue so that only the controller thread can 
manipulate the kernel.
+        addToKernelManipulationQueue(kernel -> {
+          addToRetryQueue(kernel, failedTask.getWorkerNumber(), fault);
+        });
+      }
+    };
+  }
+
   /**
    * Adds the work orders for worker to {@link 
ControllerImpl#workOrdersToRetry} if the {@link ControllerQueryKernel} 
determines that there
    * are work orders which needs reprocessing.
@@ -800,7 +812,7 @@ public class ControllerImpl implements Controller
 
     List<WorkOrder> retriableWorkOrders = 
kernel.getWorkInCaseWorkerEligibleForRetryElseThrow(worker, fault);
     if (!retriableWorkOrders.isEmpty()) {
-      log.info("Submitting worker[%s] for relaunch because of fault[%s]", 
worker, fault);
+      log.debug("Submitting worker[%s] for relaunch because of fault[%s]", 
worker, fault);
       retryCapableWorkerManager.submitForRelaunch(worker);
       workOrdersToRetry.compute(
           worker,
@@ -1379,7 +1391,7 @@ public class ControllerImpl implements Controller
       final boolean retryOnFailure
   )
   {
-    // Sorted copy of target worker numbers to ensure consistent iteration 
order.
+    // Sorted copy of target worker numbers to ensure a consistent iteration 
order.
     final List<Integer> workersCopy = Ordering.natural().sortedCopy(workers);
     final List<String> workerIds = getWorkerIds();
     final List<ListenableFuture<Void>> workerFutures = new 
ArrayList<>(workersCopy.size());
@@ -2222,7 +2234,7 @@ public class ControllerImpl implements Controller
     private final ControllerQueryKernel queryKernel;
 
     /**
-     * Return value of {@link WorkerManager#start()}. Set by {@link 
#startTaskLauncher()}.
+     * Return value of {@link WorkerManager#start(WorkerFailureListener)}. 
Set by {@link #startTaskLauncher()}.
      */
     private ListenableFuture<?> workerTaskLauncherFuture;
 
@@ -2433,13 +2445,13 @@ public class ControllerImpl implements Controller
       // Start tasks.
       log.debug("Query [%s] starting task launcher.", queryDef.getQueryId());
 
-      workerTaskLauncherFuture = workerManager.start();
+      workerTaskLauncherFuture = 
workerManager.start(getWorkerFailureListener(queryKernel));
       closer.register(() -> workerManager.stop(true));
 
       workerTaskLauncherFuture.addListener(
           () ->
               addToKernelManipulationQueue(queryKernel -> {
-                // Throw an exception in the main loop, if anything went wrong.
+                // Throw an exception in the main loop if anything went wrong.
                 FutureUtils.getUncheckedImmediately(workerTaskLauncherFuture);
               }),
           Execs.directExecutor()
@@ -2529,7 +2541,7 @@ public class ControllerImpl implements Controller
         );
 
         for (final StageId stageId : newStageIds) {
-          // Allocate segments, if this is the final stage of an ingestion.
+          // Allocate segments if this is the final stage of ingestion.
           if (MSQControllerTask.isIngestion(querySpec)
               && stageId.getStageNumber() == 
queryDef.getFinalStageDefinition().getStageNumber()
               && (((DataSourceMSQDestination) 
querySpec.getDestination()).getTerminalStageSpec() instanceof 
SegmentGenerationStageSpec)) {
@@ -2848,6 +2860,13 @@ public class ControllerImpl implements Controller
     }
   }
 
+  private void throwIfNonRetriableFault(MSQFault fault)
+  {
+    if (!queryKernelConfig.isFaultTolerant() || 
!ControllerQueryKernel.isRetriableFault(fault)) {
+      throw new MSQException(fault);
+    }
+  }
+
   static ClusterStatisticsMergeMode finalizeClusterStatisticsMergeMode(
       StageDefinition stageDef,
       ClusterStatisticsMergeMode initialMode
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RetryCapableWorkerManager.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RetryCapableWorkerManager.java
index d5b3d41d7a2..b38f6240d98 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RetryCapableWorkerManager.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RetryCapableWorkerManager.java
@@ -30,7 +30,7 @@ public interface RetryCapableWorkerManager extends 
WorkerManager
   void submitForRelaunch(int workerNumber);
 
   /**
-   * Report a worker that failed without active orders. To be retried if it is 
requried for future stages only.
+   * Report a worker that failed without active orders. To be retried if it is 
required for future stages only.
    */
   void reportFailedInactiveWorker(int workerNumber);
 
@@ -39,7 +39,7 @@ public interface RetryCapableWorkerManager extends 
WorkerManager
    * to figure out if the worker taskId is canceled by the controller. If yes, 
the errors from that worker taskId
    * are ignored for the error reports.
    *
-   * @return true if task is canceled by the controller, else false
+   * @return true if the task is canceled by the controller, else false
    */
   boolean isTaskCanceledByController(String taskId);
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
index a4dc62aefd2..db25641fb13 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
@@ -42,7 +42,7 @@ public interface WorkerManager
    * resolves to an exception if one of the workers fails without being 
explicitly canceled, or if something else
    * goes wrong.
    */
-  ListenableFuture<?> start();
+  ListenableFuture<?> start(WorkerFailureListener workerFailureListener);
 
   /**
    * Launch additional workers, if needed, to bring the number of running 
workers up to {@code workerCount}.
@@ -56,13 +56,14 @@ public interface WorkerManager
    */
   void waitForWorkers(Set<Integer> workerNumbers) throws InterruptedException;
 
+
   /**
-   * List of currently-active workers.
+   * List of currently active workers.
    */
   List<String> getWorkerIds();
 
   /**
-   * Number of currently-active and currently-pending workers.
+   * Number of currently active and currently-pending workers.
    */
   WorkerCount getWorkerCount();
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
index 0139185a005..8baf54c46fc 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
@@ -138,6 +138,7 @@ public class WorkerSketchFetcher implements AutoCloseable
     }
 
     try {
+      // since this runs in another thread, the controller thread does not 
block; hence we need not invoke the failure handler.
       workerManager.waitForWorkers(ImmutableSet.of(worker));
     }
     catch (InterruptedException interruptedException) {
@@ -146,7 +147,7 @@ public class WorkerSketchFetcher implements AutoCloseable
       return;
     }
 
-    // if task is not the latest task. It must have retried.
+    // if the task is not the latest task, it must have retried.
     if (!workerManager.isWorkerActive(taskId)) {
       log.info("Task[%s] is no longer the latest task for worker[%d]. Skipping 
fetch.", taskId, worker);
       return;
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 079af24c390..67fc6aadb24 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -38,7 +38,6 @@ import org.apache.druid.msq.exec.MSQMetriceEventBuilder;
 import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.exec.SegmentSource;
 import org.apache.druid.msq.exec.WorkerClient;
-import org.apache.druid.msq.exec.WorkerFailureListener;
 import org.apache.druid.msq.exec.WorkerManager;
 import org.apache.druid.msq.guice.MultiStageQuery;
 import 
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
@@ -217,15 +216,13 @@ public class IndexerControllerContext implements 
ControllerContext
   public WorkerManager newWorkerManager(
       final String queryId,
       final MSQSpec querySpec,
-      final ControllerQueryKernelConfig queryKernelConfig,
-      final WorkerFailureListener workerFailureListener
+      final ControllerQueryKernelConfig queryKernelConfig
   )
   {
     return new MSQWorkerTaskLauncher(
         queryId,
         taskDataSource,
         overlordClient,
-        workerFailureListener,
         makeTaskContext(querySpec, queryKernelConfig, taskContext),
         // 10 minutes +- 2 minutes jitter
         TimeUnit.SECONDS.toMillis(600 + 
ThreadLocalRandom.current().nextInt(-4, 5) * 30L),
@@ -338,7 +335,7 @@ public class IndexerControllerContext implements 
ControllerContext
   }
 
   /**
-   * Helper method for {@link #newWorkerManager}, split out to be used in 
tests.
+   * Helper method for {@link ControllerContext#newWorkerManager}, split out 
to be used in tests.
    *
    * @param querySpec MSQ query spec; used for
    */
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index c7b1471380c..5155eb25493 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -29,9 +29,11 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -42,6 +44,7 @@ import org.apache.druid.msq.exec.RetryCapableWorkerManager;
 import org.apache.druid.msq.exec.WorkerFailureListener;
 import org.apache.druid.msq.exec.WorkerStats;
 import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFault;
 import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
 import org.apache.druid.msq.indexing.error.TooManyAttemptsForJob;
 import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker;
@@ -67,7 +70,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
- * Like {@link 
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor}, but 
different.
+ * Like {@link TaskMonitor}, but different.
  */
 public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
 {
@@ -146,7 +149,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
   private final Set<Integer> failedInactiveWorkers = 
ConcurrentHashMap.newKeySet();
 
   private final ConcurrentHashMap<Integer, List<String>> workerToTaskIds = new 
ConcurrentHashMap<>();
-  private final WorkerFailureListener workerFailureListener;
+  private final AtomicReference<WorkerFailureListener> 
workerFailureListenerRef = new AtomicReference<>();
 
   private final AtomicLong recentFullyStartedWorkerTimeInMillis = new 
AtomicLong(System.currentTimeMillis());
 
@@ -154,7 +157,6 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
       final String controllerTaskId,
       final String dataSource,
       final OverlordClient overlordClient,
-      final WorkerFailureListener workerFailureListener,
       final Map<String, Object> taskContextOverrides,
       final long maxTaskStartDelayMillis,
       final MSQWorkerTaskLauncherConfig config
@@ -167,14 +169,17 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
     this.exec = Execs.singleThreaded(
         "multi-stage-query-task-launcher[" + 
StringUtils.encodeForFormat(controllerTaskId) + "]-%s"
     );
-    this.workerFailureListener = workerFailureListener;
     this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
     this.config = config;
   }
 
   @Override
-  public ListenableFuture<?> start()
+  public ListenableFuture<?> start(WorkerFailureListener workerFailureListener)
   {
+    if (!this.workerFailureListenerRef.compareAndSet(null, 
workerFailureListener)) {
+      throw DruidException.defensive("WorkerFailureListener already set for 
MSQWorkerTaskLauncher");
+    }
+
     if (state.compareAndSet(State.NEW, State.STARTED)) {
       exec.submit(() -> {
         try {
@@ -185,7 +190,6 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
         }
       });
     }
-
     // Return an "everything is done" future that callers can wait for.
     return stopFuture;
   }
@@ -244,22 +248,36 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
   }
 
   @Override
-  public void launchWorkersIfNeeded(final int taskCount) throws 
InterruptedException
+  public void launchWorkersIfNeeded(final int workerCount)
+      throws InterruptedException
   {
     synchronized (taskIds) {
-      retryInactiveTasksIfNeeded(taskCount);
+      retryInactiveTasksIfNeeded(workerCount);
 
-      if (taskCount > desiredTaskCount) {
-        desiredTaskCount = taskCount;
+      if (workerCount > desiredTaskCount) {
+        desiredTaskCount = workerCount;
         taskIds.notifyAll();
       }
 
-      while (taskIds.size() < taskCount || !allTasksStarted(taskCount)) {
+      while (taskIds.size() < workerCount || !allTasksStarted(workerCount)) {
         if (stopFuture.isDone() || stopFuture.isCancelled()) {
           FutureUtils.getUnchecked(stopFuture, false);
           throw new ISE("Stopped");
         }
-
+        // add failed tasks to the retry queue
+        if (workerFailureListenerRef.get() != null) {
+          for (TaskTracker taskTracker : taskTrackers.values()) {
+            if (taskTracker.isRetrying()) {
+              invokeFailureListener(
+                  taskTracker,
+                  new WorkerFailedFault(
+                      taskTracker.msqWorkerTask.getId(),
+                      taskTracker.statusRef.get().getErrorMsg()
+                  )
+              );
+            }
+          }
+        }
         taskIds.wait();
       }
     }
@@ -301,7 +319,8 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
   }
 
   @Override
-  public void waitForWorkers(Set<Integer> workerNumbers) throws 
InterruptedException
+  public void waitForWorkers(Set<Integer> workerNumbers)
+      throws InterruptedException
   {
     synchronized (taskIds) {
       while (!fullyStartedTasks.containsAll(workerNumbers)) {
@@ -309,6 +328,20 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
           FutureUtils.getUnchecked(stopFuture, false);
           throw new ISE("Stopped");
         }
+
+        if (workerFailureListenerRef.get() != null) {
+          for (TaskTracker taskTracker : taskTrackers.values()) {
+            if (taskTracker.isRetrying() && 
workerNumbers.contains(taskTracker.workerNumber)) {
+              invokeFailureListener(taskTracker,
+                                    new WorkerFailedFault(
+                                        taskTracker.msqWorkerTask.getId(),
+                                        
taskTracker.statusRef.get().getErrorMsg()
+                                    )
+              );
+            }
+          }
+        }
+
         taskIds.wait();
       }
     }
@@ -562,14 +595,11 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
       }
 
       if (tracker.statusRef.get() == null) {
+        tracker.enableRetrying();
         removeWorkerFromFullyStartedWorkers(tracker);
         final String errorMessage = StringUtils.format("Task [%s] status 
missing", taskId);
         log.info(errorMessage + ". Trying to relaunch the worker");
-        tracker.enableRetrying();
-        workerFailureListener.onFailure(
-            tracker.msqWorkerTask,
-            UnknownFault.forMessage(errorMessage)
-        );
+        invokeFailureListener(tracker, UnknownFault.forMessage(errorMessage));
 
       } else if (tracker.didRunTimeOut(maxTaskStartDelayMillis) && 
!canceledWorkerTasks.contains(taskId)) {
         removeWorkerFromFullyStartedWorkers(tracker);
@@ -579,22 +609,31 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
             maxTaskStartDelayMillis
         ));
       } else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
+        tracker.enableRetrying();
         removeWorkerFromFullyStartedWorkers(tracker);
         TaskStatus taskStatus = tracker.statusRef.get();
         log.info("Task[%s] failed because %s. Trying to relaunch the worker", 
taskId, taskStatus.getErrorMsg());
-        tracker.enableRetrying();
-        workerFailureListener.onFailure(
-            tracker.msqWorkerTask,
-            new WorkerFailedFault(taskId, taskStatus.getErrorMsg())
-        );
+        invokeFailureListener(tracker, new WorkerFailedFault(taskId, 
taskStatus.getErrorMsg()));
       }
     }
   }
 
+  private void invokeFailureListener(TaskTracker tracker, MSQFault msqFault)
+  {
+    WorkerFailureListener workerFailureListener = 
workerFailureListenerRef.get();
+    if (workerFailureListener != null) {
+      workerFailureListener.onFailure(
+          tracker.msqWorkerTask,
+          msqFault
+      );
+    }
+  }
+
   private void removeWorkerFromFullyStartedWorkers(TaskTracker tracker)
   {
     synchronized (taskIds) {
       fullyStartedTasks.remove(tracker.msqWorkerTask.getWorkerNumber());
+      taskIds.notifyAll();
     }
   }
 
@@ -616,8 +655,13 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
         if (tracker == null) {
           throw new ISE("Did not find taskTracker for latest taskId[%s]", 
latestTaskId);
         }
-        // if task is not failed donot retry
+        // if the task is not failed, no need to retry
         if (!tracker.isComplete()) {
+          log.info(
+              "Did not relaunch worker[%d] with task id[%s] because the task 
is still running",
+              tracker.workerNumber,
+              latestTaskId
+          );
           return taskHistory;
         }
 
@@ -657,6 +701,7 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
 
         return taskHistory;
       });
+      // remove the worker from the relaunch set
       iterator.remove();
     }
   }
@@ -882,5 +927,10 @@ public class MSQWorkerTaskLauncher implements 
RetryCapableWorkerManager
         return Math.max(0, currentFullyStartingTime - startTimeMillis);
       }
     }
+
+    public int getWorkerNumber()
+    {
+      return workerNumber;
+    }
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
index ea781e2e992..10aca82bf98 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
@@ -140,7 +140,7 @@ public class DartWorkerManagerTest
     Mockito.when(workerClient.stopWorker(WORKERS.get(1)))
            .thenReturn(Futures.immediateFuture(null));
 
-    final ListenableFuture<?> future = workerManager.start();
+    final ListenableFuture<?> future = workerManager.start(null);
     workerManager.stop(false);
 
     // Ensure the future from start() resolves.
@@ -155,7 +155,7 @@ public class DartWorkerManagerTest
     Mockito.when(workerClient.stopWorker(WORKERS.get(1)))
            .thenReturn(Futures.immediateFuture(null));
 
-    final ListenableFuture<?> future = workerManager.start();
+    final ListenableFuture<?> future = workerManager.start(null);
     workerManager.stop(true);
 
     // Ensure the future from start() resolves.
@@ -170,7 +170,7 @@ public class DartWorkerManagerTest
     Mockito.when(workerClient.stopWorker(WORKERS.get(1)))
            .thenReturn(Futures.immediateFuture(null));
 
-    final ListenableFuture<?> future = workerManager.start();
+    final ListenableFuture<?> future = workerManager.start(null);
     workerManager.stop(true);
 
     // Ensure the future from start() resolves.
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index 24df8823346..02e11444c2e 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -230,14 +230,13 @@ public class MSQTasksTest
         CONTROLLER_ID,
         "foo",
         new TasksTestOverlordClient(numSlots),
-        (task, fault) -> {},
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
         new MSQWorkerTaskLauncherConfig()
     );
 
     try {
-      msqWorkerTaskLauncher.start();
+      msqWorkerTaskLauncher.start(null);
       msqWorkerTaskLauncher.launchWorkersIfNeeded(numTasks);
       fail();
     }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTests.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTests.java
new file mode 100644
index 00000000000..f2e2477dd47
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTests.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.client.indexing.IndexingWorkerInfo;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
+import org.apache.druid.client.indexing.TaskStatusResponse;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.overlord.TaskQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
+import org.apache.druid.msq.exec.MSQTasks;
+import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.rpc.UpdateResponse;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.rpc.indexing.SegmentUpdateResponse;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.http.SegmentsToUpdateFilter;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class MSQWorkerTaskLauncherRetryTests
+{
+
+  private static final TaskLocation RUNNING_TASK_LOCATION = new 
TaskLocation("host", 1, 2, null);
+
+  @Test
+  public void mainThreadBlockingSimulationTest() throws Exception
+  {
+    final ExecutorService executors = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setDaemon(false)
+                                                                               
                   .setNameFormat(
+                                                                               
                       "Controller-simulator-%d")
+                                                                               
                   .build());
+
+    final TestOverlordClient overlordClient = new TestOverlordClient();
+    final int failedWorkerNumber = 2;
+    final CountDownLatch workerFailedLatch = new CountDownLatch(1);
+    final CountDownLatch workerStartedLatch = new CountDownLatch(1);
+    overlordClient.addFailedWorker(2);
+    overlordClient.addUnknownLocationWorker(1);
+
+    final MSQWorkerTaskLauncher msqWorkerTaskLauncher = new 
MSQWorkerTaskLauncher(
+        "controller-id",
+        "foo",
+        overlordClient,
+        ImmutableMap.of(),
+        TimeUnit.SECONDS.toMillis(5),
+        new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig()
+    );
+
+    try {
+      final long workerThreadId = Thread.currentThread().getId();
+
+      startTaskLauncher(
+          msqWorkerTaskLauncher,
+          failedWorkerNumber,
+          workerFailedLatch,
+          overlordClient,
+          workerThreadId,
+          workerStartedLatch
+      );
+
+      MockConsumer mockConsumer = new MockConsumer(
+          msqWorkerTaskLauncher,
+          3,
+          workerStartedLatch
+      );
+      Future<?> futures = executors.submit(mockConsumer);
+      // hook called but worker not queued for relaunch.
+      workerFailedLatch.await();
+      Assertions.assertEquals(1, workerStartedLatch.getCount());
+      // we would need to call hooks to allow the main thread to proceed since 
we are using an exec service to so the thread id's would not match.
+      enableWorkerRelaunch(overlordClient, failedWorkerNumber, 
msqWorkerTaskLauncher, workerStartedLatch);
+      // the future should complete within 5 seconds, otherwise an exception is thrown.
+      Assertions.assertNull(futures.get(5, TimeUnit.SECONDS));
+    }
+    finally {
+      msqWorkerTaskLauncher.stop(true);
+      executors.shutdownNow();
+    }
+  }
+
+  private static void enableWorkerRelaunch(
+      TestOverlordClient overlordClient,
+      int failedWorkerNumber,
+      MSQWorkerTaskLauncher msqWorkerTaskLauncher,
+      CountDownLatch workerStartedLatch
+  )
+  {
+    overlordClient.removeUnknownLocationWorker(1);
+    overlordClient.removefailedWorker(failedWorkerNumber);
+    msqWorkerTaskLauncher.submitForRelaunch(failedWorkerNumber);
+    workerStartedLatch.countDown();
+  }
+
+  private static void startTaskLauncher(
+      MSQWorkerTaskLauncher msqWorkerTaskLauncher,
+      int failedWorkerNumber,
+      CountDownLatch workerFailedLatch,
+      TestOverlordClient overlordClient,
+      long workerThreadId,
+      CountDownLatch workerStartedLatch
+  )
+  {
+    msqWorkerTaskLauncher.start((task, fault) -> {
+      Assertions.assertEquals(failedWorkerNumber, task.getWorkerNumber());
+      workerFailedLatch.countDown();
+      if (workerThreadId == Thread.currentThread().getId()) {
+        // If the worker thread is the same as the main thread, we can 
directly relaunch the worker.
+        enableWorkerRelaunch(overlordClient, failedWorkerNumber, 
msqWorkerTaskLauncher, workerStartedLatch);
+      }
+    });
+  }
+
+
+  @Test
+  public void mainThreadNonBlockingSimulationTest() throws Exception
+  {
+    final TestOverlordClient overlordClient = new TestOverlordClient();
+    final int failedWorkerNumber = 2;
+    final CountDownLatch workerFailedLatch = new CountDownLatch(1);
+    final CountDownLatch workerStartedLatch = new CountDownLatch(1);
+    overlordClient.addFailedWorker(2);
+    overlordClient.addUnknownLocationWorker(1);
+
+    final MSQWorkerTaskLauncher msqWorkerTaskLauncher = new 
MSQWorkerTaskLauncher(
+        "controller-id",
+        "foo",
+        overlordClient,
+        ImmutableMap.of(),
+        TimeUnit.SECONDS.toMillis(5),
+        new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig()
+    );
+
+    try {
+      final long workerThreadId = Thread.currentThread().getId();
+
+      startTaskLauncher(
+          msqWorkerTaskLauncher,
+          failedWorkerNumber,
+          workerFailedLatch,
+          overlordClient,
+          workerThreadId,
+          workerStartedLatch
+      );
+
+
+      MockConsumer mockConsumer = new MockConsumer(
+          msqWorkerTaskLauncher,
+          3,
+          workerStartedLatch
+      );
+      mockConsumer.run();
+      // wait until the worker-failed hook has been invoked.
+      workerFailedLatch.await();
+      // worker started.
+      workerStartedLatch.await();
+    }
+    finally {
+      msqWorkerTaskLauncher.stop(true);
+    }
+  }
+
+
+  private static class MockConsumer implements Runnable
+  {
+
+    private final MSQWorkerTaskLauncher msqWorkerTaskLauncher;
+    private final int taskCount;
+    private final CountDownLatch workerStartedLatch;
+
+    public MockConsumer(
+        MSQWorkerTaskLauncher msqWorkerTaskLauncher,
+        int tasksCount,
+        CountDownLatch workerStartedLatch
+    )
+    {
+      this.msqWorkerTaskLauncher = msqWorkerTaskLauncher;
+      this.taskCount = tasksCount;
+      this.workerStartedLatch = workerStartedLatch;
+    }
+
+
+    @Override
+    public void run()
+    {
+      // start stages
+      try {
+        msqWorkerTaskLauncher.launchWorkersIfNeeded(taskCount);
+        workerStartedLatch.await();
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      Set<Integer> workerNumbers = new HashSet<>();
+      for (int i = 0; i < taskCount; i++) {
+        workerNumbers.add(i);
+      }
+
+      // wait for all workers to be ready
+      try {
+        msqWorkerTaskLauncher.waitForWorkers(workerNumbers);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
+  private static class TestOverlordClient implements OverlordClient
+  {
+    private final ConcurrentSkipListSet<Integer> unknownLocationWorkers = new 
ConcurrentSkipListSet<>();
+    private final ConcurrentSkipListSet<Integer> failedWorkers = new 
ConcurrentSkipListSet<>();
+
+    public TestOverlordClient()
+    {
+    }
+
+    @Override
+    public ListenableFuture<URI> findCurrentLeader()
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<Void> runTask(String taskId, Object taskObject)
+    {
+      return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public ListenableFuture<Void> cancelTask(String taskId)
+    {
+      if (failedWorkers.contains(MSQTasks.workerFromTaskId(taskId))) {
+        return Futures.immediateFuture(null);
+      } else {
+        throw DruidException.defensive("Task %s should not be cancelled", 
taskId);
+      }
+    }
+
+    @Override
+    public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
+        @Nullable String state,
+        @Nullable String dataSource,
+        @Nullable Integer maxCompletedTasks
+    )
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<Map<String, TaskStatus>> taskStatuses(Set<String> 
taskIds)
+    {
+      final Map<String, TaskStatus> taskStatusMap = new HashMap<>();
+      for (String taskId : taskIds) {
+        int workerNumber = MSQTasks.workerFromTaskId(taskId);
+        if (failedWorkers.contains(workerNumber)) {
+          taskStatusMap.put(taskId, TaskStatus.failure(taskId, 
TaskQueue.FAILED_TO_RUN_TASK_SEE_OVERLORD_MSG));
+        } else if (unknownLocationWorkers.contains(workerNumber)) {
+          taskStatusMap.put(taskId, 
TaskStatus.running(taskId).withLocation(TaskLocation.unknown()));
+        } else {
+          taskStatusMap.put(taskId, 
TaskStatus.running(taskId).withLocation(RUNNING_TASK_LOCATION));
+        }
+      }
+      return Futures.immediateFuture(taskStatusMap);
+    }
+
+    @Override
+    public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
+    {
+      if (failedWorkers.contains(MSQTasks.workerFromTaskId(taskId))) {
+        return Futures.immediateFuture(new TaskStatusResponse(taskId, 
createFailedTaskStatus(taskId)));
+      }
+      if (unknownLocationWorkers.contains(MSQTasks.workerFromTaskId(taskId))) {
+        return Futures.immediateFuture(new TaskStatusResponse(taskId, 
createRunningTaskStatusWithOutLocation(taskId)));
+      } else {
+        return Futures.immediateFuture(new TaskStatusResponse(taskId, 
createRunningTaskStatus(taskId)));
+      }
+    }
+
+    @Override
+    public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String 
taskId)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<TaskPayloadResponse> taskPayload(String taskId)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<Map<String, String>> postSupervisor(SupervisorSpec 
supervisor)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<Map<String, String>> terminateSupervisor(String 
supervisorId)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<CloseableIterator<SupervisorStatus>> 
supervisorStatuses()
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<Map<String, List<Interval>>> 
findLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<Integer> killPendingSegments(String dataSource, 
Interval interval)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<List<IndexingWorkerInfo>> getWorkers()
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<IndexingTotalWorkerCapacityInfo> 
getTotalWorkerCapacity()
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<Boolean> isCompactionSupervisorEnabled()
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<ClusterCompactionConfig> 
getClusterCompactionConfig()
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<UpdateResponse> 
updateClusterCompactionConfig(ClusterCompactionConfig config)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<SegmentUpdateResponse> 
markNonOvershadowedSegmentsAsUsed(String dataSource)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<SegmentUpdateResponse> 
markNonOvershadowedSegmentsAsUsed(
+        String dataSource,
+        SegmentsToUpdateFilter filter
+    )
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<SegmentUpdateResponse> markSegmentAsUsed(SegmentId 
segmentId)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<SegmentUpdateResponse> markSegmentsAsUnused(String 
dataSource)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<SegmentUpdateResponse> markSegmentsAsUnused(
+        String dataSource,
+        SegmentsToUpdateFilter filter
+    )
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public ListenableFuture<SegmentUpdateResponse> 
markSegmentAsUnused(SegmentId segmentId)
+    {
+      throw new UOE("Not implemented");
+    }
+
+    @Override
+    public OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
+    {
+      return this;
+    }
+
+    public void addUnknownLocationWorker(int workerNumber)
+    {
+      unknownLocationWorkers.add(workerNumber);
+    }
+
+    public void addFailedWorker(int workerNumber)
+    {
+      failedWorkers.add(workerNumber);
+    }
+
+    public void removefailedWorker(int workerNumber)
+    {
+      failedWorkers.remove(workerNumber);
+    }
+
+    public void removeUnknownLocationWorker(int workerNumber)
+    {
+      unknownLocationWorkers.remove(workerNumber);
+    }
+
+    private TaskStatusPlus createRunningTaskStatusWithOutLocation(String 
taskid)
+    {
+      return createTaskStatusPlus(taskid, TaskState.RUNNING, 
TaskLocation.unknown());
+    }
+
+    private TaskStatusPlus createRunningTaskStatus(String taskid)
+    {
+      return createTaskStatusPlus(taskid, TaskState.RUNNING, 
RUNNING_TASK_LOCATION);
+    }
+
+    private TaskStatusPlus createFailedTaskStatus(String taskid)
+    {
+      return createTaskStatusPlus(taskid, TaskState.FAILED, 
RUNNING_TASK_LOCATION);
+    }
+
+    private TaskStatusPlus createTaskStatusPlus(String taskid, TaskState 
taskState, TaskLocation location)
+    {
+      return new TaskStatusPlus(
+          taskid,
+          "group-id",
+          "type",
+          DateTime.now(DateTimeZone.UTC),
+          DateTime.now(DateTimeZone.UTC),
+          taskState,
+          RunnerTaskState.RUNNING,
+          1000L,
+          location,
+          "dataSource",
+          null
+      );
+    }
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
index 1e00be629a1..3cd1b5b98f8 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.msq.indexing;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.exec.WorkerFailureListener;
 import 
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.junit.Assert;
@@ -42,7 +44,6 @@ public class MSQWorkerTaskLauncherTest
         "controller-id",
         "foo",
         Mockito.mock(OverlordClient.class),
-        (task, fault) -> {},
         ImmutableMap.of(),
         TimeUnit.SECONDS.toMillis(5),
         new MSQWorkerTaskLauncherConfig()
@@ -57,4 +58,17 @@ public class MSQWorkerTaskLauncherTest
 
     Assert.assertEquals(target.getWorkersToRelaunch(), ImmutableSet.of(1));
   }
+
+  @Test
+  public void testMultipleWorkerFailureRegistration()
+  {
+    target.start(getWorkerFailureListener());
+    Assert.assertThrows(DruidException.class, () -> 
target.start(getWorkerFailureListener()));
+  }
+
+  private static WorkerFailureListener getWorkerFailureListener()
+  {
+    return (a, b) -> {
+    };
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 6587703dfef..dd618faafd6 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -53,7 +53,6 @@ import org.apache.druid.msq.exec.MSQMetriceEventBuilder;
 import org.apache.druid.msq.exec.SegmentSource;
 import org.apache.druid.msq.exec.Worker;
 import org.apache.druid.msq.exec.WorkerClient;
-import org.apache.druid.msq.exec.WorkerFailureListener;
 import org.apache.druid.msq.exec.WorkerImpl;
 import org.apache.druid.msq.exec.WorkerManager;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
@@ -347,8 +346,7 @@ public class MSQTestControllerContext implements 
ControllerContext, DartControll
   public WorkerManager newWorkerManager(
       String queryId,
       MSQSpec querySpec,
-      ControllerQueryKernelConfig queryKernelConfig,
-      WorkerFailureListener workerFailureListener
+      ControllerQueryKernelConfig queryKernelConfig
   )
   {
     MSQWorkerTaskLauncherConfig taskLauncherConfig = new 
MSQWorkerTaskLauncherConfig();
@@ -360,7 +358,6 @@ public class MSQTestControllerContext implements 
ControllerContext, DartControll
         controller.queryId(),
         "test-datasource",
         overlordClient,
-        workerFailureListener,
         IndexerControllerContext.makeTaskContext(querySpec, queryKernelConfig, 
ImmutableMap.of()),
         0,
         taskLauncherConfig
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MSQFaultUtilsTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MSQFaultUtilsTest.java
index 21b04623eda..d7d0af09e2f 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MSQFaultUtilsTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MSQFaultUtilsTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.util;
 
+import org.apache.druid.indexing.overlord.TaskQueue;
 import org.apache.druid.msq.indexing.error.MSQFaultUtils;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.indexing.error.WorkerFailedFault;
@@ -34,6 +35,10 @@ public class MSQFaultUtilsTest
   {
     Assert.assertEquals(UnknownFault.CODE, 
MSQFaultUtils.getErrorCodeFromMessage(
         "Task execution process exited unsuccessfully with code[137]. See 
middleManager logs for more details..."));
+    Assert.assertEquals(
+        UnknownFault.CODE,
+        
MSQFaultUtils.getErrorCodeFromMessage(TaskQueue.FAILED_TO_RUN_TASK_SEE_OVERLORD_MSG)
+    );
 
     Assert.assertEquals(UnknownFault.CODE, 
MSQFaultUtils.getErrorCodeFromMessage(""));
     Assert.assertEquals(UnknownFault.CODE, 
MSQFaultUtils.getErrorCodeFromMessage(null));
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index f253c5a0607..f18c3b4465e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -102,6 +102,7 @@ import java.util.stream.Collectors;
  */
 public class TaskQueue
 {
+  public static final String FAILED_TO_RUN_TASK_SEE_OVERLORD_MSG = "Failed to 
run task. See overlord logs for more details.";
   private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS = 
TimeUnit.SECONDS.toNanos(60);
   private static final long MIN_WAIT_TIME_MS = 100;
 
@@ -782,7 +783,7 @@ public class TaskQueue
             statusUpdatesInQueue.incrementAndGet();
             TaskStatus status = TaskStatus.failure(
                 task.getId(),
-                "Failed to run task. See overlord logs for more details."
+                FAILED_TO_RUN_TASK_SEE_OVERLORD_MSG
             );
             taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to