This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 450ecd6370 More efficient generation of ImmutableWorkerHolder from WorkerHolder. (#14546)
450ecd6370 is described below

commit 450ecd6370d47aff58b4160efd95a82396897d5f
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jul 13 07:57:16 2023 -0700

    More efficient generation of ImmutableWorkerHolder from WorkerHolder. (#14546)
    
    * More efficient generation of ImmutableWorkerHolder from WorkerHolder.
    
    Taking the work done in #12096 a little further:
    
    1) Applying a similar optimization to WorkerHolder (HttpRemoteTaskRunner).
       The original patch only helped with the ZkWorker (RemoteTaskRunner).
    
    2) Improve the ZkWorker version somewhat by avoiding multiple iterations
       through the task announcements map.
    
    * Pick better names and use better logic.
    
    * Only runnable tasks.
    
    * Fix test.
    
    * Fix testBlacklistZKWorkers50Percent.
---
 .../indexing/overlord/ImmutableWorkerInfo.java     | 55 +++++++++++++++++++++-
 .../druid/indexing/overlord/RemoteTaskRunner.java  | 10 ++++
 .../apache/druid/indexing/overlord/ZkWorker.java   |  9 +---
 ...PendingTaskBasedWorkerProvisioningStrategy.java |  5 +-
 .../druid/indexing/overlord/hrtr/WorkerHolder.java | 47 +-----------------
 ...teTaskRunnerRunPendingTasksConcurrencyTest.java |  4 +-
 .../indexing/overlord/RemoteTaskRunnerTest.java    | 54 +++++++++++----------
 .../overlord/RemoteTaskRunnerTestUtils.java        |  2 +-
 8 files changed, 103 insertions(+), 83 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
index aaea3f453d..b1eafe0dc5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
@@ -20,16 +20,20 @@
 package org.apache.druid.indexing.overlord;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.worker.TaskAnnouncement;
 import org.apache.druid.indexing.worker.Worker;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -44,6 +48,8 @@ public class ImmutableWorkerInfo
   private final ImmutableSet<String> availabilityGroups;
   private final ImmutableSet<String> runningTasks;
   private final DateTime lastCompletedTaskTime;
+
+  @Nullable
   private final DateTime blacklistedUntil;
 
   @JsonCreator
@@ -76,7 +82,8 @@ public class ImmutableWorkerInfo
   )
   {
     this(worker, currCapacityUsed, currParallelIndexCapacityUsed, availabilityGroups,
-         runningTasks, lastCompletedTaskTime, null);
+         runningTasks, lastCompletedTaskTime, null
+    );
   }
 
   public ImmutableWorkerInfo(
@@ -90,6 +97,51 @@ public class ImmutableWorkerInfo
     this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks, lastCompletedTaskTime, null);
   }
 
+  /**
+   * Helper used by {@link ZkWorker} and {@link org.apache.druid.indexing.overlord.hrtr.WorkerHolder}.
+   */
+  public static ImmutableWorkerInfo fromWorkerAnnouncements(
+      final Worker worker,
+      final Map<String, TaskAnnouncement> announcements,
+      final DateTime lastCompletedTaskTime,
+      @Nullable final DateTime blacklistedUntil
+  )
+  {
+    int currCapacityUsed = 0;
+    int currParallelIndexCapacityUsed = 0;
+    ImmutableSet.Builder<String> taskIds = ImmutableSet.builder();
+    ImmutableSet.Builder<String> availabilityGroups = ImmutableSet.builder();
+
+    for (final Map.Entry<String, TaskAnnouncement> entry : announcements.entrySet()) {
+      final TaskAnnouncement announcement = entry.getValue();
+
+      if (announcement.getStatus().isRunnable()) {
+        final String taskId = entry.getKey();
+        final TaskResource taskResource = announcement.getTaskResource();
+        final int requiredCapacity = taskResource.getRequiredCapacity();
+
+        currCapacityUsed += requiredCapacity;
+
+        if (ParallelIndexSupervisorTask.TYPE.equals(announcement.getTaskType())) {
+          currParallelIndexCapacityUsed += requiredCapacity;
+        }
+
+        taskIds.add(taskId);
+        availabilityGroups.add(taskResource.getAvailabilityGroup());
+      }
+    }
+
+    return new ImmutableWorkerInfo(
+        worker,
+        currCapacityUsed,
+        currParallelIndexCapacityUsed,
+        availabilityGroups.build(),
+        taskIds.build(),
+        lastCompletedTaskTime,
+        blacklistedUntil
+    );
+  }
+
   @JsonProperty("worker")
   public Worker getWorker()
   {
@@ -132,6 +184,7 @@ public class ImmutableWorkerInfo
   }
 
   @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
   public DateTime getBlacklistedUntil()
   {
     return blacklistedUntil;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 0ea3039019..4bae1d2cb2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -523,6 +523,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     return Optional.fromNullable(provisioningService.getStats());
   }
 
+  @Nullable
   public ZkWorker findWorkerRunningTask(String taskId)
   {
     for (ZkWorker zkWorker : zkWorkers.values()) {
@@ -533,6 +534,15 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     return null;
   }
 
+  /**
+   * Retrieve {@link ZkWorker} based on an ID (host), or null if the ID doesn't exist.
+   */
+  @Nullable
+  ZkWorker findWorkerId(String workerId)
+  {
+    return zkWorkers.get(workerId);
+  }
+
   public boolean isWorkerRunningTask(ZkWorker worker, String taskId)
   {
     return Preconditions.checkNotNull(worker, "worker").isRunningTask(taskId);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
index 67d30ebc8d..dadb557e84 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
@@ -229,14 +229,9 @@ public class ZkWorker implements Closeable
 
   public ImmutableWorkerInfo toImmutable()
   {
-    Map<String, TaskAnnouncement> tasks = getRunningTasks();
-
-    return new ImmutableWorkerInfo(
+    return ImmutableWorkerInfo.fromWorkerAnnouncements(
         worker.get(),
-        getCurrCapacityUsed(tasks),
-        getCurrParallelIndexCapacityUsed(tasks),
-        getAvailabilityGroups(tasks),
-        tasks.keySet(),
+        getRunningTasks(),
         lastCompletedTaskTime.get(),
         blacklistedUntil.get()
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index 1c9ba5f486..6990973e56 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -49,6 +49,7 @@ import org.joda.time.Duration;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -494,9 +495,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
         ),
         Sets.union(
             immutableWorker.getRunningTasks(),
-            Sets.newHashSet(
-                task.getId()
-            )
+            Collections.singleton(task.getId())
         ),
         DateTimes.nowUtc()
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index 5e581d815c..6294318ec5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.TaskRunnerUtils;
 import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
@@ -47,10 +46,8 @@ import org.joda.time.DateTime;
 
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -58,7 +55,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 /**
  */
@@ -133,42 +129,6 @@ public class WorkerHolder
     return worker;
   }
 
-  private Map<String, TaskAnnouncement> getRunningTasks()
-  {
-    return tasksSnapshotRef.get().entrySet().stream().filter(
-        e -> e.getValue().getTaskStatus().isRunnable()
-    ).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
-  }
-
-  private int getCurrCapacityUsed()
-  {
-    int currCapacity = 0;
-    for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
-      currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity();
-    }
-    return currCapacity;
-  }
-
-  private int getCurrParallelIndexCapcityUsed()
-  {
-    int currParallelIndexCapacityUsed = 0;
-    for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
-      if (taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
-        currParallelIndexCapacityUsed += taskAnnouncement.getTaskResource().getRequiredCapacity();
-      }
-    }
-    return currParallelIndexCapacityUsed;
-  }
-
-  private Set<String> getAvailabilityGroups()
-  {
-    Set<String> retVal = new HashSet<>();
-    for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
-      retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup());
-    }
-    return retVal;
-  }
-
   public DateTime getBlacklistedUntil()
   {
     return blacklistedUntil.get();
@@ -201,12 +161,9 @@ public class WorkerHolder
       w = disabledWorker;
     }
 
-    return new ImmutableWorkerInfo(
+    return ImmutableWorkerInfo.fromWorkerAnnouncements(
         w,
-        getCurrCapacityUsed(),
-        getCurrParallelIndexCapcityUsed(),
-        getAvailabilityGroups(),
-        getRunningTasks().keySet(),
+        tasksSnapshotRef.get(),
         lastCompletedTaskTime.get(),
         blacklistedUntil.get()
     );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
index 0cb978304a..62c18d7bd2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
@@ -126,7 +126,7 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
     tasks[5] = TestTasks.unending("task5");
     results[5] = remoteTaskRunner.run(tasks[5]);
     waitForOneWorkerToHaveUnackedTasks();
-    if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) {
+    if (rtrTestUtils.taskAssigned("worker0", tasks[5].getId())) {
       rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]);
       rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]);
     } else {
@@ -138,7 +138,7 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
 
   private void mockWorkerRunningAndCompletionSuccessfulTasks(Task t1, Task t2) throws Exception
   {
-    if (rtrTestUtils.taskAnnounced("worker0", t1.getId())) {
+    if (rtrTestUtils.taskAssigned("worker0", t1.getId())) {
       rtrTestUtils.mockWorkerRunningTask("worker0", t1);
       rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t1);
       rtrTestUtils.mockWorkerRunningTask("worker1", t2);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 99b6e39717..a76766d226 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -101,7 +101,8 @@ public class RemoteTaskRunnerTest
   private Worker worker;
 
   @Rule
-  public TestRule watcher = new TestWatcher() {
+  public TestRule watcher = new TestWatcher()
+  {
     @Override
     protected void starting(Description description)
     {
@@ -621,7 +622,7 @@ public class RemoteTaskRunnerTest
 
   private boolean taskAnnounced(final String taskId)
   {
-    return rtrTestUtils.taskAnnounced(WORKER_HOST, taskId);
+    return rtrTestUtils.taskAssigned(WORKER_HOST, taskId);
   }
 
   private boolean workerRunningTask(final String taskId)
@@ -890,8 +891,8 @@ public class RemoteTaskRunnerTest
   }
 
   /**
-   * With 2 workers and maxPercentageBlacklistWorkers(25), neither worker should ever be blacklisted even after
-   * exceeding maxRetriesBeforeBlacklist.
+   * With 2 workers and maxPercentageBlacklistWorkers(25), no worker should be blacklisted even after exceeding
+   * maxRetriesBeforeBlacklist.
    */
   @Test
   public void testBlacklistZKWorkers25Percent() throws Exception
@@ -904,8 +905,7 @@ public class RemoteTaskRunnerTest
 
     makeRemoteTaskRunner(rtrConfig);
 
-    String firstWorker = null;
-    String secondWorker = null;
+    String assignedWorker = null;
 
     for (int i = 1; i < 13; i++) {
       String taskId = StringUtils.format("rt-%d", i);
@@ -920,26 +920,23 @@ public class RemoteTaskRunnerTest
       Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
 
       if (i == 1) {
-        if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
-          firstWorker = "worker2";
-          secondWorker = "worker";
+        if (rtrTestUtils.taskAssigned("worker2", task.getId())) {
+          assignedWorker = "worker2";
         } else {
-          firstWorker = "worker";
-          secondWorker = "worker2";
+          assignedWorker = "worker";
         }
       }
 
-      final String expectedWorker = i % 2 == 0 ? secondWorker : firstWorker;
-
-      Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
-      rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
-      rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
+      Assert.assertTrue(rtrTestUtils.taskAssigned(assignedWorker, task.getId()));
+      rtrTestUtils.mockWorkerRunningTask(assignedWorker, task);
+      rtrTestUtils.mockWorkerCompleteFailedTask(assignedWorker, task);
 
       Assert.assertTrue(taskFuture.get().isFailure());
       Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
       Assert.assertEquals(
-          ((i + 1) / 2),
-          remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
+          i,
+          remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount()
+          + remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount()
       );
     }
   }
@@ -975,7 +972,7 @@ public class RemoteTaskRunnerTest
       Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
 
       if (i == 1) {
-        if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
+        if (rtrTestUtils.taskAssigned("worker2", task.getId())) {
           firstWorker = "worker2";
           secondWorker = "worker";
         } else {
@@ -984,17 +981,26 @@ public class RemoteTaskRunnerTest
         }
       }
 
-      final String expectedWorker = i % 2 == 0 || i > 4 ? secondWorker : firstWorker;
+      final String expectedWorker = i > 2 ? secondWorker : firstWorker;
 
-      Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker, task.getId()));
+      Assert.assertTrue(
+          StringUtils.format("Task[%s] assigned to worker[%s]", i, expectedWorker),
+          rtrTestUtils.taskAssigned(expectedWorker, task.getId())
+      );
       rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
       rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
 
       Assert.assertTrue(taskFuture.get().isFailure());
-      Assert.assertEquals(i > 2 ? 1 : 0, remoteTaskRunner.getBlackListedWorkers().size());
       Assert.assertEquals(
-          i > 4 ? i - 2 : ((i + 1) / 2),
-          remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
+          StringUtils.format("Blacklisted workers after task[%s]", i),
+          i >= 2 ? 1 : 0,
+          remoteTaskRunner.getBlackListedWorkers().size()
+      );
+      Assert.assertEquals(
+          StringUtils.format("Continuously failed tasks after task[%s]", i),
+          i,
+          remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount()
+          + remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount()
       );
     }
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
index 6711844140..bdf886aa41 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
@@ -219,7 +219,7 @@ public class RemoteTaskRunnerTestUtils
     return pathExists(JOINER.join(STATUS_PATH, workerId, taskId));
   }
 
-  boolean taskAnnounced(final String workerId, final String taskId)
+  boolean taskAssigned(final String workerId, final String taskId)
   {
     return pathExists(JOINER.join(TASKS_PATH, workerId, taskId));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to