This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 021a01df45 RTR, HRTR: Fix incorrect maxLazyWorkers check in markLazyWorkers. (#14545)
021a01df45 is described below

commit 021a01df4575519dd73aa734804f3a3ad4c1e225
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Jul 7 10:08:12 2023 -0700

    RTR, HRTR: Fix incorrect maxLazyWorkers check in markLazyWorkers. (#14545)
    
    Recently #14532 fixed a problem when maxLazyWorkers == 0 and lazyWorkers
    starts out empty. Unfortunately, even after that patch, there remained
    a more general version of this problem when
    maxLazyWorkers == lazyWorkers.size().
    This patch fixes it.
    
    I'm not sure if this would actually happen in production, because the
    provisioning strategies do try to avoid calling markWorkersLazy until
    previously-initiated terminations have finished. Nevertheless, it still
    seems like a good thing to fix.
---
 .../druid/indexing/overlord/RemoteTaskRunner.java      | 18 ++++++++++--------
 .../druid/indexing/overlord/WorkerTaskRunner.java      | 11 +++++++++--
 .../indexing/overlord/hrtr/HttpRemoteTaskRunner.java   | 17 +++++++++--------
 3 files changed, 28 insertions(+), 18 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 10e6639599..0ea3039019 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -1401,29 +1401,31 @@ public class RemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
   {
     // skip the lock and bail early if we should not mark any workers lazy 
(e.g. number
     // of current workers is at or below the minNumWorkers of autoscaler 
config)
-    if (maxLazyWorkers < 1) {
-      return Collections.emptyList();
+    if (lazyWorkers.size() >= maxLazyWorkers) {
+      return getLazyWorkers();
     }
-    // status lock is used to prevent any tasks being assigned to the worker 
while we mark it lazy
+
+    // Search for new workers to mark lazy.
+    // Status lock is used to prevent any tasks being assigned to workers 
while we mark them lazy
     synchronized (statusLock) {
       for (Map.Entry<String, ZkWorker> worker : zkWorkers.entrySet()) {
+        if (lazyWorkers.size() >= maxLazyWorkers) {
+          break;
+        }
         final ZkWorker zkWorker = worker.getValue();
         try {
           if (getAssignedTasks(zkWorker.getWorker()).isEmpty() && 
isLazyWorker.apply(zkWorker.toImmutable())) {
             log.info("Adding Worker[%s] to lazySet!", 
zkWorker.getWorker().getHost());
             lazyWorkers.put(worker.getKey(), zkWorker);
-            if (lazyWorkers.size() == maxLazyWorkers) {
-              // only mark excess workers as lazy and allow their cleanup
-              break;
-            }
           }
         }
         catch (Exception e) {
           throw new RuntimeException(e);
         }
       }
-      return getWorkerFromZK(lazyWorkers.values());
     }
+
+    return getLazyWorkers();
   }
 
   protected List<String> getAssignedTasks(Worker worker) throws Exception
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java
index 9ef9065dc8..04188f01c9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/WorkerTaskRunner.java
@@ -26,6 +26,7 @@ import 
org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
 import org.apache.druid.indexing.worker.Worker;
 
 import java.util.Collection;
+import java.util.List;
 
 @PublicApi
 public interface WorkerTaskRunner extends TaskRunner
@@ -47,10 +48,16 @@ public interface WorkerTaskRunner extends TaskRunner
   Collection<Worker> getLazyWorkers();
 
   /**
-   * Check which workers can be marked as lazy
+   * Mark workers matching a predicate as lazy, up to a maximum. If the number 
of workers previously marked lazy is
+   * equal to or higher than the provided maximum, this method will return 
those previously marked workers and will
+   * not mark any additional workers. Workers are never un-marked lazy once 
they are marked lazy.
+   *
+   * This method is called by {@link 
org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy}
+   * implementations. It is expected that the lazy workers returned by this 
method will be terminated using
+   * {@link 
org.apache.druid.indexing.overlord.autoscaling.AutoScaler#terminate(List)}.
    *
    * @param isLazyWorker   predicate that checks if a worker is lazy
-   * @param maxLazyWorkers maximum number of lazy workers to return
+   * @param maxLazyWorkers desired maximum number of lazy workers (actual 
number may be higher)
    */
   Collection<Worker> markWorkersLazy(Predicate<ImmutableWorkerInfo> 
isLazyWorker, int maxLazyWorkers);
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 935f2bc61c..57d610ab73 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -96,7 +96,6 @@ import java.io.InputStream;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -927,29 +926,31 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
   {
     // skip the lock and bail early if we should not mark any workers lazy 
(e.g. number
     // of current workers is at or below the minNumWorkers of autoscaler 
config)
-    if (maxLazyWorkers < 1) {
-      return Collections.emptyList();
+    if (lazyWorkers.size() >= maxLazyWorkers) {
+      return getLazyWorkers();
     }
 
+    // Search for new workers to mark lazy.
+    // Status lock is used to prevent any tasks being assigned to workers 
while we mark them lazy
     synchronized (statusLock) {
       for (Map.Entry<String, WorkerHolder> worker : workers.entrySet()) {
+        if (lazyWorkers.size() >= maxLazyWorkers) {
+          break;
+        }
         final WorkerHolder workerHolder = worker.getValue();
         try {
           if (isWorkerOkForMarkingLazy(workerHolder.getWorker()) && 
isLazyWorker.apply(workerHolder.toImmutable())) {
             log.info("Adding Worker[%s] to lazySet!", 
workerHolder.getWorker().getHost());
             lazyWorkers.put(worker.getKey(), workerHolder);
-            if (lazyWorkers.size() == maxLazyWorkers) {
-              // only mark excess workers as lazy and allow their cleanup
-              break;
-            }
           }
         }
         catch (Exception e) {
           throw new RuntimeException(e);
         }
       }
-      return getLazyWorkers();
     }
+
+    return getLazyWorkers();
   }
 
   private boolean isWorkerOkForMarkingLazy(Worker worker)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to