This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 63268a5023 Relaunch track of failed workers without work orders
(#14166)
63268a5023 is described below
commit 63268a50236bf7b621b07cf64eacb69c955b0efe
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Apr 27 19:38:05 2023 +0530
Relaunch track of failed workers without work orders (#14166)
* If a worker dies after it has finished generating results, MSQ decides to
not retry it as it has no active work orders. However, since we don't keep
track of it further, if it is required for a future stage, the controller hangs
waiting for the worker to be ready. This PR keeps track of any workers the
controller decides to not restart immediately and while starting workers for
the next stage, queues these workers for retry.
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 1 +
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 44 ++++++++++++++--
.../msq/indexing/MSQWorkerTaskLauncherTest.java | 58 ++++++++++++++++++++++
3 files changed, 100 insertions(+), 3 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c65dfc5498..c155c144df 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -670,6 +670,7 @@ public class ControllerImpl implements Controller
"Worker[%d] has no active workOrders that need relaunch therefore
not relaunching",
worker
);
+ workerTaskLauncher.reportFailedInactiveWorker(worker);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index d9870daf39..0890c14a84 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -124,6 +124,11 @@ public class MSQWorkerTaskLauncher
// workers to relaunch
private final Set<Integer> workersToRelaunch = ConcurrentHashMap.newKeySet();
+ // workers that failed, but without active work orders. There is no need to
retry these unless a future stage
+ // requires it.
+ @GuardedBy("taskIds")
+ private final Set<Integer> failedInactiveWorkers =
ConcurrentHashMap.newKeySet();
+
private final ConcurrentHashMap<Integer, List<String>> workerToTaskIds = new
ConcurrentHashMap<>();
private final RetryTask retryTask;
@@ -222,6 +227,8 @@ public class MSQWorkerTaskLauncher
public void launchTasksIfNeeded(final int taskCount) throws
InterruptedException
{
synchronized (taskIds) {
+ retryInactiveTasksIfNeeded(taskCount);
+
if (taskCount > desiredTaskCount) {
desiredTaskCount = taskCount;
taskIds.notifyAll();
@@ -238,16 +245,48 @@ public class MSQWorkerTaskLauncher
}
}
+ public void retryInactiveTasksIfNeeded(int taskCount)
+ {
+ synchronized (taskIds) {
+ // Fetch the list of workers which failed without work orders
+ Iterator<Integer> iterator = failedInactiveWorkers.iterator();
+ while (iterator.hasNext()) {
+ Integer workerNumber = iterator.next();
+ // If the controller expects the task to be running, queue it for retry
+ if (workerNumber < taskCount) {
+ submitForRelaunch(workerNumber);
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ Set<Integer> getWorkersToRelaunch()
+ {
+ return workersToRelaunch;
+ }
+
/**
* Queues worker for relaunch. A noop if the worker is already in the queue.
*
- * @param workerNumber
+ * @param workerNumber worker number
*/
public void submitForRelaunch(int workerNumber)
{
workersToRelaunch.add(workerNumber);
}
+ /**
+ * Report a worker that failed without active orders. To be retried if it is
required for future stages only.
+ * @param workerNumber worker number
+ */
+ public void reportFailedInactiveWorker(int workerNumber)
+ {
+ synchronized (taskIds) {
+ failedInactiveWorkers.add(workerNumber);
+ }
+ }
+
/**
* Blocks the call untill the worker tasks are ready to be contacted for
work.
*
@@ -605,11 +644,10 @@ public class MSQWorkerTaskLauncher
context.workerManager().cancel(taskId);
}
}
-
}
/**
- * Cleans the task indentified in {@link
MSQWorkerTaskLauncher#relaunchTasks()} for relaunch. Asks the overlord to
cancel the task.
+ * Cleans the task identified in {@link
MSQWorkerTaskLauncher#relaunchTasks()} for relaunch. Asks the overlord to
cancel the task.
*/
private void cleanFailedTasksWhichAreRelaunched()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
new file mode 100644
index 0000000000..bc3f24065a
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.msq.exec.ControllerContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.TimeUnit;
+
+public class MSQWorkerTaskLauncherTest
+{
+
+ MSQWorkerTaskLauncher target;
+
+ @Before
+ public void setUp()
+ {
+ target = new MSQWorkerTaskLauncher(
+ "controller-id",
+ "foo",
+ Mockito.mock(ControllerContext.class),
+ (task, fault) -> {},
+ ImmutableMap.of(),
+ TimeUnit.SECONDS.toMillis(5)
+ );
+ }
+
+ @Test
+ public void testRetryInactiveTasks()
+ {
+ target.reportFailedInactiveWorker(1);
+ target.retryInactiveTasksIfNeeded(5);
+
+ Assert.assertEquals(target.getWorkersToRelaunch(), ImmutableSet.of(1));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]