This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e834e492908 supervisor/autoscaler: Fix clearing of collected lags on
skipped scale actions (#17356)
e834e492908 is described below
commit e834e4929088dafe63e6b35b04dd3a842f9fa51a
Author: Adithya Chakilam <[email protected]>
AuthorDate: Thu Oct 17 13:05:16 2024 -0500
supervisor/autoscaler: Fix clearing of collected lags on skipped scale
actions (#17356)
* supervisor/autoscaler: Fix clearing of collected lags on skipped scale
actions
* comments
* supervisor/autoscaler: Skip scaling when partitions are less than
minTaskCount (#17335)
* Fix pip installation after ubuntu upgrade (#17358)
* fix tests
---------
Co-authored-by: Pranav <[email protected]>
---
.../kinesis/supervisor/KinesisSupervisorTest.java | 2 ++
.../supervisor/SeekableStreamSupervisor.java | 23 ++++++++++++++++------
.../supervisor/autoscaler/LagBasedAutoScaler.java | 19 +++++++++++++-----
3 files changed, 33 insertions(+), 11 deletions(-)
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 24d919918f4..fe851a183a2 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -358,6 +358,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
Thread.sleep(1 * 1000);
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScale);
+ autoscaler.stop();
}
@Test
@@ -435,6 +436,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
Thread.sleep(1 * 1000);
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScale);
+ autoscaler.stop();
}
@Test
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1da5b4fbb9c..86c4ba385bd 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -422,13 +422,19 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// change taskCount without resubmitting.
private class DynamicAllocationTasksNotice implements Notice
{
- Callable<Integer> scaleAction;
+ Callable<Integer> computeDesiredTaskCount;
ServiceEmitter emitter;
+ Runnable onSuccessfulScale;
private static final String TYPE = "dynamic_allocation_tasks_notice";
- DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter
emitter)
+ DynamicAllocationTasksNotice(
+ Callable<Integer> computeDesiredTaskCount,
+ Runnable onSuccessfulScale,
+ ServiceEmitter emitter
+ )
{
- this.scaleAction = scaleAction;
+ this.computeDesiredTaskCount = computeDesiredTaskCount;
+ this.onSuccessfulScale = onSuccessfulScale;
this.emitter = emitter;
}
@@ -470,7 +476,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
}
- final Integer desiredTaskCount = scaleAction.call();
+ final Integer desiredTaskCount = computeDesiredTaskCount.call();
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
@@ -500,6 +506,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
boolean allocationSuccess = changeTaskCount(desiredTaskCount);
if (allocationSuccess) {
+ onSuccessfulScale.run();
dynamicTriggerLastRunTime = nowTime;
}
}
@@ -1260,9 +1267,13 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
- public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction,
ServiceEmitter emitter)
+ public Runnable buildDynamicAllocationTask(
+ Callable<Integer> scaleAction,
+ Runnable onSuccessfulScale,
+ ServiceEmitter emitter
+ )
{
- return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction,
emitter));
+ return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction,
onSuccessfulScale, emitter));
}
private Runnable buildRunTask()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index 22e36841199..648d8a655e9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -86,10 +86,6 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
int desiredTaskCount = -1;
try {
desiredTaskCount = computeDesiredTaskCount(new
ArrayList<>(lagMetricsQueue));
-
- if (desiredTaskCount != -1) {
- lagMetricsQueue.clear();
- }
}
catch (Exception ex) {
log.warn(ex, "Exception while computing desired task count for [%s]",
dataSource);
@@ -100,6 +96,19 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
return desiredTaskCount;
};
+ Runnable onSuccessfulScale = () -> {
+ LOCK.lock();
+ try {
+ lagMetricsQueue.clear();
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Exception while clearing lags for [%s]", dataSource);
+ }
+ finally {
+ LOCK.unlock();
+ }
+ };
+
lagComputationExec.scheduleAtFixedRate(
computeAndCollectLag(),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for
tasks to start up
@@ -107,7 +116,7 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
TimeUnit.MILLISECONDS
);
allocationExec.scheduleAtFixedRate(
- supervisor.buildDynamicAllocationTask(scaleAction, emitter),
+ supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale,
emitter),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() +
lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]