This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new cff5d1e3695 Add method Supervisor.computeLagForAutoScaler (#16314)
cff5d1e3695 is described below
commit cff5d1e3695fc26ec88d596142a4c90134eb8a4f
Author: Adithya Chakilam <[email protected]>
AuthorDate: Fri Apr 19 21:27:50 2024 -0500
Add method Supervisor.computeLagForAutoScaler (#16314)
Tries to address the comments made on #16284 after merged.
Changes:
- Remove method `Supervisor.getLagMetric()`
- Add method `Supervisor.computeLagForAutoScaler()`
- Remove classes `LagMetric` and `LagMetricTest`
---
.../kinesis/supervisor/KinesisSupervisor.java | 6 ++---
.../supervisor/autoscaler/LagBasedAutoScaler.java | 10 ++------
.../indexing/overlord/supervisor/Supervisor.java | 8 +++----
.../overlord/supervisor/autoscaler/LagMetric.java | 27 ----------------------
.../overlord/supervisor/autoscaler/LagStats.java | 13 -----------
.../{LagStatsTest.java => SupervisorTest.java} | 20 ++++++++--------
6 files changed, 18 insertions(+), 66 deletions(-)
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index e1a7656f23c..365a9135e3c 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -39,7 +39,6 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -429,9 +428,10 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
}
@Override
- public LagMetric getLagMetricForAutoScaler()
+ public long computeLagForAutoScaler()
{
- return LagMetric.MAX;
+ LagStats lagStats = computeLagStats();
+ return lagStats == null ? 0L : lagStats.getMaxLag();
}
private SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetadataWithClosedOrExpiredPartitions(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index 7813725733b..f8618b06f74 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -21,7 +21,6 @@ package
org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.StringUtils;
@@ -155,13 +154,8 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
LOCK.lock();
try {
if (!spec.isSuspended()) {
- LagStats lagStats = supervisor.computeLagStats();
- if (lagStats == null) {
- lagMetricsQueue.offer(0L);
- } else {
- long lag = lagStats.get(supervisor.getLagMetricForAutoScaler());
- lagMetricsQueue.offer(lag > 0 ? lag : 0L);
- }
+ long lag = supervisor.computeLagForAutoScaler();
+ lagMetricsQueue.offer(lag > 0 ? lag : 0L);
log.debug("Current lags for dataSource[%s] are [%s].", dataSource,
lagMetricsQueue);
} else {
log.warn("[%s] supervisor is suspended, skipping lag collection",
dataSource);
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 8befa2adae3..9b9511cbf3d 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -95,11 +94,12 @@ public interface Supervisor
LagStats computeLagStats();
/**
- * Used by AutoScaler to either scale by max/total/avg.
+ * Used by AutoScaler to make scaling decisions.
*/
- default LagMetric getLagMetricForAutoScaler()
+ default long computeLagForAutoScaler()
{
- return LagMetric.TOTAL;
+ LagStats lagStats = computeLagStats();
+ return lagStats == null ? 0L : lagStats.getTotalLag();
}
int getActiveTaskGroupsCount();
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
deleted file mode 100644
index d3f00b5c2c8..00000000000
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.overlord.supervisor.autoscaler;
-
-public enum LagMetric
-{
- TOTAL,
- MAX,
- AVERAGE;
-}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
index c7a6dfc6132..7b6e5fd0bab 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -46,17 +46,4 @@ public class LagStats
{
return avgLag;
}
-
- public long get(LagMetric metric)
- {
- switch (metric) {
- case AVERAGE:
- return avgLag;
- case TOTAL:
- return totalLag;
- case MAX:
- return maxLag;
- }
- throw new IllegalStateException("Unknown Metric");
- }
}
diff --git
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
similarity index 69%
rename from
server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
rename to
server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
index b51882538b9..79811079d34 100644
---
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
+++
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
@@ -19,24 +19,22 @@
package org.apache.druid.indexing.overlord.supervisor;
-import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
-public class LagStatsTest
+public class SupervisorTest
{
-
@Test
- public void lagStatsByMetric()
+ public void testAutoScalerLagComputation()
{
- int max = 1;
- int avg = 2;
- int total = 3;
- LagStats lag = new LagStats(max, total, avg);
+ Supervisor supervisor = Mockito.spy(Supervisor.class);
+
+ Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2,
3));
+ Assert.assertEquals(2, supervisor.computeLagForAutoScaler());
- Assert.assertEquals(max, lag.get(LagMetric.MAX));
- Assert.assertEquals(total, lag.get(LagMetric.TOTAL));
- Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE));
+ Mockito.when(supervisor.computeLagStats()).thenReturn(null);
+ Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]