This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 63268a5023 Relaunch track of failed workers without work orders 
(#14166)
63268a5023 is described below

commit 63268a50236bf7b621b07cf64eacb69c955b0efe
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu Apr 27 19:38:05 2023 +0530

    Relaunch track of failed workers without work orders (#14166)
    
    * If a worker dies after it has finished generating results, MSQ decides to 
not retry it as it has no active work orders. However, since we don't keep 
track of it further, if it is required for a future stage, the controller hangs 
waiting for the worker to be ready. This PR keeps track of any workers the 
controller decides not to restart immediately and, while starting workers for 
the next stage, queues these workers for retry.
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  1 +
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  | 44 ++++++++++++++--
 .../msq/indexing/MSQWorkerTaskLauncherTest.java    | 58 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 3 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c65dfc5498..c155c144df 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -670,6 +670,7 @@ public class ControllerImpl implements Controller
           "Worker[%d] has no active workOrders that need relaunch therefore 
not relaunching",
           worker
       );
+      workerTaskLauncher.reportFailedInactiveWorker(worker);
     }
   }
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index d9870daf39..0890c14a84 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -124,6 +124,11 @@ public class MSQWorkerTaskLauncher
   // workers to relaunch
   private final Set<Integer> workersToRelaunch = ConcurrentHashMap.newKeySet();
 
+  // workers that failed, but without active work orders. There is no need to 
retry these unless a future stage
+  // requires it.
+  @GuardedBy("taskIds")
+  private final Set<Integer> failedInactiveWorkers = 
ConcurrentHashMap.newKeySet();
+
   private final ConcurrentHashMap<Integer, List<String>> workerToTaskIds = new 
ConcurrentHashMap<>();
   private final RetryTask retryTask;
 
@@ -222,6 +227,8 @@ public class MSQWorkerTaskLauncher
   public void launchTasksIfNeeded(final int taskCount) throws 
InterruptedException
   {
     synchronized (taskIds) {
+      retryInactiveTasksIfNeeded(taskCount);
+
       if (taskCount > desiredTaskCount) {
         desiredTaskCount = taskCount;
         taskIds.notifyAll();
@@ -238,16 +245,48 @@ public class MSQWorkerTaskLauncher
     }
   }
 
+  public void retryInactiveTasksIfNeeded(int taskCount)
+  {
+    synchronized (taskIds) {
+      // Fetch the list of workers which failed without work orders
+      Iterator<Integer> iterator = failedInactiveWorkers.iterator();
+      while (iterator.hasNext()) {
+        Integer workerNumber = iterator.next();
+        // If the controller expects the task to be running, queue it for retry
+        if (workerNumber < taskCount) {
+          submitForRelaunch(workerNumber);
+          iterator.remove();
+        }
+      }
+    }
+  }
+
+  Set<Integer> getWorkersToRelaunch()
+  {
+    return workersToRelaunch;
+  }
+
   /**
    * Queues worker for relaunch. A noop if the worker is already in the queue.
    *
-   * @param workerNumber
+   * @param workerNumber worker number
    */
   public void submitForRelaunch(int workerNumber)
   {
     workersToRelaunch.add(workerNumber);
   }
 
+  /**
+   * Report a worker that failed without active orders. To be retried if it is 
requried for future stages only.
+   * @param workerNumber worker number
+   */
+  public void reportFailedInactiveWorker(int workerNumber)
+  {
+    synchronized (taskIds) {
+      failedInactiveWorkers.add(workerNumber);
+    }
+  }
+
   /**
    * Blocks the call untill the worker tasks are ready to be contacted for 
work.
    *
@@ -605,11 +644,10 @@ public class MSQWorkerTaskLauncher
         context.workerManager().cancel(taskId);
       }
     }
-
   }
 
   /**
-   * Cleans the task indentified in {@link 
MSQWorkerTaskLauncher#relaunchTasks()} for relaunch. Asks the overlord to 
cancel the task.
+   * Cleans the task identified in {@link 
MSQWorkerTaskLauncher#relaunchTasks()} for relaunch. Asks the overlord to 
cancel the task.
    */
   private void cleanFailedTasksWhichAreRelaunched()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
new file mode 100644
index 0000000000..bc3f24065a
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.msq.exec.ControllerContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.TimeUnit;
+
+public class MSQWorkerTaskLauncherTest
+{
+
+  MSQWorkerTaskLauncher target;
+
+  @Before
+  public void setUp()
+  {
+    target = new MSQWorkerTaskLauncher(
+        "controller-id",
+        "foo",
+        Mockito.mock(ControllerContext.class),
+        (task, fault) -> {},
+        ImmutableMap.of(),
+        TimeUnit.SECONDS.toMillis(5)
+    );
+  }
+
+  @Test
+  public void testRetryInactiveTasks()
+  {
+    target.reportFailedInactiveWorker(1);
+    target.retryInactiveTasksIfNeeded(5);
+
+    Assert.assertEquals(target.getWorkersToRelaunch(), ImmutableSet.of(1));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to