This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e834e492908 supervisor/autoscaler: Fix clearing of collected lags on skipped scale actions (#17356)
e834e492908 is described below

commit e834e4929088dafe63e6b35b04dd3a842f9fa51a
Author: Adithya Chakilam <[email protected]>
AuthorDate: Thu Oct 17 13:05:16 2024 -0500

    supervisor/autoscaler: Fix clearing of collected lags on skipped scale actions (#17356)
    
    * supervisor/autoscaler: Fix clearing of collected lags on skipped scale actions
    
    * comments
    
    * supervisor/autoscaler: Skip scaling when partitions are less than minTaskCount (#17335)
    
    * Fix pip installation after ubuntu upgrade (#17358)
    
    * fix tests
    
    ---------
    
    Co-authored-by: Pranav <[email protected]>
---
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  2 ++
 .../supervisor/SeekableStreamSupervisor.java       | 23 ++++++++++++++++------
 .../supervisor/autoscaler/LagBasedAutoScaler.java  | 19 +++++++++++++-----
 3 files changed, 33 insertions(+), 11 deletions(-)

diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 24d919918f4..fe851a183a2 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -358,6 +358,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     Thread.sleep(1 * 1000);
     int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(2, taskCountAfterScale);
+    autoscaler.stop();
   }
 
   @Test
@@ -435,6 +436,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     Thread.sleep(1 * 1000);
     int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
     Assert.assertEquals(1, taskCountAfterScale);
+    autoscaler.stop();
   }
 
   @Test
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1da5b4fbb9c..86c4ba385bd 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -422,13 +422,19 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // change taskCount without resubmitting.
   private class DynamicAllocationTasksNotice implements Notice
   {
-    Callable<Integer> scaleAction;
+    Callable<Integer> computeDesiredTaskCount;
     ServiceEmitter emitter;
+    Runnable onSuccessfulScale;
     private static final String TYPE = "dynamic_allocation_tasks_notice";
 
-    DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter 
emitter)
+    DynamicAllocationTasksNotice(
+        Callable<Integer> computeDesiredTaskCount,
+        Runnable onSuccessfulScale,
+        ServiceEmitter emitter
+    )
     {
-      this.scaleAction = scaleAction;
+      this.computeDesiredTaskCount = computeDesiredTaskCount;
+      this.onSuccessfulScale = onSuccessfulScale;
       this.emitter = emitter;
     }
 
@@ -470,7 +476,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               return;
             }
           }
-          final Integer desiredTaskCount = scaleAction.call();
+          final Integer desiredTaskCount = computeDesiredTaskCount.call();
           ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
               .setDimension(DruidMetrics.DATASOURCE, dataSource)
               .setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
@@ -500,6 +506,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
           boolean allocationSuccess = changeTaskCount(desiredTaskCount);
           if (allocationSuccess) {
+            onSuccessfulScale.run();
             dynamicTriggerLastRunTime = nowTime;
           }
         }
@@ -1260,9 +1267,13 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
-  public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction, 
ServiceEmitter emitter)
+  public Runnable buildDynamicAllocationTask(
+      Callable<Integer> scaleAction,
+      Runnable onSuccessfulScale,
+      ServiceEmitter emitter
+  )
   {
-    return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, 
emitter));
+    return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, 
onSuccessfulScale, emitter));
   }
 
   private Runnable buildRunTask()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index 22e36841199..648d8a655e9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -86,10 +86,6 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
       int desiredTaskCount = -1;
       try {
         desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
-
-        if (desiredTaskCount != -1) {
-          lagMetricsQueue.clear();
-        }
       }
       catch (Exception ex) {
         log.warn(ex, "Exception while computing desired task count for [%s]", 
dataSource);
@@ -100,6 +96,19 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
       return desiredTaskCount;
     };
 
+    Runnable onSuccessfulScale = () -> {
+      LOCK.lock();
+      try {
+        lagMetricsQueue.clear();
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while clearing lags for [%s]", dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+    };
+
     lagComputationExec.scheduleAtFixedRate(
         computeAndCollectLag(),
         lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for 
tasks to start up
@@ -107,7 +116,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         TimeUnit.MILLISECONDS
     );
     allocationExec.scheduleAtFixedRate(
-        supervisor.buildDynamicAllocationTask(scaleAction, emitter),
+        supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, 
emitter),
         lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + 
lagBasedAutoScalerConfig
             .getLagCollectionRangeMillis(),
         lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to