This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a8eaa1e4ed Skip streaming auto-scaling action if supervisor is idle (#14773)
a8eaa1e4ed is described below

commit a8eaa1e4ed81f94fe53ae14bbb078678e35de105
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Aug 17 19:43:25 2023 -0500

    Skip streaming auto-scaling action if supervisor is idle (#14773)
    
    * Skip streaming auto-scaling action if supervisor is idle
    
    * Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
    
    Co-authored-by: Abhishek Radhakrishnan <[email protected]>
    
    ---------
    
    Co-authored-by: Abhishek Radhakrishnan <[email protected]>
---
 .../supervisor/SeekableStreamSupervisor.java       |  7 +++
 .../SeekableStreamSupervisorSpecTest.java          | 65 ++++++++++++++++++++++
 2 files changed, 72 insertions(+)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 29fd16d1a4..0d1e32c49b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -443,6 +443,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
             );
             return;
           }
+          if (SupervisorStateManager.BasicState.IDLE == getState()) {
+            log.info(
+                "Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is idle",
+                dataSource
+            );
+            return;
+          }
           log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", pendingCompletionTaskGroups,
                     dataSource
           );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index f0b17657d8..9215f24a38 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
@@ -358,6 +359,22 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     }
   }
 
+  private class StateOverrideTestSeekableStreamSupervisor extends TestSeekableStreamSupervisor
+  {
+    private SupervisorStateManager.State state;
+
+    public StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.State state, int partitionNumbers)
+    {
+      super(partitionNumbers);
+      this.state = state;
+    }
+
+    @Override
+    public SupervisorStateManager.State getState()
+    {
+      return state;
+    }
+  }
 
   private static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec
   {
@@ -756,6 +773,54 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
     autoScaler.stop();
   }
 
+  @Test
+  public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() throws InterruptedException
+  {
+    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    TestSeekableStreamSupervisor supervisor = new StateOverrideTestSeekableStreamSupervisor(
+        SupervisorStateManager.BasicState.IDLE,
+        3
+    );
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+        supervisor,
+        DATASOURCE,
+        mapper.convertValue(
+            getScaleOutProperties(2),
+            LagBasedAutoScalerConfig.class
+        ),
+        spec
+    );
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+    int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountBeforeScaleOut);
+    Thread.sleep(1000);
+    int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountAfterScaleOut);
+
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
   @Test
   public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to