This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 450ecd6370 More efficient generation of ImmutableWorkerHolder from WorkerHolder. (#14546)
450ecd6370 is described below
commit 450ecd6370d47aff58b4160efd95a82396897d5f
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jul 13 07:57:16 2023 -0700
More efficient generation of ImmutableWorkerHolder from WorkerHolder. (#14546)
* More efficient generation of ImmutableWorkerHolder from WorkerHolder.
Taking the work done in #12096 a little further:
1) Applying a similar optimization to WorkerHolder (HttpRemoteTaskRunner).
The original patch only helped with the ZkWorker (RemoteTaskRunner).
2) Improve the ZkWorker version somewhat by avoiding multiple iterations
through the task announcements map.
* Pick better names and use better logic.
* Only runnable tasks.
* Fix test.
* Fix testBlacklistZKWorkers50Percent.
---
.../indexing/overlord/ImmutableWorkerInfo.java | 55 +++++++++++++++++++++-
.../druid/indexing/overlord/RemoteTaskRunner.java | 10 ++++
.../apache/druid/indexing/overlord/ZkWorker.java | 9 +---
...PendingTaskBasedWorkerProvisioningStrategy.java | 5 +-
.../druid/indexing/overlord/hrtr/WorkerHolder.java | 47 +-----------------
...teTaskRunnerRunPendingTasksConcurrencyTest.java | 4 +-
.../indexing/overlord/RemoteTaskRunnerTest.java | 54 +++++++++++----------
.../overlord/RemoteTaskRunnerTestUtils.java | 2 +-
8 files changed, 103 insertions(+), 83 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
index aaea3f453d..b1eafe0dc5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java
@@ -20,16 +20,20 @@
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
/**
@@ -44,6 +48,8 @@ public class ImmutableWorkerInfo
private final ImmutableSet<String> availabilityGroups;
private final ImmutableSet<String> runningTasks;
private final DateTime lastCompletedTaskTime;
+
+ @Nullable
private final DateTime blacklistedUntil;
@JsonCreator
@@ -76,7 +82,8 @@ public class ImmutableWorkerInfo
)
{
this(worker, currCapacityUsed, currParallelIndexCapacityUsed,
availabilityGroups,
- runningTasks, lastCompletedTaskTime, null);
+ runningTasks, lastCompletedTaskTime, null
+ );
}
public ImmutableWorkerInfo(
@@ -90,6 +97,51 @@ public class ImmutableWorkerInfo
this(worker, currCapacityUsed, 0, availabilityGroups, runningTasks,
lastCompletedTaskTime, null);
}
+ /**
+ * Helper used by {@link ZkWorker} and {@link
org.apache.druid.indexing.overlord.hrtr.WorkerHolder}.
+ */
+ public static ImmutableWorkerInfo fromWorkerAnnouncements(
+ final Worker worker,
+ final Map<String, TaskAnnouncement> announcements,
+ final DateTime lastCompletedTaskTime,
+ @Nullable final DateTime blacklistedUntil
+ )
+ {
+ int currCapacityUsed = 0;
+ int currParallelIndexCapacityUsed = 0;
+ ImmutableSet.Builder<String> taskIds = ImmutableSet.builder();
+ ImmutableSet.Builder<String> availabilityGroups = ImmutableSet.builder();
+
+ for (final Map.Entry<String, TaskAnnouncement> entry :
announcements.entrySet()) {
+ final TaskAnnouncement announcement = entry.getValue();
+
+ if (announcement.getStatus().isRunnable()) {
+ final String taskId = entry.getKey();
+ final TaskResource taskResource = announcement.getTaskResource();
+ final int requiredCapacity = taskResource.getRequiredCapacity();
+
+ currCapacityUsed += requiredCapacity;
+
+ if
(ParallelIndexSupervisorTask.TYPE.equals(announcement.getTaskType())) {
+ currParallelIndexCapacityUsed += requiredCapacity;
+ }
+
+ taskIds.add(taskId);
+ availabilityGroups.add(taskResource.getAvailabilityGroup());
+ }
+ }
+
+ return new ImmutableWorkerInfo(
+ worker,
+ currCapacityUsed,
+ currParallelIndexCapacityUsed,
+ availabilityGroups.build(),
+ taskIds.build(),
+ lastCompletedTaskTime,
+ blacklistedUntil
+ );
+ }
+
@JsonProperty("worker")
public Worker getWorker()
{
@@ -132,6 +184,7 @@ public class ImmutableWorkerInfo
}
@JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
public DateTime getBlacklistedUntil()
{
return blacklistedUntil;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 0ea3039019..4bae1d2cb2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -523,6 +523,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return Optional.fromNullable(provisioningService.getStats());
}
+ @Nullable
public ZkWorker findWorkerRunningTask(String taskId)
{
for (ZkWorker zkWorker : zkWorkers.values()) {
@@ -533,6 +534,15 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return null;
}
+ /**
+ * Retrieve {@link ZkWorker} based on an ID (host), or null if the ID
doesn't exist.
+ */
+ @Nullable
+ ZkWorker findWorkerId(String workerId)
+ {
+ return zkWorkers.get(workerId);
+ }
+
public boolean isWorkerRunningTask(ZkWorker worker, String taskId)
{
return Preconditions.checkNotNull(worker, "worker").isRunningTask(taskId);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
index 67d30ebc8d..dadb557e84 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java
@@ -229,14 +229,9 @@ public class ZkWorker implements Closeable
public ImmutableWorkerInfo toImmutable()
{
- Map<String, TaskAnnouncement> tasks = getRunningTasks();
-
- return new ImmutableWorkerInfo(
+ return ImmutableWorkerInfo.fromWorkerAnnouncements(
worker.get(),
- getCurrCapacityUsed(tasks),
- getCurrParallelIndexCapacityUsed(tasks),
- getAvailabilityGroups(tasks),
- tasks.keySet(),
+ getRunningTasks(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index 1c9ba5f486..6990973e56 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -49,6 +49,7 @@ import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -494,9 +495,7 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr
),
Sets.union(
immutableWorker.getRunningTasks(),
- Sets.newHashSet(
- task.getId()
- )
+ Collections.singleton(task.getId())
),
DateTimes.nowUtc()
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index 5e581d815c..6294318ec5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
-import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
@@ -47,10 +46,8 @@ import org.joda.time.DateTime;
import java.net.URL;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -58,7 +55,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
/**
*/
@@ -133,42 +129,6 @@ public class WorkerHolder
return worker;
}
- private Map<String, TaskAnnouncement> getRunningTasks()
- {
- return tasksSnapshotRef.get().entrySet().stream().filter(
- e -> e.getValue().getTaskStatus().isRunnable()
- ).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
- }
-
- private int getCurrCapacityUsed()
- {
- int currCapacity = 0;
- for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
- currCapacity += taskAnnouncement.getTaskResource().getRequiredCapacity();
- }
- return currCapacity;
- }
-
- private int getCurrParallelIndexCapcityUsed()
- {
- int currParallelIndexCapacityUsed = 0;
- for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
- if
(taskAnnouncement.getTaskType().equals(ParallelIndexSupervisorTask.TYPE)) {
- currParallelIndexCapacityUsed +=
taskAnnouncement.getTaskResource().getRequiredCapacity();
- }
- }
- return currParallelIndexCapacityUsed;
- }
-
- private Set<String> getAvailabilityGroups()
- {
- Set<String> retVal = new HashSet<>();
- for (TaskAnnouncement taskAnnouncement : getRunningTasks().values()) {
- retVal.add(taskAnnouncement.getTaskResource().getAvailabilityGroup());
- }
- return retVal;
- }
-
public DateTime getBlacklistedUntil()
{
return blacklistedUntil.get();
@@ -201,12 +161,9 @@ public class WorkerHolder
w = disabledWorker;
}
- return new ImmutableWorkerInfo(
+ return ImmutableWorkerInfo.fromWorkerAnnouncements(
w,
- getCurrCapacityUsed(),
- getCurrParallelIndexCapcityUsed(),
- getAvailabilityGroups(),
- getRunningTasks().keySet(),
+ tasksSnapshotRef.get(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
index 0cb978304a..62c18d7bd2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerRunPendingTasksConcurrencyTest.java
@@ -126,7 +126,7 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
tasks[5] = TestTasks.unending("task5");
results[5] = remoteTaskRunner.run(tasks[5]);
waitForOneWorkerToHaveUnackedTasks();
- if (rtrTestUtils.taskAnnounced("worker0", tasks[5].getId())) {
+ if (rtrTestUtils.taskAssigned("worker0", tasks[5].getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", tasks[5]);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", tasks[5]);
} else {
@@ -138,7 +138,7 @@ public class RemoteTaskRunnerRunPendingTasksConcurrencyTest
private void mockWorkerRunningAndCompletionSuccessfulTasks(Task t1, Task t2)
throws Exception
{
- if (rtrTestUtils.taskAnnounced("worker0", t1.getId())) {
+ if (rtrTestUtils.taskAssigned("worker0", t1.getId())) {
rtrTestUtils.mockWorkerRunningTask("worker0", t1);
rtrTestUtils.mockWorkerCompleteSuccessfulTask("worker0", t1);
rtrTestUtils.mockWorkerRunningTask("worker1", t2);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 99b6e39717..a76766d226 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -101,7 +101,8 @@ public class RemoteTaskRunnerTest
private Worker worker;
@Rule
- public TestRule watcher = new TestWatcher() {
+ public TestRule watcher = new TestWatcher()
+ {
@Override
protected void starting(Description description)
{
@@ -621,7 +622,7 @@ public class RemoteTaskRunnerTest
private boolean taskAnnounced(final String taskId)
{
- return rtrTestUtils.taskAnnounced(WORKER_HOST, taskId);
+ return rtrTestUtils.taskAssigned(WORKER_HOST, taskId);
}
private boolean workerRunningTask(final String taskId)
@@ -890,8 +891,8 @@ public class RemoteTaskRunnerTest
}
/**
- * With 2 workers and maxPercentageBlacklistWorkers(25), neither worker
should ever be blacklisted even after
- * exceeding maxRetriesBeforeBlacklist.
+ * With 2 workers and maxPercentageBlacklistWorkers(25), no worker should be
blacklisted even after exceeding
+ * maxRetriesBeforeBlacklist.
*/
@Test
public void testBlacklistZKWorkers25Percent() throws Exception
@@ -904,8 +905,7 @@ public class RemoteTaskRunnerTest
makeRemoteTaskRunner(rtrConfig);
- String firstWorker = null;
- String secondWorker = null;
+ String assignedWorker = null;
for (int i = 1; i < 13; i++) {
String taskId = StringUtils.format("rt-%d", i);
@@ -920,26 +920,23 @@ public class RemoteTaskRunnerTest
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
if (i == 1) {
- if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
- firstWorker = "worker2";
- secondWorker = "worker";
+ if (rtrTestUtils.taskAssigned("worker2", task.getId())) {
+ assignedWorker = "worker2";
} else {
- firstWorker = "worker";
- secondWorker = "worker2";
+ assignedWorker = "worker";
}
}
- final String expectedWorker = i % 2 == 0 ? secondWorker : firstWorker;
-
- Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker,
task.getId()));
- rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
- rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
+ Assert.assertTrue(rtrTestUtils.taskAssigned(assignedWorker,
task.getId()));
+ rtrTestUtils.mockWorkerRunningTask(assignedWorker, task);
+ rtrTestUtils.mockWorkerCompleteFailedTask(assignedWorker, task);
Assert.assertTrue(taskFuture.get().isFailure());
Assert.assertEquals(0, remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(
- ((i + 1) / 2),
-
remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
+ i,
+
remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount()
+ +
remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount()
);
}
}
@@ -975,7 +972,7 @@ public class RemoteTaskRunnerTest
Future<TaskStatus> taskFuture = remoteTaskRunner.run(task);
if (i == 1) {
- if (rtrTestUtils.taskAnnounced("worker2", task.getId())) {
+ if (rtrTestUtils.taskAssigned("worker2", task.getId())) {
firstWorker = "worker2";
secondWorker = "worker";
} else {
@@ -984,17 +981,26 @@ public class RemoteTaskRunnerTest
}
}
- final String expectedWorker = i % 2 == 0 || i > 4 ? secondWorker :
firstWorker;
+ final String expectedWorker = i > 2 ? secondWorker : firstWorker;
- Assert.assertTrue(rtrTestUtils.taskAnnounced(expectedWorker,
task.getId()));
+ Assert.assertTrue(
+ StringUtils.format("Task[%s] assigned to worker[%s]", i,
expectedWorker),
+ rtrTestUtils.taskAssigned(expectedWorker, task.getId())
+ );
rtrTestUtils.mockWorkerRunningTask(expectedWorker, task);
rtrTestUtils.mockWorkerCompleteFailedTask(expectedWorker, task);
Assert.assertTrue(taskFuture.get().isFailure());
- Assert.assertEquals(i > 2 ? 1 : 0,
remoteTaskRunner.getBlackListedWorkers().size());
Assert.assertEquals(
- i > 4 ? i - 2 : ((i + 1) / 2),
-
remoteTaskRunner.findWorkerRunningTask(task.getId()).getContinuouslyFailedTasksCount()
+ StringUtils.format("Blacklisted workers after task[%s]", i),
+ i >= 2 ? 1 : 0,
+ remoteTaskRunner.getBlackListedWorkers().size()
+ );
+ Assert.assertEquals(
+ StringUtils.format("Continuously failed tasks after task[%s]", i),
+ i,
+
remoteTaskRunner.findWorkerId("worker").getContinuouslyFailedTasksCount()
+ +
remoteTaskRunner.findWorkerId("worker2").getContinuouslyFailedTasksCount()
);
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
index 6711844140..bdf886aa41 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
@@ -219,7 +219,7 @@ public class RemoteTaskRunnerTestUtils
return pathExists(JOINER.join(STATUS_PATH, workerId, taskId));
}
- boolean taskAnnounced(final String workerId, final String taskId)
+ boolean taskAssigned(final String workerId, final String taskId)
{
return pathExists(JOINER.join(TASKS_PATH, workerId, taskId));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]