This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a8283975a26 Fixing deadlock in MSQ worker retry logic. (#18254)
a8283975a26 is described below
commit a8283975a26cc234c0cfaa0e324efd8ab713a909
Author: Karan Kumar <[email protected]>
AuthorDate: Sat Jul 19 21:28:35 2025 +0530
Fixing deadlock in MSQ worker retry logic. (#18254)
* Fixing deadlock in MSQ worker retry logic.
* Fixing deadlock in MSQ worker retry logic.
* Better logging.
* Better logging.
* Minor rename.
* Fixing static checks.
* Removing extra spaces.
* Fixing imports.
* Addressing feedback
* Better tests.
* Review adjustments.
---
.../msq/dart/controller/DartControllerContext.java | 4 +-
.../msq/dart/controller/DartWorkerManager.java | 3 +-
.../apache/druid/msq/exec/ControllerContext.java | 10 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 57 ++-
.../druid/msq/exec/RetryCapableWorkerManager.java | 4 +-
.../org/apache/druid/msq/exec/WorkerManager.java | 7 +-
.../apache/druid/msq/exec/WorkerSketchFetcher.java | 3 +-
.../msq/indexing/IndexerControllerContext.java | 7 +-
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 98 +++-
.../msq/dart/controller/DartWorkerManagerTest.java | 6 +-
.../org/apache/druid/msq/exec/MSQTasksTest.java | 3 +-
.../indexing/MSQWorkerTaskLauncherRetryTests.java | 503 +++++++++++++++++++++
.../msq/indexing/MSQWorkerTaskLauncherTest.java | 16 +-
.../druid/msq/test/MSQTestControllerContext.java | 5 +-
.../apache/druid/msq/util/MSQFaultUtilsTest.java | 5 +
.../apache/druid/indexing/overlord/TaskQueue.java | 3 +-
16 files changed, 659 insertions(+), 75 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index a44089c5651..cdd317c5466 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -35,7 +35,6 @@ import org.apache.druid.msq.exec.ControllerMemoryParameters;
import org.apache.druid.msq.exec.MSQMetriceEventBuilder;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.exec.SegmentSource;
-import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.indexing.IndexerControllerContext;
import org.apache.druid.msq.indexing.MSQSpec;
@@ -203,8 +202,7 @@ public class DartControllerContext implements
ControllerContext
public WorkerManager newWorkerManager(
String queryId,
MSQSpec querySpec,
- ControllerQueryKernelConfig queryKernelConfig,
- WorkerFailureListener workerFailureListener
+ ControllerQueryKernelConfig queryKernelConfig
)
{
// We're ignoring WorkerFailureListener. Dart worker failures are routed
into the controller by
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index c49e0b98aed..5ce31286a5f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.dart.worker.DartWorkerClient;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.WorkerClient;
+import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.exec.WorkerStats;
import org.apache.druid.msq.indexing.WorkerCount;
@@ -85,7 +86,7 @@ public class DartWorkerManager implements WorkerManager
}
@Override
- public ListenableFuture<?> start()
+ public ListenableFuture<?> start(WorkerFailureListener workerFailureListener)
{
if (!state.compareAndSet(State.NEW, State.STARTED)) {
throw new ISE("Cannot start from state[%s]", state.get());
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index a25c944cf63..7c0f7c2300a 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -99,16 +99,14 @@ public interface ControllerContext
/**
* Provides services about workers: starting, canceling, obtaining status.
*
- * @param queryId query ID
- * @param querySpec query spec
- * @param queryKernelConfig config from {@link
#queryKernelConfig(MSQSpec)}
- * @param workerFailureListener listener that receives callbacks when
workers fail
+ * @param queryId query ID
+ * @param querySpec query spec
+ * @param queryKernelConfig config from {@link #queryKernelConfig(MSQSpec)}
*/
WorkerManager newWorkerManager(
String queryId,
MSQSpec querySpec,
- ControllerQueryKernelConfig queryKernelConfig,
- WorkerFailureListener workerFailureListener
+ ControllerQueryKernelConfig queryKernelConfig
);
/**
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 0851fa7be7b..7bd6d79b557 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -213,6 +213,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -281,8 +282,8 @@ public class ControllerImpl implements Controller
private WorkerSketchFetcher workerSketchFetcher;
- // WorkerNumber -> WorkOrders which need to be retried and our determined by
the controller.
- // Map is always populated in the main controller thread by addToRetryQueue,
and pruned in retryFailedTasks.
+ // WorkerNumber -> WorkOrders which need to be retried and are determined by
the controller.
+ // Map is always populated in the main controller thread by addToRetryQueue
and pruned in retryFailedTasks.
private final Map<Integer, Set<WorkOrder>> workOrdersToRetry = new
HashMap<>();
// Time at which the query started.
@@ -300,6 +301,8 @@ public class ControllerImpl implements Controller
@Nullable
private MSQSegmentReport segmentReport;
+ private final AtomicLong mainThreadId = new AtomicLong();
+
public ControllerImpl(
final LegacyMSQSpec querySpec,
final ResultsContext resultsContext,
@@ -391,6 +394,8 @@ public class ControllerImpl implements Controller
final TaskState taskStateForReport;
final MSQErrorReport errorForReport;
+ mainThreadId.set(Thread.currentThread().getId());
+
try {
// Planning-related: convert the native query from MSQSpec into a
multi-stage QueryDefinition.
this.queryStartTime = DateTimes.nowUtc();
@@ -427,7 +432,7 @@ public class ControllerImpl implements Controller
exceptionEncountered = e;
}
- // Fetch final counters in separate try, in case runQueryUntilDone threw
an exception.
+ // Fetch final counters in a separate try, in case runQueryUntilDone threw
an exception.
try {
countersSnapshot = getFinalCountersSnapshot(queryKernel);
}
@@ -744,16 +749,7 @@ public class ControllerImpl implements Controller
workerManager = context.newWorkerManager(
context.queryId(),
querySpec,
- queryKernelConfig,
- (failedTask, fault) -> {
- if (queryKernelConfig.isFaultTolerant() &&
ControllerQueryKernel.isRetriableFault(fault)) {
- addToKernelManipulationQueue(kernel -> {
- addToRetryQueue(kernel, failedTask.getWorkerNumber(), fault);
- });
- } else {
- throw new MSQException(fault);
- }
- }
+ queryKernelConfig
);
if (queryKernelConfig.isFaultTolerant() && !(workerManager instanceof
RetryCapableWorkerManager)) {
@@ -786,6 +782,22 @@ public class ControllerImpl implements Controller
return queryDef;
}
+ private WorkerFailureListener getWorkerFailureListener(ControllerQueryKernel
controllerQueryKernel)
+ {
+ return (failedTask, fault) -> {
+ throwIfNonRetriableFault(fault);
+ if (Thread.currentThread().getId() == mainThreadId.get()) {
+ // this is called from the main controller thread, so we can directly
access the kernel.
+ addToRetryQueue(controllerQueryKernel, failedTask.getWorkerNumber(),
fault);
+ } else {
+ // since this is called from the task launcher thread, we need to add
it to the kernel manipulation queue so that only the controller thread can
manipulate the kernel.
+ addToKernelManipulationQueue(kernel -> {
+ addToRetryQueue(kernel, failedTask.getWorkerNumber(), fault);
+ });
+ }
+ };
+ }
+
/**
* Adds the work orders for worker to {@link
ControllerImpl#workOrdersToRetry} if the {@link ControllerQueryKernel}
determines that there
* are work orders which needs reprocessing.
@@ -800,7 +812,7 @@ public class ControllerImpl implements Controller
List<WorkOrder> retriableWorkOrders =
kernel.getWorkInCaseWorkerEligibleForRetryElseThrow(worker, fault);
if (!retriableWorkOrders.isEmpty()) {
- log.info("Submitting worker[%s] for relaunch because of fault[%s]",
worker, fault);
+ log.debug("Submitting worker[%s] for relaunch because of fault[%s]",
worker, fault);
retryCapableWorkerManager.submitForRelaunch(worker);
workOrdersToRetry.compute(
worker,
@@ -1379,7 +1391,7 @@ public class ControllerImpl implements Controller
final boolean retryOnFailure
)
{
- // Sorted copy of target worker numbers to ensure consistent iteration
order.
+ // Sorted copy of target worker numbers to ensure a consistent iteration
order.
final List<Integer> workersCopy = Ordering.natural().sortedCopy(workers);
final List<String> workerIds = getWorkerIds();
final List<ListenableFuture<Void>> workerFutures = new
ArrayList<>(workersCopy.size());
@@ -2222,7 +2234,7 @@ public class ControllerImpl implements Controller
private final ControllerQueryKernel queryKernel;
/**
- * Return value of {@link WorkerManager#start()}. Set by {@link
#startTaskLauncher()}.
+ * Return value of {@link WorkerManager#start(WorkerFailureListener)}.
Set by {@link #startTaskLauncher()}.
*/
private ListenableFuture<?> workerTaskLauncherFuture;
@@ -2433,13 +2445,13 @@ public class ControllerImpl implements Controller
// Start tasks.
log.debug("Query [%s] starting task launcher.", queryDef.getQueryId());
- workerTaskLauncherFuture = workerManager.start();
+ workerTaskLauncherFuture =
workerManager.start(getWorkerFailureListener(queryKernel));
closer.register(() -> workerManager.stop(true));
workerTaskLauncherFuture.addListener(
() ->
addToKernelManipulationQueue(queryKernel -> {
- // Throw an exception in the main loop, if anything went wrong.
+ // Throw an exception in the main loop if anything went wrong.
FutureUtils.getUncheckedImmediately(workerTaskLauncherFuture);
}),
Execs.directExecutor()
@@ -2529,7 +2541,7 @@ public class ControllerImpl implements Controller
);
for (final StageId stageId : newStageIds) {
- // Allocate segments, if this is the final stage of an ingestion.
+ // Allocate segments if this is the final stage of ingestion.
if (MSQControllerTask.isIngestion(querySpec)
&& stageId.getStageNumber() ==
queryDef.getFinalStageDefinition().getStageNumber()
&& (((DataSourceMSQDestination)
querySpec.getDestination()).getTerminalStageSpec() instanceof
SegmentGenerationStageSpec)) {
@@ -2848,6 +2860,13 @@ public class ControllerImpl implements Controller
}
}
+ private void throwIfNonRetriableFault(MSQFault fault)
+ {
+ if (!queryKernelConfig.isFaultTolerant() ||
!ControllerQueryKernel.isRetriableFault(fault)) {
+ throw new MSQException(fault);
+ }
+ }
+
static ClusterStatisticsMergeMode finalizeClusterStatisticsMergeMode(
StageDefinition stageDef,
ClusterStatisticsMergeMode initialMode
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RetryCapableWorkerManager.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RetryCapableWorkerManager.java
index d5b3d41d7a2..b38f6240d98 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RetryCapableWorkerManager.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RetryCapableWorkerManager.java
@@ -30,7 +30,7 @@ public interface RetryCapableWorkerManager extends
WorkerManager
void submitForRelaunch(int workerNumber);
/**
- * Report a worker that failed without active orders. To be retried if it is
requried for future stages only.
+ * Report a worker that failed without active orders. To be retried if it is
required for future stages only.
*/
void reportFailedInactiveWorker(int workerNumber);
@@ -39,7 +39,7 @@ public interface RetryCapableWorkerManager extends
WorkerManager
* to figure out if the worker taskId is canceled by the controller. If yes,
the errors from that worker taskId
* are ignored for the error reports.
*
- * @return true if task is canceled by the controller, else false
+ * @return true if the task is canceled by the controller, else false
*/
boolean isTaskCanceledByController(String taskId);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
index a4dc62aefd2..db25641fb13 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerManager.java
@@ -42,7 +42,7 @@ public interface WorkerManager
* resolves to an exception if one of the workers fails without being
explicitly canceled, or if something else
* goes wrong.
*/
- ListenableFuture<?> start();
+ ListenableFuture<?> start(WorkerFailureListener workerFailureListener);
/**
* Launch additional workers, if needed, to bring the number of running
workers up to {@code workerCount}.
@@ -56,13 +56,14 @@ public interface WorkerManager
*/
void waitForWorkers(Set<Integer> workerNumbers) throws InterruptedException;
+
/**
- * List of currently-active workers.
+ * List of currently active workers.
*/
List<String> getWorkerIds();
/**
- * Number of currently-active and currently-pending workers.
+ * Number of currently active and currently pending workers.
*/
WorkerCount getWorkerCount();
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
index 0139185a005..8baf54c46fc 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java
@@ -138,6 +138,7 @@ public class WorkerSketchFetcher implements AutoCloseable
}
try {
+ // This runs in another thread, so the controller thread does not block;
hence we need not call with a failure handler.
workerManager.waitForWorkers(ImmutableSet.of(worker));
}
catch (InterruptedException interruptedException) {
@@ -146,7 +147,7 @@ public class WorkerSketchFetcher implements AutoCloseable
return;
}
- // if task is not the latest task. It must have retried.
+ // if the task is not the latest task, it must have retried.
if (!workerManager.isWorkerActive(taskId)) {
log.info("Task[%s] is no longer the latest task for worker[%d]. Skipping
fetch.", taskId, worker);
return;
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 079af24c390..67fc6aadb24 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -38,7 +38,6 @@ import org.apache.druid.msq.exec.MSQMetriceEventBuilder;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.exec.WorkerClient;
-import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.guice.MultiStageQuery;
import
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
@@ -217,15 +216,13 @@ public class IndexerControllerContext implements
ControllerContext
public WorkerManager newWorkerManager(
final String queryId,
final MSQSpec querySpec,
- final ControllerQueryKernelConfig queryKernelConfig,
- final WorkerFailureListener workerFailureListener
+ final ControllerQueryKernelConfig queryKernelConfig
)
{
return new MSQWorkerTaskLauncher(
queryId,
taskDataSource,
overlordClient,
- workerFailureListener,
makeTaskContext(querySpec, queryKernelConfig, taskContext),
// 10 minutes +- 2 minutes jitter
TimeUnit.SECONDS.toMillis(600 +
ThreadLocalRandom.current().nextInt(-4, 5) * 30L),
@@ -338,7 +335,7 @@ public class IndexerControllerContext implements
ControllerContext
}
/**
- * Helper method for {@link #newWorkerManager}, split out to be used in
tests.
+ * Helper method for {@link ControllerContext#newWorkerManager}, split out
to be used in tests.
*
* @param querySpec MSQ query spec; used for
*/
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index c7b1471380c..5155eb25493 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -29,9 +29,11 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -42,6 +44,7 @@ import org.apache.druid.msq.exec.RetryCapableWorkerManager;
import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerStats;
import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFault;
import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForJob;
import org.apache.druid.msq.indexing.error.TooManyAttemptsForWorker;
@@ -67,7 +70,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
- * Like {@link
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor}, but
different.
+ * Like {@link TaskMonitor}, but different.
*/
public class MSQWorkerTaskLauncher implements RetryCapableWorkerManager
{
@@ -146,7 +149,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
private final Set<Integer> failedInactiveWorkers =
ConcurrentHashMap.newKeySet();
private final ConcurrentHashMap<Integer, List<String>> workerToTaskIds = new
ConcurrentHashMap<>();
- private final WorkerFailureListener workerFailureListener;
+ private final AtomicReference<WorkerFailureListener>
workerFailureListenerRef = new AtomicReference<>();
private final AtomicLong recentFullyStartedWorkerTimeInMillis = new
AtomicLong(System.currentTimeMillis());
@@ -154,7 +157,6 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
final String controllerTaskId,
final String dataSource,
final OverlordClient overlordClient,
- final WorkerFailureListener workerFailureListener,
final Map<String, Object> taskContextOverrides,
final long maxTaskStartDelayMillis,
final MSQWorkerTaskLauncherConfig config
@@ -167,14 +169,17 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
this.exec = Execs.singleThreaded(
"multi-stage-query-task-launcher[" +
StringUtils.encodeForFormat(controllerTaskId) + "]-%s"
);
- this.workerFailureListener = workerFailureListener;
this.maxTaskStartDelayMillis = maxTaskStartDelayMillis;
this.config = config;
}
@Override
- public ListenableFuture<?> start()
+ public ListenableFuture<?> start(WorkerFailureListener workerFailureListener)
{
+ if (!this.workerFailureListenerRef.compareAndSet(null,
workerFailureListener)) {
+ throw DruidException.defensive("WorkerFailureListener already set for
MSQWorkerTaskLauncher");
+ }
+
if (state.compareAndSet(State.NEW, State.STARTED)) {
exec.submit(() -> {
try {
@@ -185,7 +190,6 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
}
});
}
-
// Return an "everything is done" future that callers can wait for.
return stopFuture;
}
@@ -244,22 +248,36 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
}
@Override
- public void launchWorkersIfNeeded(final int taskCount) throws
InterruptedException
+ public void launchWorkersIfNeeded(final int workerCount)
+ throws InterruptedException
{
synchronized (taskIds) {
- retryInactiveTasksIfNeeded(taskCount);
+ retryInactiveTasksIfNeeded(workerCount);
- if (taskCount > desiredTaskCount) {
- desiredTaskCount = taskCount;
+ if (workerCount > desiredTaskCount) {
+ desiredTaskCount = workerCount;
taskIds.notifyAll();
}
- while (taskIds.size() < taskCount || !allTasksStarted(taskCount)) {
+ while (taskIds.size() < workerCount || !allTasksStarted(workerCount)) {
if (stopFuture.isDone() || stopFuture.isCancelled()) {
FutureUtils.getUnchecked(stopFuture, false);
throw new ISE("Stopped");
}
-
+ // add failed tasks to the retry queue
+ if (workerFailureListenerRef.get() != null) {
+ for (TaskTracker taskTracker : taskTrackers.values()) {
+ if (taskTracker.isRetrying()) {
+ invokeFailureListener(
+ taskTracker,
+ new WorkerFailedFault(
+ taskTracker.msqWorkerTask.getId(),
+ taskTracker.statusRef.get().getErrorMsg()
+ )
+ );
+ }
+ }
+ }
taskIds.wait();
}
}
@@ -301,7 +319,8 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
}
@Override
- public void waitForWorkers(Set<Integer> workerNumbers) throws
InterruptedException
+ public void waitForWorkers(Set<Integer> workerNumbers)
+ throws InterruptedException
{
synchronized (taskIds) {
while (!fullyStartedTasks.containsAll(workerNumbers)) {
@@ -309,6 +328,20 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
FutureUtils.getUnchecked(stopFuture, false);
throw new ISE("Stopped");
}
+
+ if (workerFailureListenerRef.get() != null) {
+ for (TaskTracker taskTracker : taskTrackers.values()) {
+ if (taskTracker.isRetrying() &&
workerNumbers.contains(taskTracker.workerNumber)) {
+ invokeFailureListener(taskTracker,
+ new WorkerFailedFault(
+ taskTracker.msqWorkerTask.getId(),
+
taskTracker.statusRef.get().getErrorMsg()
+ )
+ );
+ }
+ }
+ }
+
taskIds.wait();
}
}
@@ -562,14 +595,11 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
}
if (tracker.statusRef.get() == null) {
+ tracker.enableRetrying();
removeWorkerFromFullyStartedWorkers(tracker);
final String errorMessage = StringUtils.format("Task [%s] status
missing", taskId);
log.info(errorMessage + ". Trying to relaunch the worker");
- tracker.enableRetrying();
- workerFailureListener.onFailure(
- tracker.msqWorkerTask,
- UnknownFault.forMessage(errorMessage)
- );
+ invokeFailureListener(tracker, UnknownFault.forMessage(errorMessage));
} else if (tracker.didRunTimeOut(maxTaskStartDelayMillis) &&
!canceledWorkerTasks.contains(taskId)) {
removeWorkerFromFullyStartedWorkers(tracker);
@@ -579,22 +609,31 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
maxTaskStartDelayMillis
));
} else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
+ tracker.enableRetrying();
removeWorkerFromFullyStartedWorkers(tracker);
TaskStatus taskStatus = tracker.statusRef.get();
log.info("Task[%s] failed because %s. Trying to relaunch the worker",
taskId, taskStatus.getErrorMsg());
- tracker.enableRetrying();
- workerFailureListener.onFailure(
- tracker.msqWorkerTask,
- new WorkerFailedFault(taskId, taskStatus.getErrorMsg())
- );
+ invokeFailureListener(tracker, new WorkerFailedFault(taskId,
taskStatus.getErrorMsg()));
}
}
}
+ private void invokeFailureListener(TaskTracker tracker, MSQFault msqFault)
+ {
+ WorkerFailureListener workerFailureListener =
workerFailureListenerRef.get();
+ if (workerFailureListener != null) {
+ workerFailureListener.onFailure(
+ tracker.msqWorkerTask,
+ msqFault
+ );
+ }
+ }
+
private void removeWorkerFromFullyStartedWorkers(TaskTracker tracker)
{
synchronized (taskIds) {
fullyStartedTasks.remove(tracker.msqWorkerTask.getWorkerNumber());
+ taskIds.notifyAll();
}
}
@@ -616,8 +655,13 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
if (tracker == null) {
throw new ISE("Did not find taskTracker for latest taskId[%s]",
latestTaskId);
}
- // if task is not failed donot retry
+ // if the task is not failed, no need to retry
if (!tracker.isComplete()) {
+ log.info(
+ "Did not relaunch worker[%d] with task id[%s] because the task
is still running",
+ tracker.workerNumber,
+ latestTaskId
+ );
return taskHistory;
}
@@ -657,6 +701,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
return taskHistory;
});
+ // remove the worker from the relaunch set
iterator.remove();
}
}
@@ -882,5 +927,10 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
return Math.max(0, currentFullyStartingTime - startTimeMillis);
}
}
+
+ public int getWorkerNumber()
+ {
+ return workerNumber;
+ }
}
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
index ea781e2e992..10aca82bf98 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartWorkerManagerTest.java
@@ -140,7 +140,7 @@ public class DartWorkerManagerTest
Mockito.when(workerClient.stopWorker(WORKERS.get(1)))
.thenReturn(Futures.immediateFuture(null));
- final ListenableFuture<?> future = workerManager.start();
+ final ListenableFuture<?> future = workerManager.start(null);
workerManager.stop(false);
// Ensure the future from start() resolves.
@@ -155,7 +155,7 @@ public class DartWorkerManagerTest
Mockito.when(workerClient.stopWorker(WORKERS.get(1)))
.thenReturn(Futures.immediateFuture(null));
- final ListenableFuture<?> future = workerManager.start();
+ final ListenableFuture<?> future = workerManager.start(null);
workerManager.stop(true);
// Ensure the future from start() resolves.
@@ -170,7 +170,7 @@ public class DartWorkerManagerTest
Mockito.when(workerClient.stopWorker(WORKERS.get(1)))
.thenReturn(Futures.immediateFuture(null));
- final ListenableFuture<?> future = workerManager.start();
+ final ListenableFuture<?> future = workerManager.start(null);
workerManager.stop(true);
// Ensure the future from start() resolves.
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index 24df8823346..02e11444c2e 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -230,14 +230,13 @@ public class MSQTasksTest
CONTROLLER_ID,
"foo",
new TasksTestOverlordClient(numSlots),
- (task, fault) -> {},
ImmutableMap.of(),
TimeUnit.SECONDS.toMillis(5),
new MSQWorkerTaskLauncherConfig()
);
try {
- msqWorkerTaskLauncher.start();
+ msqWorkerTaskLauncher.start(null);
msqWorkerTaskLauncher.launchWorkersIfNeeded(numTasks);
fail();
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTests.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTests.java
new file mode 100644
index 00000000000..f2e2477dd47
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTests.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
+import org.apache.druid.client.indexing.IndexingWorkerInfo;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
+import org.apache.druid.client.indexing.TaskStatusResponse;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.overlord.TaskQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
+import org.apache.druid.msq.exec.MSQTasks;
+import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.rpc.UpdateResponse;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.rpc.indexing.SegmentUpdateResponse;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.http.SegmentsToUpdateFilter;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class MSQWorkerTaskLauncherRetryTests
+{
+
+ private static final TaskLocation RUNNING_TASK_LOCATION = new
TaskLocation("host", 1, 2, null);
+
+ @Test
+ public void mainThreadBlockingSimulationTest() throws Exception
+ {
+ final ExecutorService executors = Executors.newSingleThreadExecutor(new
ThreadFactoryBuilder().setDaemon(false)
+
.setNameFormat(
+
"Controller-simulator-%d")
+
.build());
+
+ final TestOverlordClient overlordClient = new TestOverlordClient();
+ final int failedWorkerNumber = 2;
+ final CountDownLatch workerFailedLatch = new CountDownLatch(1);
+ final CountDownLatch workerStartedLatch = new CountDownLatch(1);
+ overlordClient.addFailedWorker(2);
+ overlordClient.addUnknownLocationWorker(1);
+
+ final MSQWorkerTaskLauncher msqWorkerTaskLauncher = new
MSQWorkerTaskLauncher(
+ "controller-id",
+ "foo",
+ overlordClient,
+ ImmutableMap.of(),
+ TimeUnit.SECONDS.toMillis(5),
+ new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig()
+ );
+
+ try {
+ final long workerThreadId = Thread.currentThread().getId();
+
+ startTaskLauncher(
+ msqWorkerTaskLauncher,
+ failedWorkerNumber,
+ workerFailedLatch,
+ overlordClient,
+ workerThreadId,
+ workerStartedLatch
+ );
+
+ MockConsumer mockConsumer = new MockConsumer(
+ msqWorkerTaskLauncher,
+ 3,
+ workerStartedLatch
+ );
+ Future<?> futures = executors.submit(mockConsumer);
+ // hook called but worker not queued for relaunch.
+ workerFailedLatch.await();
+ Assertions.assertEquals(1, workerStartedLatch.getCount());
+ // we need to call the hooks ourselves to allow the main thread to proceed, since
we are using an executor service and so the thread ids would not match.
+ enableWorkerRelaunch(overlordClient, failedWorkerNumber,
msqWorkerTaskLauncher, workerStartedLatch);
+ // future should be completed in 5 seconds else throw an exception.
+ Assertions.assertNull(futures.get(5, TimeUnit.SECONDS));
+ }
+ finally {
+ msqWorkerTaskLauncher.stop(true);
+ executors.shutdownNow();
+ }
+ }
+
+ private static void enableWorkerRelaunch(
+ TestOverlordClient overlordClient,
+ int failedWorkerNumber,
+ MSQWorkerTaskLauncher msqWorkerTaskLauncher,
+ CountDownLatch workerStartedLatch
+ )
+ {
+ overlordClient.removeUnknownLocationWorker(1);
+ overlordClient.removefailedWorker(failedWorkerNumber);
+ msqWorkerTaskLauncher.submitForRelaunch(failedWorkerNumber);
+ workerStartedLatch.countDown();
+ }
+
+ private static void startTaskLauncher(
+ MSQWorkerTaskLauncher msqWorkerTaskLauncher,
+ int failedWorkerNumber,
+ CountDownLatch workerFailedLatch,
+ TestOverlordClient overlordClient,
+ long workerThreadId,
+ CountDownLatch workerStartedLatch
+ )
+ {
+ msqWorkerTaskLauncher.start((task, fault) -> {
+ Assertions.assertEquals(failedWorkerNumber, task.getWorkerNumber());
+ workerFailedLatch.countDown();
+ if (workerThreadId == Thread.currentThread().getId()) {
+ // If the worker thread is the same as the main thread, we can
directly relaunch the worker.
+ enableWorkerRelaunch(overlordClient, failedWorkerNumber,
msqWorkerTaskLauncher, workerStartedLatch);
+ }
+ });
+ }
+
+
+ @Test
+ public void mainThreadNonBlockingSimulationTest() throws Exception
+ {
+ final TestOverlordClient overlordClient = new TestOverlordClient();
+ final int failedWorkerNumber = 2;
+ final CountDownLatch workerFailedLatch = new CountDownLatch(1);
+ final CountDownLatch workerStartedLatch = new CountDownLatch(1);
+ overlordClient.addFailedWorker(2);
+ overlordClient.addUnknownLocationWorker(1);
+
+ final MSQWorkerTaskLauncher msqWorkerTaskLauncher = new
MSQWorkerTaskLauncher(
+ "controller-id",
+ "foo",
+ overlordClient,
+ ImmutableMap.of(),
+ TimeUnit.SECONDS.toMillis(5),
+ new MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig()
+ );
+
+ try {
+ final long workerThreadId = Thread.currentThread().getId();
+
+ startTaskLauncher(
+ msqWorkerTaskLauncher,
+ failedWorkerNumber,
+ workerFailedLatch,
+ overlordClient,
+ workerThreadId,
+ workerStartedLatch
+ );
+
+
+ MockConsumer mockConsumer = new MockConsumer(
+ msqWorkerTaskLauncher,
+ 3,
+ workerStartedLatch
+ );
+ mockConsumer.run();
+ // failed latch called
+ workerFailedLatch.await();
+ // worker started.
+ workerStartedLatch.await();
+ }
+ finally {
+ msqWorkerTaskLauncher.stop(true);
+ }
+ }
+
+
+ private static class MockConsumer implements Runnable
+ {
+
+ private final MSQWorkerTaskLauncher msqWorkerTaskLauncher;
+ private final int taskCount;
+ private final CountDownLatch workerStartedLatch;
+
+ public MockConsumer(
+ MSQWorkerTaskLauncher msqWorkerTaskLauncher,
+ int tasksCount,
+ CountDownLatch workerStartedLatch
+ )
+ {
+ this.msqWorkerTaskLauncher = msqWorkerTaskLauncher;
+ this.taskCount = tasksCount;
+ this.workerStartedLatch = workerStartedLatch;
+ }
+
+
+ @Override
+ public void run()
+ {
+ // start stages
+ try {
+ msqWorkerTaskLauncher.launchWorkersIfNeeded(taskCount);
+ workerStartedLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ Set<Integer> workerNumbers = new HashSet<>();
+ for (int i = 0; i < taskCount; i++) {
+ workerNumbers.add(i);
+ }
+
+ // submit work orders
+ try {
+ msqWorkerTaskLauncher.waitForWorkers(workerNumbers);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+
+ private static class TestOverlordClient implements OverlordClient
+ {
+ private final ConcurrentSkipListSet<Integer> unknownLocationWorkers = new
ConcurrentSkipListSet<>();
+ private final ConcurrentSkipListSet<Integer> failedWorkers = new
ConcurrentSkipListSet<>();
+
+ public TestOverlordClient()
+ {
+ }
+
+ @Override
+ public ListenableFuture<URI> findCurrentLeader()
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<Void> runTask(String taskId, Object taskObject)
+ {
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public ListenableFuture<Void> cancelTask(String taskId)
+ {
+ if (failedWorkers.contains(MSQTasks.workerFromTaskId(taskId))) {
+ return Futures.immediateFuture(null);
+ } else {
+ throw DruidException.defensive("Task %s should not be cancelled",
taskId);
+ }
+ }
+
+ @Override
+ public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
+ @Nullable String state,
+ @Nullable String dataSource,
+ @Nullable Integer maxCompletedTasks
+ )
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<Map<String, TaskStatus>> taskStatuses(Set<String>
taskIds)
+ {
+ final Map<String, TaskStatus> taskStatusMap = new HashMap<>();
+ for (String taskId : taskIds) {
+ int workerNumber = MSQTasks.workerFromTaskId(taskId);
+ if (failedWorkers.contains(workerNumber)) {
+ taskStatusMap.put(taskId, TaskStatus.failure(taskId,
TaskQueue.FAILED_TO_RUN_TASK_SEE_OVERLORD_MSG));
+ } else if (unknownLocationWorkers.contains(workerNumber)) {
+ taskStatusMap.put(taskId,
TaskStatus.running(taskId).withLocation(TaskLocation.unknown()));
+ } else {
+ taskStatusMap.put(taskId,
TaskStatus.running(taskId).withLocation(RUNNING_TASK_LOCATION));
+ }
+ }
+ return Futures.immediateFuture(taskStatusMap);
+ }
+
+ @Override
+ public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
+ {
+ if (failedWorkers.contains(MSQTasks.workerFromTaskId(taskId))) {
+ return Futures.immediateFuture(new TaskStatusResponse(taskId,
createFailedTaskStatus(taskId)));
+ }
+ if (unknownLocationWorkers.contains(MSQTasks.workerFromTaskId(taskId))) {
+ return Futures.immediateFuture(new TaskStatusResponse(taskId,
createRunningTaskStatusWithOutLocation(taskId)));
+ } else {
+ return Futures.immediateFuture(new TaskStatusResponse(taskId,
createRunningTaskStatus(taskId)));
+ }
+ }
+
+ @Override
+ public ListenableFuture<TaskReport.ReportMap> taskReportAsMap(String
taskId)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<TaskPayloadResponse> taskPayload(String taskId)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<Map<String, String>> postSupervisor(SupervisorSpec
supervisor)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<Map<String, String>> terminateSupervisor(String
supervisorId)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<CloseableIterator<SupervisorStatus>>
supervisorStatuses()
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<Map<String, List<Interval>>>
findLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<Integer> killPendingSegments(String dataSource,
Interval interval)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<List<IndexingWorkerInfo>> getWorkers()
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<IndexingTotalWorkerCapacityInfo>
getTotalWorkerCapacity()
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<Boolean> isCompactionSupervisorEnabled()
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<ClusterCompactionConfig>
getClusterCompactionConfig()
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<UpdateResponse>
updateClusterCompactionConfig(ClusterCompactionConfig config)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<SegmentUpdateResponse>
markNonOvershadowedSegmentsAsUsed(String dataSource)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<SegmentUpdateResponse>
markNonOvershadowedSegmentsAsUsed(
+ String dataSource,
+ SegmentsToUpdateFilter filter
+ )
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<SegmentUpdateResponse> markSegmentAsUsed(SegmentId
segmentId)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<SegmentUpdateResponse> markSegmentsAsUnused(String
dataSource)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<SegmentUpdateResponse> markSegmentsAsUnused(
+ String dataSource,
+ SegmentsToUpdateFilter filter
+ )
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public ListenableFuture<SegmentUpdateResponse>
markSegmentAsUnused(SegmentId segmentId)
+ {
+ throw new UOE("Not implemented");
+ }
+
+ @Override
+ public OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
+ {
+ return this;
+ }
+
+ public void addUnknownLocationWorker(int workerNumber)
+ {
+ unknownLocationWorkers.add(workerNumber);
+ }
+
+ public void addFailedWorker(int workerNumber)
+ {
+ failedWorkers.add(workerNumber);
+ }
+
+ public void removefailedWorker(int workerNumber)
+ {
+ failedWorkers.remove(workerNumber);
+ }
+
+ public void removeUnknownLocationWorker(int workerNumber)
+ {
+ unknownLocationWorkers.remove(workerNumber);
+ }
+
+ private TaskStatusPlus createRunningTaskStatusWithOutLocation(String
taskid)
+ {
+ return createTaskStatusPlus(taskid, TaskState.RUNNING,
TaskLocation.unknown());
+ }
+
+ private TaskStatusPlus createRunningTaskStatus(String taskid)
+ {
+ return createTaskStatusPlus(taskid, TaskState.RUNNING,
RUNNING_TASK_LOCATION);
+ }
+
+ private TaskStatusPlus createFailedTaskStatus(String taskid)
+ {
+ return createTaskStatusPlus(taskid, TaskState.FAILED,
RUNNING_TASK_LOCATION);
+ }
+
+ private TaskStatusPlus createTaskStatusPlus(String taskid, TaskState
taskState, TaskLocation location)
+ {
+ return new TaskStatusPlus(
+ taskid,
+ "group-id",
+ "type",
+ DateTime.now(DateTimeZone.UTC),
+ DateTime.now(DateTimeZone.UTC),
+ taskState,
+ RunnerTaskState.RUNNING,
+ 1000L,
+ location,
+ "dataSource",
+ null
+ );
+ }
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
index 1e00be629a1..3cd1b5b98f8 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.exec.WorkerFailureListener;
import
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.junit.Assert;
@@ -42,7 +44,6 @@ public class MSQWorkerTaskLauncherTest
"controller-id",
"foo",
Mockito.mock(OverlordClient.class),
- (task, fault) -> {},
ImmutableMap.of(),
TimeUnit.SECONDS.toMillis(5),
new MSQWorkerTaskLauncherConfig()
@@ -57,4 +58,17 @@ public class MSQWorkerTaskLauncherTest
Assert.assertEquals(target.getWorkersToRelaunch(), ImmutableSet.of(1));
}
+
+ @Test
+ public void testMultipleWorkerFailureRegistration()
+ {
+ target.start(getWorkerFailureListener());
+ Assert.assertThrows(DruidException.class, () ->
target.start(getWorkerFailureListener()));
+ }
+
+ private static WorkerFailureListener getWorkerFailureListener()
+ {
+ return (a, b) -> {
+ };
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 6587703dfef..dd618faafd6 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -53,7 +53,6 @@ import org.apache.druid.msq.exec.MSQMetriceEventBuilder;
import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
-import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerImpl;
import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
@@ -347,8 +346,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
public WorkerManager newWorkerManager(
String queryId,
MSQSpec querySpec,
- ControllerQueryKernelConfig queryKernelConfig,
- WorkerFailureListener workerFailureListener
+ ControllerQueryKernelConfig queryKernelConfig
)
{
MSQWorkerTaskLauncherConfig taskLauncherConfig = new
MSQWorkerTaskLauncherConfig();
@@ -360,7 +358,6 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
controller.queryId(),
"test-datasource",
overlordClient,
- workerFailureListener,
IndexerControllerContext.makeTaskContext(querySpec, queryKernelConfig,
ImmutableMap.of()),
0,
taskLauncherConfig
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MSQFaultUtilsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MSQFaultUtilsTest.java
index 21b04623eda..d7d0af09e2f 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MSQFaultUtilsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MSQFaultUtilsTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.util;
+import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
@@ -34,6 +35,10 @@ public class MSQFaultUtilsTest
{
Assert.assertEquals(UnknownFault.CODE,
MSQFaultUtils.getErrorCodeFromMessage(
"Task execution process exited unsuccessfully with code[137]. See
middleManager logs for more details..."));
+ Assert.assertEquals(
+ UnknownFault.CODE,
+
MSQFaultUtils.getErrorCodeFromMessage(TaskQueue.FAILED_TO_RUN_TASK_SEE_OVERLORD_MSG)
+ );
Assert.assertEquals(UnknownFault.CODE,
MSQFaultUtils.getErrorCodeFromMessage(""));
Assert.assertEquals(UnknownFault.CODE,
MSQFaultUtils.getErrorCodeFromMessage(null));
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index f253c5a0607..f18c3b4465e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -102,6 +102,7 @@ import java.util.stream.Collectors;
*/
public class TaskQueue
{
+ public static final String FAILED_TO_RUN_TASK_SEE_OVERLORD_MSG = "Failed to
run task. See overlord logs for more details.";
private static final long MANAGEMENT_WAIT_TIMEOUT_NANOS =
TimeUnit.SECONDS.toNanos(60);
private static final long MIN_WAIT_TIME_MS = 100;
@@ -782,7 +783,7 @@ public class TaskQueue
statusUpdatesInQueue.incrementAndGet();
TaskStatus status = TaskStatus.failure(
task.getId(),
- "Failed to run task. See overlord logs for more details."
+ FAILED_TO_RUN_TASK_SEE_OVERLORD_MSG
);
taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]