This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new aec5c4a4b65 Fix duplicate actions in auto-scaler history (#18715)
aec5c4a4b65 is described below

commit aec5c4a4b658a010d4fc9b3c50046ebc9750dea6
Author: Andreas Maechler <[email protected]>
AuthorDate: Fri Nov 7 22:30:40 2025 -0700

    Fix duplicate actions in auto-scaler history (#18715)
    
    Changes:
    - Do not clear `pendingCompletionTaskGroups` in `clearAllocationInfo`
    - Add unit test
---
 .../kafka/supervisor/KafkaSupervisorTest.java      | 216 +++++++++++++++++++++
 .../supervisor/SeekableStreamSupervisor.java       |  10 +-
 2 files changed, 224 insertions(+), 2 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index f3f01cc5071..f6a026e6aa7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -71,6 +71,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
@@ -122,6 +123,7 @@ import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
@@ -134,6 +136,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
@@ -5047,6 +5050,201 @@ public class KafkaSupervisorTest extends EasyMockSupport
     EasyMock.replay(differentTaskType);
   }
 
+  @Test
+  public void 
test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublishing() 
throws Exception
+  {
+    final TaskLocation location1 = TaskLocation.create("testHost1", 1234, -1);
+    final TaskLocation location2 = TaskLocation.create("testHost2", 1235, -1);
+    final TaskLocation location3 = TaskLocation.create("testHost3", 1236, -1);
+    final DateTime startTime = DateTimes.nowUtc();
+
+    // Create supervisor with 3 task groups
+    supervisor = getTestableSupervisor(1, 3, true, "PT1H", null, null);
+    final KafkaSupervisorTuningConfig tuningConfig = 
supervisor.getTuningConfig();
+    addSomeEvents(100);
+
+    // Manually create 3 tasks (one for each partition/task group)
+    Task task1 = createKafkaIndexTask(
+        "task1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 0, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 0, Long.MAX_VALUE)
+        ),
+        null,
+        null,
+        tuningConfig
+    );
+
+    Task task2 = createKafkaIndexTask(
+        "task2",
+        DATASOURCE,
+        1,
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 1, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 1, Long.MAX_VALUE)
+        ),
+        null,
+        null,
+        tuningConfig
+    );
+
+    Task task3 = createKafkaIndexTask(
+        "task3",
+        DATASOURCE,
+        2,
+        new SeekableStreamStartSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            topic,
+            singlePartitionMap(topic, 2, Long.MAX_VALUE)
+        ),
+        null,
+        null,
+        tuningConfig
+    );
+
+    Collection workItems = new ArrayList<>();
+    workItems.add(new TestTaskRunnerWorkItem(task1, null, location1));
+    workItems.add(new TestTaskRunnerWorkItem(task2, null, location2));
+    workItems.add(new TestTaskRunnerWorkItem(task3, null, location3));
+
+    // Setup mocks for initial discovery of tasks
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes();
+    
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(task1,
 task2, task3)).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus(task1.getId())).andReturn(Optional.of(TaskStatus.running(task1.getId()))).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus(task2.getId())).andReturn(Optional.of(TaskStatus.running(task2.getId()))).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus(task3.getId())).andReturn(Optional.of(TaskStatus.running(task3.getId()))).anyTimes();
+    
EasyMock.expect(taskStorage.getTask(task1.getId())).andReturn(Optional.of(task1)).anyTimes();
+    
EasyMock.expect(taskStorage.getTask(task2.getId())).andReturn(Optional.of(task2)).anyTimes();
+    
EasyMock.expect(taskStorage.getTask(task3.getId())).andReturn(Optional.of(task3)).anyTimes();
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
 KafkaDataSourceMetadata(null)).anyTimes();
+    
EasyMock.expect(taskClient.getStatusAsync(task1.getId())).andReturn(Futures.immediateFuture(Status.READING));
+    
EasyMock.expect(taskClient.getStatusAsync(task2.getId())).andReturn(Futures.immediateFuture(Status.READING));
+    
EasyMock.expect(taskClient.getStatusAsync(task3.getId())).andReturn(Futures.immediateFuture(Status.READING));
+    
EasyMock.expect(taskClient.getStartTimeAsync(task1.getId())).andReturn(Futures.immediateFuture(startTime));
+    
EasyMock.expect(taskClient.getStartTimeAsync(task2.getId())).andReturn(Futures.immediateFuture(startTime));
+    
EasyMock.expect(taskClient.getStartTimeAsync(task3.getId())).andReturn(Futures.immediateFuture(startTime));
+
+    TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints1 = new 
TreeMap<>();
+    checkpoints1.put(0, singlePartitionMap(topic, 0, 0L));
+    TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints2 = new 
TreeMap<>();
+    checkpoints2.put(0, singlePartitionMap(topic, 1, 0L));
+    TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints3 = new 
TreeMap<>();
+    checkpoints3.put(0, singlePartitionMap(topic, 2, 0L));
+
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task1.getId()), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints1))
+            .times(1);
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task2.getId()), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints2))
+            .times(1);
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task3.getId()), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints3))
+            .times(1);
+
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+    replayAll();
+
+    // Start supervisor and discover the 3 existing tasks
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    // Verify we have 3 actively reading task groups after discovery
+    Assert.assertEquals(
+        "Should have 3 actively reading task groups after discovery",
+        3,
+        supervisor.getActivelyReadingTaskGroupsCount()
+    );
+
+    // Reset and setup mocks for gracefulShutdownInternal
+    EasyMock.reset(taskRunner, taskClient, taskQueue);
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes();
+    
EasyMock.expect(taskClient.pauseAsync(task1.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic,
 0, 100L)));
+    
EasyMock.expect(taskClient.pauseAsync(task2.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic,
 1, 100L)));
+    
EasyMock.expect(taskClient.pauseAsync(task3.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic,
 2, 100L)));
+    EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task1.getId()), 
EasyMock.anyObject(), 
EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
+    EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task2.getId()), 
EasyMock.anyObject(), 
EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
+    EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task3.getId()), 
EasyMock.anyObject(), 
EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
+
+    EasyMock.replay(taskRunner, taskClient, taskQueue);
+
+    // Simulate autoscaler scale-down by calling gracefulShutdownInternal()
+    // This should move tasks from activelyReadingTaskGroups to 
pendingCompletionTaskGroups
+    supervisor.gracefulShutdownInternal();
+
+    verifyAll();
+
+    // After gracefulShutdownInternal, tasks should be moved to 
pendingCompletionTaskGroups
+    Assert.assertEquals(
+        "activelyReadingTaskGroups should be empty after 
gracefulShutdownInternal",
+        0,
+        supervisor.getActivelyReadingTaskGroupsCount()
+    );
+
+    // Verify pendingCompletionTaskGroups is NOT empty (tasks were moved there)
+    boolean hasPendingTasks = false;
+    for (int groupId = 0; groupId < 3; groupId++) {
+      if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
+        hasPendingTasks = true;
+        break;
+      }
+    }
+    Assert.assertTrue(
+        "pendingCompletionTaskGroups should contain task groups after 
gracefulShutdownInternal",
+        hasPendingTasks
+    );
+
+    // Now call clearAllocationInfo() - this is where the bug was
+    // The bug was that this method cleared pendingCompletionTaskGroups
+    supervisor.testClearAllocationInfo();
+
+    // THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT 
empty after clearAllocationInfo
+    // This is the fix - clearAllocationInfo should preserve 
pendingCompletionTaskGroups
+    boolean stillHasPendingTasks = false;
+    for (int groupId = 0; groupId < 3; groupId++) {
+      if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
+        stillHasPendingTasks = true;
+        break;
+      }
+    }
+    Assert.assertTrue(
+        "pendingCompletionTaskGroups should be preserved after 
clearAllocationInfo() " +
+        "to prevent autoscaler from creating duplicate history entries",
+        stillHasPendingTasks
+    );
+
+    // Verify activelyReadingTaskGroups is still empty
+    Assert.assertEquals(
+        "activelyReadingTaskGroups should remain empty after 
clearAllocationInfo",
+        0,
+        supervisor.getActivelyReadingTaskGroupsCount()
+    );
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     // create topic manually
@@ -5753,6 +5951,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
     {
       return stateManager;
     }
+
+    public int getActivelyReadingTaskGroupsCount()
+    {
+      return getActiveTaskGroupsCount();
+    }
+
+    public int getPendingCompletionTaskGroupsCount(int groupId)
+    {
+      CopyOnWriteArrayList<?> groups = getPendingCompletionTaskGroups(groupId);
+      return groups != null ? groups.size() : 0;
+    }
+
+    public void testClearAllocationInfo() throws Exception
+    {
+      Method method = 
SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo");
+      method.setAccessible(true);
+      method.invoke(this);
+    }
   }
 
   private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends 
TestableKafkaSupervisor
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0faa0b84134..ef65d6d22ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -608,13 +608,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  /**
+   * Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs.
+   * <p>
+   * Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these
+   * tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks
+   * complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to
+   * {@link #activelyReadingTaskGroups}.
+   */
   private void clearAllocationInfo()
   {
     activelyReadingTaskGroups.clear();
     partitionGroups.clear();
     partitionOffsets.clear();
-
-    pendingCompletionTaskGroups.clear();
     partitionIds.clear();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to