This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 021a01df45 RTR, HRTR: Fix incorrect maxLazyWorkers check in
markLazyWorkers. (#14545)
021a01df45 is described below
commit 021a01df4575519dd73aa734804f3a3ad4c1e225
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Jul 7 10:08:12 2023 -0700
RTR, HRTR: Fix incorrect maxLazyWorkers check in markLazyWorkers. (#14545)
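Recently, #14532 fixed a problem in markWorkersLazy that occurred when
maxLazyWorkers == 0 and lazyWorkers started out empty. Unfortunately, even
after that patch, a more general version of the problem remained. If
lazyWorkers.size() was already at or above maxLazyWorkers on entry, the
loop's post-insert check (lazyWorkers.size() == maxLazyWorkers) could
never match. Every eligible worker would then be marked lazy, overshooting
the requested maximum.
This patch fixes it by checking lazyWorkers.size() >= maxLazyWorkers both
before taking the status lock and before marking each additional worker.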
I'm not sure whether this would actually happen in production, because the
provisioning strategies try to avoid calling markWorkersLazy until
previously initiated terminations have finished. Nevertheless, it seems
worth fixing.
---
.../druid/indexing/overlord/RemoteTaskRunner.java | 18 ++++++++++--------
.../druid/indexing/overlord/WorkerTaskRunner.java | 11 +++++++++--
.../indexing/overlord/hrtr/HttpRemoteTaskRunner.java | 17 +++++++++--------
3 files changed, 28 insertions(+), 18 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 10e6639599..0ea3039019 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -1401,29 +1401,31 @@ public class RemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
{
// skip the lock and bail early if we should not mark any workers lazy
(e.g. number
// of current workers is at or below the minNumWorkers of autoscaler
config)
- if (maxLazyWorkers < 1) {
- return Collections.emptyList();
+ if (lazyWorkers.size() >= maxLazyWorkers) {
+ return getLazyWorkers();
}
- // status lock is used to prevent any tasks being assigned to the worker
while we mark it lazy
+
+ // Search for new workers to mark lazy.
+ // Status lock is used to prevent any tasks being assigned to workers
while we mark them lazy
synchronized (statusLock) {
for (Map.Entry<String, ZkWorker> worker : zkWorkers.entrySet()) {
+ if (lazyWorkers.size() >= maxLazyWorkers) {
+ break;
+ }
final ZkWorker zkWorker = worker.getValue();
try {
if (getAssignedTasks(zkWorker.getWorker()).isEmpty() &&
isLazyWorker.apply(zkWorker.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!",
zkWorker.getWorker().getHost());
lazyWorkers.put(worker.getKey(), zkWorker);
- if (lazyWorkers.size() == maxLazyWorkers) {
- // only mark excess workers as lazy and allow their cleanup
- break;
- }
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
- return getWorkerFromZK(lazyWorkers.values());
}
+
+ return getLazyWorkers();
}
protected List<String> getAssignedTasks(Worker worker) throws Exception
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java
index 9ef9065dc8..04188f01c9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java
@@ -26,6 +26,7 @@ import
org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import org.apache.druid.indexing.worker.Worker;
import java.util.Collection;
+import java.util.List;
@PublicApi
public interface WorkerTaskRunner extends TaskRunner
@@ -47,10 +48,16 @@ public interface WorkerTaskRunner extends TaskRunner
Collection<Worker> getLazyWorkers();
/**
- * Check which workers can be marked as lazy
+ * Mark workers matching a predicate as lazy, up to a maximum. If the number
of workers previously marked lazy is
+ * equal to or higher than the provided maximum, this method will return
those previously marked workers and will
+ * not mark any additional workers. Workers are never un-marked lazy once
they are marked lazy.
+ *
+ * This method is called by {@link
org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy}
+ * implementations. It is expected that the lazy workers returned by this
method will be terminated using
+ * {@link
org.apache.druid.indexing.overlord.autoscaling.AutoScaler#terminate(List)}.
*
* @param isLazyWorker predicate that checks if a worker is lazy
- * @param maxLazyWorkers maximum number of lazy workers to return
+ * @param maxLazyWorkers desired maximum number of lazy workers (actual
number may be higher)
*/
Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo>
isLazyWorker, int maxLazyWorkers);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 935f2bc61c..57d610ab73 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -96,7 +96,6 @@ import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -927,29 +926,31 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
{
// skip the lock and bail early if we should not mark any workers lazy
(e.g. number
// of current workers is at or below the minNumWorkers of autoscaler
config)
- if (maxLazyWorkers < 1) {
- return Collections.emptyList();
+ if (lazyWorkers.size() >= maxLazyWorkers) {
+ return getLazyWorkers();
}
+ // Search for new workers to mark lazy.
+ // Status lock is used to prevent any tasks being assigned to workers
while we mark them lazy
synchronized (statusLock) {
for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
+ if (lazyWorkers.size() >= maxLazyWorkers) {
+ break;
+ }
final WorkerHolder workerHolder = worker.getValue();
try {
if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) &&
isLazyWorker.apply(workerHolder.toImmutable())) {
log.info("Adding Worker[%s] to lazySet!",
workerHolder.getWorker().getHost());
lazyWorkers.put(worker.getKey(), workerHolder);
- if (lazyWorkers.size() == maxLazyWorkers) {
- // only mark excess workers as lazy and allow their cleanup
- break;
- }
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
- return getLazyWorkers();
}
+
+ return getLazyWorkers();
}
private boolean isWorkerOkForMarkingLazy(Worker worker)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]