This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f8015eb02a9 Add config lagAggregate to LagBasedAutoScalerConfig
(#16334)
f8015eb02a9 is described below
commit f8015eb02a95925036b4ce5ad0c1384ff634b30a
Author: Adithya Chakilam <[email protected]>
AuthorDate: Mon Apr 29 11:50:41 2024 -0500
Add config lagAggregate to LagBasedAutoScalerConfig (#16334)
Changes:
- Add new config `lagAggregate` to `LagBasedAutoScalerConfig`
- Add field `aggregateForScaling` to `LagStats`
- Use the new field/config to determine which aggregate to use to compute
lag
- Remove method `Supervisor.computeLagForAutoScaler()`
---
docs/ingestion/supervisor.md | 1 +
.../kinesis/supervisor/KinesisSupervisor.java | 7 -----
.../supervisor/autoscaler/LagBasedAutoScaler.java | 15 +++++++++--
.../autoscaler/LagBasedAutoScalerConfig.java | 14 +++++++++-
.../indexing/overlord/supervisor/Supervisor.java | 9 -------
.../{LagStats.java => AggregateFunction.java} | 30 +++-------------------
.../overlord/supervisor/autoscaler/LagStats.java | 29 +++++++++++++++++++++
.../{SupervisorTest.java => LagStatsTest.java} | 15 +++++------
8 files changed, 67 insertions(+), 53 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 76dd1cc4a7c..9320c39a02a 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -96,6 +96,7 @@ The following table outlines the configuration properties
related to the `lagBas
|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale
action is triggered.|No|60000|
|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
+|`lagAggregate`|The aggregate function used to compute the lag metric for
scaling decisions. Possible values are `MAX`, `SUM` and `AVERAGE`. |No|`SUM`|
The following example shows a supervisor spec with `lagBased` autoscaler:
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 365a9135e3c..a142f414762 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -427,13 +427,6 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
);
}
- @Override
- public long computeLagForAutoScaler()
- {
- LagStats lagStats = computeLagStats();
- return lagStats == null ? 0L : lagStats.getMaxLag();
- }
-
private SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetadataWithClosedOrExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata,
Set<String> terminatedPartitionIds,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index f8618b06f74..ec81c5f9f99 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -21,6 +21,8 @@ package
org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.StringUtils;
@@ -154,8 +156,17 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
LOCK.lock();
try {
if (!spec.isSuspended()) {
- long lag = supervisor.computeLagForAutoScaler();
- lagMetricsQueue.offer(lag > 0 ? lag : 0L);
+ LagStats lagStats = supervisor.computeLagStats();
+
+ if (lagStats != null) {
+ AggregateFunction aggregate =
lagBasedAutoScalerConfig.getLagAggregate() == null ?
+ lagStats.getAggregateForScaling() :
+
lagBasedAutoScalerConfig.getLagAggregate();
+ long lag = lagStats.getMetric(aggregate);
+ lagMetricsQueue.offer(lag > 0 ? lag : 0L);
+ } else {
+ lagMetricsQueue.offer(0L);
+ }
log.debug("Current lags for dataSource[%s] are [%s].", dataSource,
lagMetricsQueue);
} else {
log.warn("[%s] supervisor is suspended, skipping lag collection",
dataSource);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index e03242de279..068e7cc4f87 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -45,6 +46,7 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
private final int scaleOutStep;
private final boolean enableTaskAutoScaler;
private final long minTriggerScaleActionFrequencyMillis;
+ private final AggregateFunction lagAggregate;
@JsonCreator
public LagBasedAutoScalerConfig(
@@ -61,7 +63,8 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
@Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
@Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
@Nullable @JsonProperty("enableTaskAutoScaler") Boolean
enableTaskAutoScaler,
- @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long
minTriggerScaleActionFrequencyMillis
+ @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long
minTriggerScaleActionFrequencyMillis,
+ @Nullable @JsonProperty("lagAggregate") AggregateFunction
lagAggregate
)
{
this.enableTaskAutoScaler = enableTaskAutoScaler != null ?
enableTaskAutoScaler : false;
@@ -73,6 +76,7 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold :
1000000;
this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold
!= null ? triggerScaleOutFractionThreshold : 0.3;
this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold !=
null ? triggerScaleInFractionThreshold : 0.9;
+ this.lagAggregate = lagAggregate;
// Only do taskCountMax and taskCountMin check when autoscaler is enabled.
So that users left autoConfig empty{} will not throw any exception and
autoscaler is disabled.
// If autoscaler is disabled, no matter what configs are set, they are not
used.
@@ -186,6 +190,13 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
return minTriggerScaleActionFrequencyMillis;
}
+ @JsonProperty
+ @Nullable
+ public AggregateFunction getLagAggregate()
+ {
+ return lagAggregate;
+ }
+
@Override
public String toString()
{
@@ -204,6 +215,7 @@ public class LagBasedAutoScalerConfig implements
AutoScalerConfig
", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
", scaleInStep=" + scaleInStep +
", scaleOutStep=" + scaleOutStep +
+ ", lagAggregate=" + lagAggregate +
'}';
}
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index b1fb439184d..bcfc5ebe819 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -92,14 +92,5 @@ public interface Supervisor
*/
LagStats computeLagStats();
- /**
- * Used by AutoScaler to make scaling decisions.
- */
- default long computeLagForAutoScaler()
- {
- LagStats lagStats = computeLagStats();
- return lagStats == null ? 0L : lagStats.getTotalLag();
- }
-
int getActiveTaskGroupsCount();
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java
similarity index 67%
copy from
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
copy to
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java
index 7b6e5fd0bab..247c05180be 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java
@@ -19,31 +19,9 @@
package org.apache.druid.indexing.overlord.supervisor.autoscaler;
-public class LagStats
+public enum AggregateFunction
{
- private final long maxLag;
- private final long totalLag;
- private final long avgLag;
-
- public LagStats(long maxLag, long totalLag, long avgLag)
- {
- this.maxLag = maxLag;
- this.totalLag = totalLag;
- this.avgLag = avgLag;
- }
-
- public long getMaxLag()
- {
- return maxLag;
- }
-
- public long getTotalLag()
- {
- return totalLag;
- }
-
- public long getAvgLag()
- {
- return avgLag;
- }
+ MAX,
+ SUM,
+ AVERAGE
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
index 7b6e5fd0bab..2240585680a 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -24,12 +24,19 @@ public class LagStats
private final long maxLag;
private final long totalLag;
private final long avgLag;
+ private final AggregateFunction aggregateForScaling;
public LagStats(long maxLag, long totalLag, long avgLag)
+ {
+ this(maxLag, totalLag, avgLag, AggregateFunction.SUM);
+ }
+
+ public LagStats(long maxLag, long totalLag, long avgLag, AggregateFunction
aggregateForScaling)
{
this.maxLag = maxLag;
this.totalLag = totalLag;
this.avgLag = avgLag;
+ this.aggregateForScaling = aggregateForScaling == null ?
AggregateFunction.SUM : aggregateForScaling;
}
public long getMaxLag()
@@ -46,4 +53,26 @@ public class LagStats
{
return avgLag;
}
+
+ /**
+ * The preferred scaling metric that supervisor may specify to be used.
+ * This could be overridden by the autoscaler.
+ */
+ public AggregateFunction getAggregateForScaling()
+ {
+ return aggregateForScaling;
+ }
+
+ public long getMetric(AggregateFunction metric)
+ {
+ switch (metric) {
+ case MAX:
+ return getMaxLag();
+ case SUM:
+ return getTotalLag();
+ case AVERAGE:
+ return getAvgLag();
+ }
+ throw new IllegalStateException("Unknown scale metric");
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
similarity index 69%
rename from
server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
rename to
server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
index 79811079d34..5799d8c5e8e 100644
---
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
+++
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
@@ -19,22 +19,21 @@
package org.apache.druid.indexing.overlord.supervisor;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Mockito;
-public class SupervisorTest
+public class LagStatsTest
{
@Test
public void testAutoScalerLagComputation()
{
- Supervisor supervisor = Mockito.spy(Supervisor.class);
+ LagStats lagStats = new LagStats(1, 2, 3);
- Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2,
3));
- Assert.assertEquals(2, supervisor.computeLagForAutoScaler());
-
- Mockito.when(supervisor.computeLagStats()).thenReturn(null);
- Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
+ Assert.assertEquals(1, lagStats.getMetric(AggregateFunction.MAX));
+ Assert.assertEquals(2, lagStats.getMetric(AggregateFunction.SUM));
+ Assert.assertEquals(3, lagStats.getMetric(AggregateFunction.AVERAGE));
+ Assert.assertEquals(AggregateFunction.SUM,
lagStats.getAggregateForScaling());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]