This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 34237bc1127 Consider max lag for kinesis while autoscaling (#16284)
34237bc1127 is described below

commit 34237bc11272513a969c74ece39ce4bb0d28635c
Author: Adithya Chakilam <[email protected]>
AuthorDate: Wed Apr 17 04:35:05 2024 -0500

    Consider max lag for kinesis while autoscaling (#16284)
    
    * Consider max lag for kinesis while autoscaling
    
    * add test for coverage
    
    * test folder
---
 .../kinesis/supervisor/KinesisSupervisor.java      |  7 ++++
 .../supervisor/autoscaler/LagBasedAutoScaler.java  |  4 +--
 .../indexing/overlord/supervisor/Supervisor.java   |  9 +++++
 .../autoscaler/{LagStats.java => LagMetric.java}   | 30 +++--------------
 .../overlord/supervisor/autoscaler/LagStats.java   | 13 ++++++++
 .../overlord/supervisor/LagStatsTest.java}         | 39 +++++++++-------------
 6 files changed, 51 insertions(+), 51 deletions(-)

diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index a142f414762..e1a7656f23c 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -427,6 +428,12 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String,
     );
   }
 
+  @Override
+  public LagMetric getLagMetricForAutoScaler()
+  {
+    return LagMetric.MAX;
+  }
+
   private SeekableStreamDataSourceMetadata<String, String> 
createDataSourceMetadataWithClosedOrExpiredPartitions(
       SeekableStreamDataSourceMetadata<String, String> currentMetadata,
       Set<String> terminatedPartitionIds,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index 8235f53e33f..7813725733b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -159,8 +159,8 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
           if (lagStats == null) {
             lagMetricsQueue.offer(0L);
           } else {
-            long totalLags = lagStats.getTotalLag();
-            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+            long lag = lagStats.get(supervisor.getLagMetricForAutoScaler());
+            lagMetricsQueue.offer(lag > 0 ? lag : 0L);
           }
           log.debug("Current lags for dataSource[%s] are [%s].", dataSource, 
lagMetricsQueue);
         } else {
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 9d940bc55b6..8befa2adae3 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.segment.incremental.ParseExceptionReport;
 
@@ -93,6 +94,14 @@ public interface Supervisor
    */
   LagStats computeLagStats();
 
+  /**
+   * Used by AutoScaler to either scale by max/total/avg.
+   */
+  default LagMetric getLagMetricForAutoScaler()
+  {
+    return LagMetric.TOTAL;
+  }
+
   int getActiveTaskGroupsCount();
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
similarity index 67%
copy from 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
copy to 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
index 7b6e5fd0bab..d3f00b5c2c8 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
@@ -19,31 +19,9 @@
 
 package org.apache.druid.indexing.overlord.supervisor.autoscaler;
 
-public class LagStats
+public enum LagMetric
 {
-  private final long maxLag;
-  private final long totalLag;
-  private final long avgLag;
-
-  public LagStats(long maxLag, long totalLag, long avgLag)
-  {
-    this.maxLag = maxLag;
-    this.totalLag = totalLag;
-    this.avgLag = avgLag;
-  }
-
-  public long getMaxLag()
-  {
-    return maxLag;
-  }
-
-  public long getTotalLag()
-  {
-    return totalLag;
-  }
-
-  public long getAvgLag()
-  {
-    return avgLag;
-  }
+  TOTAL,
+  MAX,
+  AVERAGE;
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
index 7b6e5fd0bab..c7a6dfc6132 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -46,4 +46,17 @@ public class LagStats
   {
     return avgLag;
   }
+
+  public long get(LagMetric metric)
+  {
+    switch (metric) {
+      case AVERAGE:
+        return avgLag;
+      case TOTAL:
+        return totalLag;
+      case MAX:
+        return maxLag;
+    }
+    throw new IllegalStateException("Unknown Metric");
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
similarity index 58%
copy from 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
copy to 
server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
index 7b6e5fd0bab..b51882538b9 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++ 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
@@ -17,33 +17,26 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+package org.apache.druid.indexing.overlord.supervisor;
 
-public class LagStats
-{
-  private final long maxLag;
-  private final long totalLag;
-  private final long avgLag;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.junit.Assert;
+import org.junit.Test;
 
-  public LagStats(long maxLag, long totalLag, long avgLag)
-  {
-    this.maxLag = maxLag;
-    this.totalLag = totalLag;
-    this.avgLag = avgLag;
-  }
-
-  public long getMaxLag()
-  {
-    return maxLag;
-  }
+public class LagStatsTest
+{
 
-  public long getTotalLag()
+  @Test
+  public void lagStatsByMetric()
   {
-    return totalLag;
-  }
+    int max = 1;
+    int avg = 2;
+    int total = 3;
+    LagStats lag = new LagStats(max, total, avg);
 
-  public long getAvgLag()
-  {
-    return avgLag;
+    Assert.assertEquals(max, lag.get(LagMetric.MAX));
+    Assert.assertEquals(total, lag.get(LagMetric.TOTAL));
+    Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to