This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 34237bc1127 Consider max lag for kinesis while autoscaling (#16284)
34237bc1127 is described below
commit 34237bc11272513a969c74ece39ce4bb0d28635c
Author: Adithya Chakilam <[email protected]>
AuthorDate: Wed Apr 17 04:35:05 2024 -0500
Consider max lag for kinesis while autoscaling (#16284)
* Consider max lag for kinesis while autoscaling
* add test for coverage
* test folder
---
.../kinesis/supervisor/KinesisSupervisor.java | 7 ++++
.../supervisor/autoscaler/LagBasedAutoScaler.java | 4 +--
.../indexing/overlord/supervisor/Supervisor.java | 9 +++++
.../autoscaler/{LagStats.java => LagMetric.java} | 30 +++--------------
.../overlord/supervisor/autoscaler/LagStats.java | 13 ++++++++
.../overlord/supervisor/LagStatsTest.java} | 39 +++++++++-------------
6 files changed, 51 insertions(+), 51 deletions(-)
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index a142f414762..e1a7656f23c 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -427,6 +428,12 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
);
}
+ @Override
+ public LagMetric getLagMetricForAutoScaler()
+ {
+ return LagMetric.MAX;
+ }
+
private SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetadataWithClosedOrExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata,
Set<String> terminatedPartitionIds,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index 8235f53e33f..7813725733b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -159,8 +159,8 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
if (lagStats == null) {
lagMetricsQueue.offer(0L);
} else {
- long totalLags = lagStats.getTotalLag();
- lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+ long lag = lagStats.get(supervisor.getLagMetricForAutoScaler());
+ lagMetricsQueue.offer(lag > 0 ? lag : 0L);
}
log.debug("Current lags for dataSource[%s] are [%s].", dataSource,
lagMetricsQueue);
} else {
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 9d940bc55b6..8befa2adae3 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -93,6 +94,14 @@ public interface Supervisor
*/
LagStats computeLagStats();
+ /**
+ * Used by AutoScaler to either scale by max/total/avg.
+ */
+ default LagMetric getLagMetricForAutoScaler()
+ {
+ return LagMetric.TOTAL;
+ }
+
int getActiveTaskGroupsCount();
/**
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
similarity index 67%
copy from
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
copy to
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
index 7b6e5fd0bab..d3f00b5c2c8 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
@@ -19,31 +19,9 @@
package org.apache.druid.indexing.overlord.supervisor.autoscaler;
-public class LagStats
+public enum LagMetric
{
- private final long maxLag;
- private final long totalLag;
- private final long avgLag;
-
- public LagStats(long maxLag, long totalLag, long avgLag)
- {
- this.maxLag = maxLag;
- this.totalLag = totalLag;
- this.avgLag = avgLag;
- }
-
- public long getMaxLag()
- {
- return maxLag;
- }
-
- public long getTotalLag()
- {
- return totalLag;
- }
-
- public long getAvgLag()
- {
- return avgLag;
- }
+ TOTAL,
+ MAX,
+ AVERAGE;
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
index 7b6e5fd0bab..c7a6dfc6132 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -46,4 +46,17 @@ public class LagStats
{
return avgLag;
}
+
+ public long get(LagMetric metric)
+ {
+ switch (metric) {
+ case AVERAGE:
+ return avgLag;
+ case TOTAL:
+ return totalLag;
+ case MAX:
+ return maxLag;
+ }
+ throw new IllegalStateException("Unknown Metric");
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
similarity index 58%
copy from
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
copy to
server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
index 7b6e5fd0bab..b51882538b9 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
@@ -17,33 +17,26 @@
* under the License.
*/
-package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+package org.apache.druid.indexing.overlord.supervisor;
-public class LagStats
-{
- private final long maxLag;
- private final long totalLag;
- private final long avgLag;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.junit.Assert;
+import org.junit.Test;
- public LagStats(long maxLag, long totalLag, long avgLag)
- {
- this.maxLag = maxLag;
- this.totalLag = totalLag;
- this.avgLag = avgLag;
- }
-
- public long getMaxLag()
- {
- return maxLag;
- }
+public class LagStatsTest
+{
- public long getTotalLag()
+ @Test
+ public void lagStatsByMetric()
{
- return totalLag;
- }
+ int max = 1;
+ int avg = 2;
+ int total = 3;
+ LagStats lag = new LagStats(max, total, avg);
- public long getAvgLag()
- {
- return avgLag;
+ Assert.assertEquals(max, lag.get(LagMetric.MAX));
+ Assert.assertEquals(total, lag.get(LagMetric.TOTAL));
+ Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]