This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a8eaa1e4ed Skip streaming auto-scaling action if supervisor is idle (#14773)
a8eaa1e4ed is described below
commit a8eaa1e4ed81f94fe53ae14bbb078678e35de105
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Aug 17 19:43:25 2023 -0500
Skip streaming auto-scaling action if supervisor is idle (#14773)
* Skip streaming auto-scaling action if supervisor is idle
* Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
Co-authored-by: Abhishek Radhakrishnan <[email protected]>
---------
Co-authored-by: Abhishek Radhakrishnan <[email protected]>
---
.../supervisor/SeekableStreamSupervisor.java | 7 +++
.../SeekableStreamSupervisorSpecTest.java | 65 ++++++++++++++++++++++
2 files changed, 72 insertions(+)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 29fd16d1a4..0d1e32c49b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -443,6 +443,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
return;
}
+ if (SupervisorStateManager.BasicState.IDLE == getState()) {
+ log.info(
+ "Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is idle",
+ dataSource
+ );
+ return;
+ }
log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]",
pendingCompletionTaskGroups,
dataSource
);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index f0b17657d8..9215f24a38 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
@@ -358,6 +359,22 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
}
}
+ private class StateOverrideTestSeekableStreamSupervisor extends TestSeekableStreamSupervisor
+ {
+ private SupervisorStateManager.State state;
+
+ public StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.State state, int partitionNumbers)
+ {
+ super(partitionNumbers);
+ this.state = state;
+ }
+
+ @Override
+ public SupervisorStateManager.State getState()
+ {
+ return state;
+ }
+ }
private static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec
{
@@ -756,6 +773,54 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
autoScaler.stop();
}
+ @Test
+ public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() throws InterruptedException
+ {
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ TestSeekableStreamSupervisor supervisor = new StateOverrideTestSeekableStreamSupervisor(
+ SupervisorStateManager.BasicState.IDLE,
+ 3
+ );
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ getScaleOutProperties(2),
+ LagBasedAutoScalerConfig.class
+ ),
+ spec
+ );
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+ int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountBeforeScaleOut);
+ Thread.sleep(1000);
+ int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountAfterScaleOut);
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
@Test
public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]