zhangyue19921010 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r531667591



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {
+          log.info("Metrics collection is not at full capacity, skip to check 
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+          return;
+        }
+        List<Long> lags = collectTotalLags();
+        boolean allocationSuccess = dynamicAllocate(lags);
+        if (allocationSuccess) {
+          dynamicTriggerLastRunTime = nowTime;
+          queue.clear();
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private boolean dynamicAllocate(List<Long> lags) throws 
InterruptedException, ExecutionException, TimeoutException

Review comment:
       All done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to