This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c9d524e898 Add new task distribution strategy that assigns tasks to 
workers based on supervisor affinity (#18634)
5c9d524e898 is described below

commit 5c9d524e8980fcb7c169c549be31ef4fd84aceed
Author: PANKAJ KUMAR <[email protected]>
AuthorDate: Mon Oct 27 13:46:15 2025 +0530

    Add new task distribution strategy that assigns tasks to workers based on 
supervisor affinity (#18634)
    
    Changes:
    - Add field `WorkerCategorySpec.supervisorIdCategoryAffinity`
    - This is a map from supervisor ID to worker category name
    - While assigning workers in `WorkerSelectUtils`, check in this map, then 
the `categoryAffinity` datasource level map
---
 .../overlord/setup/WorkerCategorySpec.java         |  27 +++-
 .../indexing/overlord/setup/WorkerSelectUtils.java |  24 +++-
 ...onWithCategorySpecWorkerSelectStrategyTest.java | 137 +++++++++++++++++++-
 ...tyWithCategorySpecWorkerSelectStrategyTest.java | 142 ++++++++++++++++++++-
 .../overlord/setup/WorkerCategorySpecTest.java     |   2 +-
 .../TestSeekableStreamIndexTask.java               |  90 +++++++++++++
 .../SeekableStreamSupervisorStateTest.java         | 106 ++++-----------
 7 files changed, 429 insertions(+), 99 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java
index 9f58d62d05a..410aad78a91 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpec.java
@@ -21,7 +21,9 @@ package org.apache.druid.indexing.overlord.setup;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
 
+import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -86,17 +88,19 @@ public class WorkerCategorySpec
   public static class CategoryConfig
   {
     private final String defaultCategory;
-    // key: datasource, value: category
     private final Map<String, String> categoryAffinity;
+    private final Map<String, String> supervisorIdCategoryAffinity;
 
     @JsonCreator
     public CategoryConfig(
         @JsonProperty("defaultCategory") String defaultCategory,
-        @JsonProperty("categoryAffinity") Map<String, String> categoryAffinity
+        @JsonProperty("categoryAffinity") Map<String, String> categoryAffinity,
+        @JsonProperty("supervisorIdCategoryAffinity") @Nullable Map<String, 
String> supervisorIdCategoryAffinity
     )
     {
       this.defaultCategory = defaultCategory;
       this.categoryAffinity = categoryAffinity == null ? 
Collections.emptyMap() : categoryAffinity;
+      this.supervisorIdCategoryAffinity = 
Configs.valueOrDefault(supervisorIdCategoryAffinity, Map.of());
     }
 
     @JsonProperty
@@ -105,12 +109,25 @@ public class WorkerCategorySpec
       return defaultCategory;
     }
 
+    /**
+     * Returns a map from datasource name to the worker category name to be 
used for tasks of that datasource.
+     */
     @JsonProperty
     public Map<String, String> getCategoryAffinity()
     {
       return categoryAffinity;
     }
 
+    /**
+     * Returns a map from supervisor ID to worker category name to be used for 
tasks of that supervisor.
+     * This takes precedence over {@link #getCategoryAffinity()} when both are 
configured.
+     */
+    @JsonProperty
+    public Map<String, String> getSupervisorIdCategoryAffinity()
+    {
+      return supervisorIdCategoryAffinity;
+    }
+
     @Override
     public boolean equals(final Object o)
     {
@@ -122,13 +139,14 @@ public class WorkerCategorySpec
       }
       final CategoryConfig that = (CategoryConfig) o;
       return Objects.equals(defaultCategory, that.defaultCategory) &&
-             Objects.equals(categoryAffinity, that.categoryAffinity);
+             Objects.equals(categoryAffinity, that.categoryAffinity) &&
+             Objects.equals(supervisorIdCategoryAffinity, 
that.supervisorIdCategoryAffinity);
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(defaultCategory, categoryAffinity);
+      return Objects.hash(defaultCategory, categoryAffinity, 
supervisorIdCategoryAffinity);
     }
 
     @Override
@@ -137,6 +155,7 @@ public class WorkerCategorySpec
       return "CategoryConfig{" +
              "defaultCategory=" + defaultCategory +
              ", categoryAffinity=" + categoryAffinity +
+             ", supervisorIdCategoryAffinity=" + supervisorIdCategoryAffinity +
              '}';
     }
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
index be4e8426008..e8c75a814bd 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/setup/WorkerSelectUtils.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -120,10 +121,25 @@ public class WorkerSelectUtils
       if (categoryConfig != null) {
         final String defaultCategory = categoryConfig.getDefaultCategory();
         final Map<String, String> categoryAffinity = 
categoryConfig.getCategoryAffinity();
-
-        String preferredCategory = categoryAffinity.get(task.getDataSource());
-        // If there is no preferred category for the datasource, then using 
the defaultCategory. However, the defaultCategory
-        // may be null too, so we need to do one more null check (see below).
+        final Map<String, String> supervisorIdCategoryAffinity = 
categoryConfig.getSupervisorIdCategoryAffinity();
+
+        String preferredCategory = null;
+        
+        // First, check if this task has a supervisorId and if there's a 
category affinity for it
+        if (task instanceof SeekableStreamIndexTask) {
+          final String supervisorId = ((SeekableStreamIndexTask<?, ?, ?>) 
task).getSupervisorId();
+          if (supervisorId != null) {
+            preferredCategory = supervisorIdCategoryAffinity.get(supervisorId);
+          }
+        }
+        
+        // If no supervisor-based category is found, fall back to 
datasource-based category affinity
+        if (preferredCategory == null) {
+          preferredCategory = categoryAffinity.get(task.getDataSource());
+        }
+        
+        // If there is no preferred category for the supervisorId or 
datasource, then use the defaultCategory.
+        // However, the defaultCategory may be null too, so we need to do one 
more null check (see below).
         preferredCategory = preferredCategory == null ? defaultCategory : 
preferredCategory;
 
         if (preferredCategory != null) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
index 6ac04a8920f..7ac8b2b746a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWithCategorySpecWorkerSelectStrategyTest.java
@@ -20,14 +20,26 @@
 package org.apache.druid.indexing.overlord.setup;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.HashSet;
 
 public class EqualDistributionWithCategorySpecWorkerSelectStrategyTest
@@ -80,7 +92,8 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 "c2",
-                ImmutableMap.of("ds1", "c2")
+                ImmutableMap.of("ds1", "c2"),
+                null
             )
         ),
         false
@@ -95,7 +108,8 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 null,
-                ImmutableMap.of("ds1", "c2")
+                ImmutableMap.of("ds1", "c2"),
+                null
             )
         ),
         false
@@ -110,6 +124,7 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 "c2",
+                null,
                 null
             )
         ),
@@ -127,6 +142,7 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
         ImmutableMap.of(
             "noop",
             new WorkerCategorySpec.CategoryConfig(
+                null,
                 null,
                 null
             )
@@ -146,7 +162,8 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 "c1",
-                ImmutableMap.of("ds1", "c3")
+                ImmutableMap.of("ds1", "c3"),
+                null
             )
         ),
         false
@@ -164,7 +181,8 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 "c1",
-                ImmutableMap.of("ds1", "c3")
+                ImmutableMap.of("ds1", "c3"),
+                null
             )
         ),
         true
@@ -174,6 +192,94 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
     Assert.assertNull(worker);
   }
 
+  @Test
+  public void testSupervisorIdCategoryAffinity()
+  {
+    final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+        ImmutableMap.of(
+            "test_seekable_stream",
+            new WorkerCategorySpec.CategoryConfig(
+                "c1",
+                ImmutableMap.of("ds1", "c1"),
+                ImmutableMap.of("supervisor1", "c2")
+            )
+        ),
+        false
+    );
+    final Task taskWithSupervisor = createTestTask("task1", "supervisor1", 
"ds1");
+    
+    final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
+        new 
EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        WORKERS_FOR_TIER_TESTS,
+        taskWithSupervisor
+    );
+    Assert.assertNotNull(worker);
+    Assert.assertEquals("c2", worker.getWorker().getCategory());
+    Assert.assertEquals("localhost3", worker.getWorker().getHost());
+  }
+
+  @Test
+  public void testSupervisorIdCategoryAffinityFallbackToDatasource()
+  {
+    final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+        ImmutableMap.of(
+            "test_seekable_stream",
+            new WorkerCategorySpec.CategoryConfig(
+                "c2",
+                ImmutableMap.of("ds1", "c1"),
+                ImmutableMap.of("supervisor2", "c2")
+            )
+        ),
+        false
+    );
+    final Task taskWithSupervisor = createTestTask("task1", "supervisor1", 
"ds1");
+    
+    final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
+        new 
EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        WORKERS_FOR_TIER_TESTS,
+        taskWithSupervisor
+    );
+    Assert.assertNotNull(worker);
+    Assert.assertEquals("c1", worker.getWorker().getCategory());
+    Assert.assertEquals("localhost1", worker.getWorker().getHost());
+  }
+
+  @Test
+  public void testSupervisorIdCategoryAffinityFallbackToDefault()
+  {
+    final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+        ImmutableMap.of(
+            "test_seekable_stream",
+            new WorkerCategorySpec.CategoryConfig(
+                "c2",
+                ImmutableMap.of("ds2", "c1"),
+                ImmutableMap.of("supervisor2", "c1")
+            )
+        ),
+        false
+    );
+
+    final Task taskWithSupervisor = createTestTask("task1", "supervisor1", 
"ds1");
+    
+    final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
+        new 
EqualDistributionWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        WORKERS_FOR_TIER_TESTS,
+        taskWithSupervisor
+    );
+    Assert.assertNotNull(worker);
+    Assert.assertEquals("c2", worker.getWorker().getCategory());
+    Assert.assertEquals("localhost3", worker.getWorker().getHost());
+  }
+
   private ImmutableWorkerInfo selectWorker(WorkerCategorySpec 
workerCategorySpec)
   {
     final EqualDistributionWithCategorySpecWorkerSelectStrategy strategy =
@@ -187,4 +293,27 @@ public class 
EqualDistributionWithCategorySpecWorkerSelectStrategyTest
 
     return worker;
   }
+
+  /**
+   * Helper method to create a test task with supervisor ID for testing
+   */
+  @SuppressWarnings("unchecked")
+  private static Task createTestTask(String id, @Nullable String supervisorId, 
String datasource)
+  {
+    return new TestSeekableStreamIndexTask(
+        id,
+        supervisorId,
+        null,
+        DataSchema.builder()
+          .withDataSource(datasource)
+          .withTimestamp(new TimestampSpec(null, null, null))
+          .withDimensions(new DimensionsSpec(Collections.emptyList()))
+          .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), 
Collections.emptyList()))
+          .build(),
+        Mockito.mock(SeekableStreamIndexTaskTuningConfig.class),
+        Mockito.mock(SeekableStreamIndexTaskIOConfig.class),
+        null,
+        null
+    );
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
index 880ef743dca..ea4ffb16af5 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/FillCapacityWithCategorySpecWorkerSelectStrategyTest.java
@@ -20,14 +20,26 @@
 package org.apache.druid.indexing.overlord.setup;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
+import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.HashSet;
 
 public class FillCapacityWithCategorySpecWorkerSelectStrategyTest
@@ -80,7 +92,8 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 "c1",
-                ImmutableMap.of("ds1", "c1")
+                ImmutableMap.of("ds1", "c1"),
+                null
             )
         ),
         false
@@ -95,7 +108,8 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 null,
-                ImmutableMap.of("ds1", "c1")
+                ImmutableMap.of("ds1", "c1"),
+                null
             )
         ),
         false
@@ -110,6 +124,7 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 "c1",
+                null,
                 null
             )
         ),
@@ -127,6 +142,7 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
         ImmutableMap.of(
             "noop",
             new WorkerCategorySpec.CategoryConfig(
+                null,
                 null,
                 null
             )
@@ -146,7 +162,8 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 "c1",
-                ImmutableMap.of("ds1", "c3")
+                ImmutableMap.of("ds1", "c3"),
+                null
             )
         ),
         false
@@ -164,7 +181,8 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
             "noop",
             new WorkerCategorySpec.CategoryConfig(
                 "c1",
-                ImmutableMap.of("ds1", "c3")
+                ImmutableMap.of("ds1", "c3"),
+                null
             )
         ),
         true
@@ -174,6 +192,99 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
     Assert.assertNull(worker);
   }
 
+  @Test
+  public void testSupervisorIdCategoryAffinity()
+  {
+    final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+        ImmutableMap.of(
+            "test_seekable_stream",
+            new WorkerCategorySpec.CategoryConfig(
+                "c1",
+                ImmutableMap.of("ds1", "c1"),
+                ImmutableMap.of("supervisor1", "c2")
+            )
+        ),
+        false
+    );
+
+    // Create a test task with supervisor ID "supervisor1"
+    final Task taskWithSupervisor = createTestTask("task1", "supervisor1", 
"ds1");
+    
+    final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
+        new 
FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        WORKERS_FOR_TIER_TESTS,
+        taskWithSupervisor
+    );
+    Assert.assertNotNull(worker);
+    Assert.assertEquals("c2", worker.getWorker().getCategory());
+    Assert.assertEquals("localhost3", worker.getWorker().getHost());
+  }
+
+  @Test
+  public void testSupervisorIdCategoryAffinityFallbackToDatasource()
+  {
+    final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+        ImmutableMap.of(
+            "test_seekable_stream",
+            new WorkerCategorySpec.CategoryConfig(
+                "c2",
+                ImmutableMap.of("ds1", "c1"),
+                ImmutableMap.of("supervisor2", "c2")
+            )
+        ),
+        false
+    );
+
+    // Create a test task with supervisor ID "supervisor1" (not in 
supervisorIdCategoryAffinity map)
+    final Task taskWithSupervisor = createTestTask("task1", "supervisor1", 
"ds1");
+    
+    final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
+        new 
FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        WORKERS_FOR_TIER_TESTS,
+        taskWithSupervisor
+    );
+    Assert.assertNotNull(worker);
+    Assert.assertEquals("c1", worker.getWorker().getCategory());
+    Assert.assertEquals("localhost1", worker.getWorker().getHost());
+  }
+
+  @Test
+  public void testSupervisorIdCategoryAffinityFallbackToDefault()
+  {
+    final WorkerCategorySpec workerCategorySpec = new WorkerCategorySpec(
+        ImmutableMap.of(
+            "test_seekable_stream",
+            new WorkerCategorySpec.CategoryConfig(
+                "c2",
+                ImmutableMap.of("ds2", "c1"),
+                ImmutableMap.of("supervisor2", "c1")
+            )
+        ),
+        false
+    );
+
+    // Create a test task with supervisor ID "supervisor1" and datasource "ds1"
+    final Task taskWithSupervisor = createTestTask("task1", "supervisor1", 
"ds1");
+    
+    final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
+        new 
FillCapacityWithCategorySpecWorkerSelectStrategy(workerCategorySpec, null);
+
+    ImmutableWorkerInfo worker = strategy.findWorkerForTask(
+        new RemoteTaskRunnerConfig(),
+        WORKERS_FOR_TIER_TESTS,
+        taskWithSupervisor
+    );
+    Assert.assertNotNull(worker);
+    Assert.assertEquals("c2", worker.getWorker().getCategory());
+    Assert.assertEquals("localhost3", worker.getWorker().getHost());
+  }
+
   private ImmutableWorkerInfo selectWorker(WorkerCategorySpec 
workerCategorySpec)
   {
     final FillCapacityWithCategorySpecWorkerSelectStrategy strategy =
@@ -187,4 +298,27 @@ public class 
FillCapacityWithCategorySpecWorkerSelectStrategyTest
 
     return worker;
   }
+
+  /**
+   * Helper method to create a test task with supervisor ID for testing
+   */
+  @SuppressWarnings("unchecked")
+  private static Task createTestTask(String id, @Nullable String supervisorId, 
String datasource)
+  {
+    return new TestSeekableStreamIndexTask(
+        id,
+        supervisorId,
+        null,
+        DataSchema.builder()
+            .withDataSource(datasource)
+            .withTimestamp(new TimestampSpec(null, null, null))
+            .withDimensions(new DimensionsSpec(Collections.emptyList()))
+            .withGranularity(new ArbitraryGranularitySpec(new 
AllGranularity(), Collections.emptyList()))
+            .build(),
+        Mockito.mock(SeekableStreamIndexTaskTuningConfig.class),
+        Mockito.mock(SeekableStreamIndexTaskIOConfig.class),
+        null,
+        null
+    );
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java
index 4277984fc10..9576aa0ffbb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/WorkerCategorySpecTest.java
@@ -58,7 +58,7 @@ public class WorkerCategorySpecTest
     Assert.assertTrue(workerCategorySpec.isStrong());
     Assert.assertEquals(ImmutableMap.of(
         "index_kafka",
-        new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1", 
"c2"))
+        new WorkerCategorySpec.CategoryConfig("c1", ImmutableMap.of("ds1", 
"c2"), null)
     ), workerCategorySpec.getCategoryMap());
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
new file mode 100644
index 00000000000..985aa7da706
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.segment.indexing.DataSchema;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Test implementation of SeekableStreamIndexTask for use in unit tests.
+ */
+public class TestSeekableStreamIndexTask extends 
SeekableStreamIndexTask<String, String, ByteEntity>
+{
+  private final SeekableStreamIndexTaskRunner<String, String, ByteEntity> 
streamingTaskRunner;
+  private final RecordSupplier<String, String, ByteEntity> recordSupplier;
+
+  public TestSeekableStreamIndexTask(
+      String id,
+      @Nullable String supervisorId,
+      @Nullable TaskResource taskResource,
+      DataSchema dataSchema,
+      SeekableStreamIndexTaskTuningConfig tuningConfig,
+      SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
+      @Nullable Map<String, Object> context,
+      @Nullable String groupId
+  )
+  {
+    this(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, 
context, groupId, null, null);
+  }
+
+  public TestSeekableStreamIndexTask(
+      String id,
+      @Nullable String supervisorId,
+      @Nullable TaskResource taskResource,
+      DataSchema dataSchema,
+      SeekableStreamIndexTaskTuningConfig tuningConfig,
+      SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
+      @Nullable Map<String, Object> context,
+      @Nullable String groupId,
+      @Nullable SeekableStreamIndexTaskRunner<String, String, ByteEntity> 
streamingTaskRunner,
+      @Nullable RecordSupplier<String, String, ByteEntity> recordSupplier
+  )
+  {
+    super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, 
context, groupId);
+    this.streamingTaskRunner = streamingTaskRunner;
+    this.recordSupplier = recordSupplier;
+  }
+
+  @Nullable
+  @Override
+  protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> 
createTaskRunner()
+  {
+    return streamingTaskRunner;
+  }
+
+  @Override
+  protected RecordSupplier<String, String, ByteEntity> 
newTaskRecordSupplier(final TaskToolbox toolbox)
+  {
+    return recordSupplier;
+  }
+
+  @Override
+  public String getType()
+  {
+    return "test_seekable_stream";
+  }
+}
+
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index e204a67cae8..5df9edd184d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -39,10 +39,8 @@ import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
-import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
@@ -65,6 +63,7 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
@@ -1151,7 +1150,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             taskTuningConfig,
             taskIoConfig,
             context,
-            "0"
+            "0",
+            null,
+            recordSupplier
     );
 
     TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
@@ -1162,7 +1163,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             taskTuningConfig,
             taskIoConfig,
             context,
-            "0"
+            "0",
+            null,
+            recordSupplier
     );
 
     TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
@@ -1173,7 +1176,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         taskTuningConfig,
         taskIoConfig,
         context,
-        "0"
+        "0",
+        null,
+        recordSupplier
     );
 
     final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
@@ -1364,7 +1369,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             ioConfig
         ),
         context,
-        "0"
+        "0",
+        null,
+        recordSupplier
     );
 
     TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
@@ -1384,7 +1391,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             ioConfig
         ),
         context,
-        "1"
+        "1",
+        null,
+        recordSupplier
     );
 
     TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
@@ -1404,7 +1413,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             ioConfig
         ),
         context,
-        "2"
+        "2",
+        null,
+        recordSupplier
     );
 
     final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
@@ -1596,7 +1607,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         ),
         context,
         "0",
-        streamingTaskRunner
+        streamingTaskRunner,
+        recordSupplier
     );
 
     final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
@@ -2907,78 +2919,6 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     };
   }
 
-  private class TestSeekableStreamIndexTask extends 
SeekableStreamIndexTask<String, String, ByteEntity>
-  {
-    private final SeekableStreamIndexTaskRunner<String, String, ByteEntity> 
streamingTaskRunner;
-
-    public TestSeekableStreamIndexTask(
-        String id,
-        @Nullable String supervisorId,
-        @Nullable TaskResource taskResource,
-        DataSchema dataSchema,
-        SeekableStreamIndexTaskTuningConfig tuningConfig,
-        SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
-        @Nullable Map<String, Object> context,
-        @Nullable String groupId
-    )
-    {
-      this(
-          id,
-          supervisorId,
-          taskResource,
-          dataSchema,
-          tuningConfig,
-          ioConfig,
-          context,
-          groupId,
-          null
-      );
-    }
-
-    public TestSeekableStreamIndexTask(
-        String id,
-        @Nullable String supervisorId,
-        @Nullable TaskResource taskResource,
-        DataSchema dataSchema,
-        SeekableStreamIndexTaskTuningConfig tuningConfig,
-        SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
-        @Nullable Map<String, Object> context,
-        @Nullable String groupId,
-        @Nullable SeekableStreamIndexTaskRunner<String, String, ByteEntity> 
streamingTaskRunner
-    )
-    {
-      super(
-          id,
-          supervisorId,
-          taskResource,
-          dataSchema,
-          tuningConfig,
-          ioConfig,
-          context,
-          groupId
-      );
-      this.streamingTaskRunner = streamingTaskRunner;
-    }
-
-    @Nullable
-    @Override
-    protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> 
createTaskRunner()
-    {
-      return streamingTaskRunner;
-    }
-
-    @Override
-    protected RecordSupplier<String, String, ByteEntity> 
newTaskRecordSupplier(final TaskToolbox toolbox)
-    {
-      return recordSupplier;
-    }
-
-    @Override
-    public String getType()
-    {
-      return "test";
-    }
-  }
 
   private abstract class BaseTestSeekableStreamSupervisor extends 
SeekableStreamSupervisor<String, String, ByteEntity>
   {
@@ -3069,7 +3009,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
           taskTuningConfig,
           taskIoConfig,
           null,
-          null
+          null,
+          null,
+          recordSupplier
       ));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to