This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f17b4f44594 Extend server priorities to realtime indexing tasks for query isolation (#19040)
f17b4f44594 is described below

commit f17b4f445949c5250524823e10fb78cd58259e45
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Mar 2 22:49:00 2026 -0800

    Extend server priorities to realtime indexing tasks for query isolation (#19040)
    
    Fixes https://github.com/apache/druid/issues/19018
    
    Added serverPriorityToReplicas parameter to the streaming supervisor specs 
(kafka/kinesis/rabbit). This allows operators to distribute task replicas 
across different server priorities for realtime indexing tasks. Similar to 
historical tiering, this enables query isolation for mixed workload scenarios 
on the Peons, allowing some task replicas to handle queries of specific 
priorities. This config is optional and is compatible with the existing 
replicas property if specified. If unspecif [...]
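The replica distribution described above can be sketched in Java. This is a minimal illustration under stated assumptions, not the code from this commit: the class `ServerPrioritySketch`, the helpers `expandPriorities` and `priorityForReplica`, and the ascending-priority ordering are all hypothetical. It expands a `serverPriorityToReplicas`-style map into one priority per task replica and yields no priority when the map is unset, which mirrors how the supervisors in the diff pass `null` when no priorities are provided.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; names and ordering are assumptions, not Druid's implementation.
final class ServerPrioritySketch
{
  // Expands {priority -> replica count} into a flat list with one priority per replica.
  // Ascending priority order is an assumption chosen only to make the output deterministic.
  static List<Integer> expandPriorities(Map<Integer, Integer> serverPriorityToReplicas)
  {
    if (serverPriorityToReplicas == null || serverPriorityToReplicas.isEmpty()) {
      return Collections.emptyList();
    }
    final List<Integer> priorities = new ArrayList<>();
    for (Map.Entry<Integer, Integer> entry : new TreeMap<>(serverPriorityToReplicas).entrySet()) {
      for (int i = 0; i < entry.getValue(); i++) {
        priorities.add(entry.getKey());
      }
    }
    return priorities;
  }

  // Returns the priority for the i-th replica, or null when no priorities were configured.
  static Integer priorityForReplica(List<Integer> priorities, int replicaIndex)
  {
    return (priorities == null || priorities.isEmpty()) ? null : priorities.get(replicaIndex);
  }

  public static void main(String[] args)
  {
    final List<Integer> priorities = expandPriorities(Map.of(10, 1, 20, 2));
    System.out.println(priorities); // prints [10, 20, 20]
    System.out.println(priorityForReplica(Collections.emptyList(), 0)); // prints null
  }
}
```

With one replica at priority 10 and two at priority 20, the sketch produces three per-replica priorities, the same shape as the `List.of(10, 20, 20)` argument used in the tests further down. The ordering the supervisor actually uses may differ.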
---
 docs/ingestion/supervisor.md                       |   3 +-
 .../embedded/indexing/StreamIndexTestBase.java     |   3 +-
 .../embedded/kinesis/KinesisDataFormatsTest.java   |   3 +-
 .../rabbitstream/RabbitStreamIndexTask.java        |   6 +-
 .../supervisor/RabbitStreamSupervisor.java         |   8 +-
 .../supervisor/RabbitStreamSupervisorIOConfig.java |   6 +-
 .../supervisor/RabbitStreamSupervisorTest.java     |   9 +-
 .../druid/indexing/kafka/KafkaIndexTask.java       |   6 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   7 +-
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |   6 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   3 +-
 .../kafka/supervisor/KafkaIOConfigBuilder.java     |   1 +
 .../supervisor/KafkaSupervisorIOConfigTest.java    |   9 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      | 326 ++++++++++++++++++---
 .../druid/indexing/kinesis/KinesisIndexTask.java   |   6 +-
 .../kinesis/supervisor/KinesisSupervisor.java      |   8 +-
 .../supervisor/KinesisSupervisorIOConfig.java      |   7 +-
 .../kinesis/KinesisIndexTaskSerdeTest.java         |   1 +
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   3 +-
 .../indexing/kinesis/KinesisSamplerSpecTest.java   |   9 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  34 ++-
 .../druid/indexing/common/task/TaskResource.java   |   2 +-
 .../druid/indexing/overlord/ForkingTaskRunner.java |   9 +
 .../seekablestream/SeekableStreamIndexTask.java    |  19 +-
 .../supervisor/SeekableStreamSupervisor.java       | 133 +++++++--
 .../SeekableStreamSupervisorIOConfig.java          |  43 ++-
 .../indexing/overlord/ForkingTaskRunnerTest.java   |  81 +++++
 .../SeekableStreamIndexTaskRunnerAuthTest.java     |  23 +-
 .../SeekableStreamSamplerSpecTest.java             |   1 +
 .../TestSeekableStreamIndexTask.java               |  19 +-
 .../TestSeekableStreamIndexTaskIOConfig.java       |  44 +++
 .../SeekableStreamSupervisorIOConfigTest.java      | 114 ++++++-
 .../SeekableStreamSupervisorSpecTest.java          |   7 +-
 .../SeekableStreamSupervisorStateTest.java         | 273 ++++++++++++++---
 .../SeekableStreamSupervisorTestBase.java          |   4 +-
 .../java/org/apache/druid/cli/CliPeonTest.java     |   2 +-
 36 files changed, 1069 insertions(+), 169 deletions(-)

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index c13764acd97..360e6ad86d2 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -56,7 +56,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi
 |`inputFormat`|Object|The [input format](../ingestion/data-formats.md#input-format) to define input data parsing.|Yes||
 |`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. See [Task autoscaler](#task-autoscaler) for more information.|No|null|
 |`taskCount`|Integer|The maximum number of reading tasks in a replica set. Multiply `taskCount` and replicas to measure the maximum number of reading tasks. The total number of tasks, reading and publishing, is higher than the maximum number of reading tasks. See [Capacity planning](../ingestion/supervisor.md#capacity-planning) for more details. When `taskCount` is greater than the number of Kafka partitions or Kinesis shards, the actual number of reading tasks is less than the `taskCoun [...]
-|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns replicate tasks to different workers to provide resiliency against process failure.|No|1|
+|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns task replicas to different workers to provide resiliency against process failure. See `serverPriorityToReplicas` to assign server priorities for task replicas.|No|1|
 |`taskDuration`|ISO 8601 period|The length of time before tasks stop reading and begin publishing segments.|No|`PT1H`|
 |`startDelay`|ISO 8601 period|The period to wait before the supervisor starts managing tasks.|No|`PT5S`|
 |`period`|ISO 8601 period|Determines how often the supervisor executes its management logic. Note that the supervisor also runs in response to certain events, such as tasks succeeding, failing, and reaching their task duration. The `period` value specifies the maximum time between iterations.|No|`PT30S`|
@@ -65,6 +65,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi
 |`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a  [...]
 |`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No||
 |`stopTaskCount`|Integer|Limits the number of ingestion tasks Druid can cycle at any given time. If not set, Druid can cycle all tasks at the same time. If set to a value less than `taskCount`, your cluster needs fewer available slots to run the supervisor. You can save costs by scaling down your ingestion tier, but this can lead to slower cycle times and lag. See [`stopTaskCount`](#stoptaskcount) for more information.|No|`taskCount` value|
+|`serverPriorityToReplicas`|Object (`Map<Integer, Integer>`)|Map of server priorities to the number of replicas per priority. When set, each task replica is assigned a server priority that corresponds to `druid.server.priority` on the Peon process to enable query isolation for mixed workloads using [query routing strategies](../configuration/index.md#query-routing). If not configured, the `replicas` setting applies and all task replicas are assigned a default priority of 0.<br/><br/>For  [...]
 
 #### Task autoscaler
 
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
index b6743ef5955..ea9dd037bbe 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
@@ -119,7 +119,8 @@ public abstract class StreamIndexTestBase extends EmbeddedClusterTestBase
             true,
             Period.seconds(5),
             null, null, null, null, null, null, null, null,
-            false
+            false,
+            null
         ),
         Map.of(),
         false,
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java
index cc6da2169f4..1a76594ec87 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java
@@ -104,7 +104,8 @@ public class KinesisDataFormatsTest extends StreamIndexDataFormatsTestBase
             true,
             Period.seconds(5),
             null, null, null, null, null, null, null, null,
-            false
+            false,
+            null
         ),
         Map.of(),
         false,
diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
index 4aceebe37be..452ccf5e018 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
@@ -56,7 +56,8 @@ public class RabbitStreamIndexTask extends SeekableStreamIndexTask<String, Long,
       @JsonProperty("tuningConfig") RabbitStreamIndexTaskTuningConfig tuningConfig,
       @JsonProperty("ioConfig") RabbitStreamIndexTaskIOConfig ioConfig,
       @JsonProperty("context") Map<String, Object> context,
-      @JacksonInject ObjectMapper configMapper
+      @JacksonInject ObjectMapper configMapper,
+      @JsonProperty("serverPriority") @Nullable Integer serverPriority
   )
   {
     super(
@@ -67,7 +68,8 @@ public class RabbitStreamIndexTask extends SeekableStreamIndexTask<String, Long,
         tuningConfig,
         ioConfig,
         context,
-        getFormattedGroupId(Configs.valueOrDefault(supervisorId, dataSchema.getDataSource()), TYPE)
+        getFormattedGroupId(Configs.valueOrDefault(supervisorId, dataSchema.getDataSource()), TYPE),
+        serverPriority
     );
     this.configMapper = configMapper;
 
diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
index 2ced013ef5b..9be4a77a8d4 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
@@ -55,6 +55,7 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -214,7 +215,9 @@ public class RabbitStreamSupervisor extends SeekableStreamSupervisor<String, Lon
       TreeMap<Integer, Map<String, Long>> sequenceOffsets,
       SeekableStreamIndexTaskIOConfig taskIoConfig,
       SeekableStreamIndexTaskTuningConfig taskTuningConfig,
-      RowIngestionMetersFactory rowIngestionMetersFactory) throws JsonProcessingException
+      RowIngestionMetersFactory rowIngestionMetersFactory,
+      @Nullable List<Integer> serverPrioritiesToAssign
+  ) throws JsonProcessingException
   {
     final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
     final Map<String, Object> context = createBaseTaskContexts();
@@ -231,7 +234,8 @@ public class RabbitStreamSupervisor extends SeekableStreamSupervisor<String, Lon
           (RabbitStreamIndexTaskTuningConfig) taskTuningConfig,
           (RabbitStreamIndexTaskIOConfig) taskIoConfig,
           context,
-          sortingMapper
+          sortingMapper,
+          CollectionUtils.isNullOrEmpty(serverPrioritiesToAssign) ? null : serverPrioritiesToAssign.get(i)
       ));
     }
     return taskList;
diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
index 0538163df61..8aad5b76221 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
@@ -65,7 +65,8 @@ public class RabbitStreamSupervisorIOConfig extends SeekableStreamSupervisorIOCo
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
-      @JsonProperty("stopTaskCount") Integer stopTaskCount
+      @JsonProperty("stopTaskCount") Integer stopTaskCount,
+      @Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas
   )
   {
     super(
@@ -84,7 +85,8 @@ public class RabbitStreamSupervisorIOConfig extends SeekableStreamSupervisorIOCo
         LagAggregator.DEFAULT,
         lateMessageRejectionStartDateTime,
         new IdleConfig(null, null),
-        stopTaskCount
+        stopTaskCount,
+        serverPriorityToReplicas
     );
 
     this.consumerProperties = consumerProperties;
diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java
index 32fd2f2d53f..82e0b164471 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java
@@ -211,7 +211,8 @@ public class RabbitStreamSupervisorTest extends EasyMockSupport
         lateMessageRejectionPeriod, // latemessagerejection
         earlyMessageRejectionPeriod, // early message rejection
         null, // latemessagerejectionstartdatetime
-        1
+        1,
+        null
     );
     RabbitStreamIndexTaskClientFactory clientFactory = new RabbitStreamIndexTaskClientFactory(null,
         OBJECT_MAPPER);
@@ -276,7 +277,8 @@ public class RabbitStreamSupervisorTest extends EasyMockSupport
         null, // latemessagerejection
         null, // early message rejection
         null, // latemessagerejectionstartdatetime
-        1
+        1,
+        null
     );
     RabbitStreamIndexTaskClientFactory clientFactory = new RabbitStreamIndexTaskClientFactory(null,
         OBJECT_MAPPER);
@@ -418,7 +420,8 @@ public class RabbitStreamSupervisorTest extends EasyMockSupport
             null, // latemessagerejection
             null, // early message rejection
             null, // latemessagerejectionstartdatetime
-            1
+            1,
+            null
         )
     );
 
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 40cab634c89..402c6b9144b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -69,7 +69,8 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<KafkaTopicPartition,
       @JsonProperty("tuningConfig") KafkaIndexTaskTuningConfig tuningConfig,
       @JsonProperty("ioConfig") KafkaIndexTaskIOConfig ioConfig,
       @JsonProperty("context") Map<String, Object> context,
-      @JacksonInject ObjectMapper configMapper
+      @JacksonInject ObjectMapper configMapper,
+      @JsonProperty("serverPriority") @Nullable Integer serverPriority
   )
   {
     super(
@@ -80,7 +81,8 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<KafkaTopicPartition,
         tuningConfig,
         ioConfig,
         context,
-        getFormattedGroupId(Configs.valueOrDefault(supervisorId, dataSchema.getDataSource()), TYPE)
+        getFormattedGroupId(Configs.valueOrDefault(supervisorId, dataSchema.getDataSource()), TYPE),
+        serverPriority
     );
     this.configMapper = configMapper;
 
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 68bdb3fb405..e226db37676 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -60,6 +60,7 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -234,7 +235,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
       TreeMap<Integer, Map<KafkaTopicPartition, Long>> sequenceOffsets,
       SeekableStreamIndexTaskIOConfig taskIoConfig,
       SeekableStreamIndexTaskTuningConfig taskTuningConfig,
-      RowIngestionMetersFactory rowIngestionMetersFactory
+      RowIngestionMetersFactory rowIngestionMetersFactory,
+      @Nullable List<Integer> serverPrioritiesToAssign
   ) throws JsonProcessingException
   {
     final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
@@ -252,7 +254,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
           (KafkaIndexTaskTuningConfig) taskTuningConfig,
           (KafkaIndexTaskIOConfig) taskIoConfig,
           context,
-          sortingMapper
+          sortingMapper,
+          CollectionUtils.isNullOrEmpty(serverPrioritiesToAssign) ? null : serverPrioritiesToAssign.get(i)
       ));
     }
     return taskList;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 4eac3163fe3..992ff292694 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -77,7 +77,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
       @JsonProperty("idleConfig") IdleConfig idleConfig,
       @JsonProperty("stopTaskCount") Integer stopTaskCount,
-      @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics
+      @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics,
+      @Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas
   )
   {
     super(
@@ -96,7 +97,8 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
         Configs.valueOrDefault(lagAggregator, LagAggregator.DEFAULT),
         lateMessageRejectionStartDateTime,
         idleConfig,
-        stopTaskCount
+        stopTaskCount,
+        serverPriorityToReplicas
     );
 
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index ce84944bb63..a5a3243b5b8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2982,7 +2982,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
         tuningConfig,
         ioConfig,
         context,
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        null
     );
     task.setPollRetryMs(POLL_RETRY_MS);
     return task;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
index a649e07915a..24c1656fc7e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
@@ -92,6 +92,7 @@ public class KafkaIOConfigBuilder extends SupervisorIOConfigBuilder<KafkaIOConfi
         null,
         idleConfig,
         stopTaskCount,
+        null,
         null
     );
   }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 2da46aaf0e2..6295d41937e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -340,7 +340,8 @@ public class KafkaSupervisorIOConfigTest
         null,
         null,
         null,
-        false
+        false,
+        null
     );
     String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
@@ -375,7 +376,8 @@ public class KafkaSupervisorIOConfigTest
         null,
         null,
         null,
-        false
+        false,
+        null
     );
     Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue());
 
@@ -427,7 +429,8 @@ public class KafkaSupervisorIOConfigTest
         null,
         mapper.convertValue(idleConfig, IdleConfig.class),
         null,
-        false
+        false,
+        null
     );
     String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
     KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 4f6a93ce3a1..b5e00bcaab4 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -71,6 +71,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
@@ -133,6 +134,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -482,33 +484,46 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null,
             Duration.standardHours(2).getStandardMinutes()
         ),
-        new KafkaIndexTaskTuningConfig(
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null
-        ),
+        supervisor.getTuningConfig(),
+        null,
         null
     ).get(0);
     Assert.assertTrue(indexTask.getRunner() instanceof KafkaIndexTaskRunner);
+    Assert.assertNull(indexTask.getServerPriority());
+  }
+
+  @Test
+  public void testCreateIndexTasksWithServerPrioritiesToAssign() throws JsonProcessingException
+  {
+    supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+    final List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEntity>> taskList =
+        supervisor.createIndexTasks(
+            3,
+            "seq",
+            OBJECT_MAPPER,
+            new TreeMap<>(),
+            new KafkaIndexTaskIOConfig(
+                0,
+                "seq",
+                new SeekableStreamStartSequenceNumbers<>("test", Collections.emptyMap(), Collections.emptySet()),
+                new SeekableStreamEndSequenceNumbers<>("test", Collections.emptyMap()),
+                Collections.emptyMap(),
+                null,
+                null,
+                null,
+                null,
+                INPUT_FORMAT,
+                null,
+                Duration.standardHours(2).getStandardMinutes()
+            ),
+            supervisor.getTuningConfig(),
+            null,
+            List.of(10, 20, 20)
+        );
+    Assert.assertEquals(3, taskList.size());
+    Assert.assertEquals(Integer.valueOf(10), taskList.get(0).getServerPriority());
+    Assert.assertEquals(Integer.valueOf(20), taskList.get(1).getServerPriority());
+    Assert.assertEquals(Integer.valueOf(20), taskList.get(2).getServerPriority());
   }
 
   @Test
@@ -1613,6 +1628,104 @@ public class KafkaSupervisorTest extends EasyMockSupport
     );
   }
 
+  @Test
+  public void testRequeueTaskWithServerPrioritiesWhenFailed() throws Exception
+  {
+    supervisor = getTestableSupervisor(null, 2, 2, true, true, "PT1H", null, null, false, kafkaHost, null, Map.of(1, 1, 2, 1));
+    addSomeEvents(1);
+
+    Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4);
+
+    TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints1 = new TreeMap<>();
+    checkpoints1.put(0, singlePartitionMap(topic, 0, 0L, 2, 0L));
+    TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints2 = new TreeMap<>();
+    checkpoints2.put(0, singlePartitionMap(topic, 1, 0L));
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints1))
+            .anyTimes();
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints2))
+            .anyTimes();
+
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+
+    List<Task> tasks = captured.getValues();
+
+    // test that running the main loop again checks the status of the tasks that were created and does nothing if they
+    // are all still running
+    EasyMock.reset(taskStorage);
+    EasyMock.reset(taskQueue);
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(tasks)).anyTimes();
+    final List<Integer> observedServerPriorities = new ArrayList<>();
+    for (Task task : tasks) {
+      EasyMock.expect(taskStorage.getStatus(task.getId()))
+              .andReturn(Optional.of(TaskStatus.running(task.getId())))
+              .anyTimes();
+      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
+      observedServerPriorities.add(((KafkaIndexTask) task).getServerPriority());
+    }
+    Assert.assertEquals(4, tasks.size());
+    Assert.assertEquals(List.of(2, 1, 2, 1), observedServerPriorities);
+
+    EasyMock.replay(taskStorage);
+    EasyMock.replay(taskQueue);
+
+    supervisor.runInternal();
+    verifyAll();
+
+    // test that a task failing causes a new task to be re-queued with the 
same parameters
+    Capture<Task> aNewTaskCapture = Capture.newInstance();
+    List<Task> imStillAlive = tasks.subList(0, 3);
+    KafkaIndexTask iHaveFailed = (KafkaIndexTask) tasks.get(3);
+    EasyMock.reset(taskStorage);
+    EasyMock.reset(taskQueue);
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(imStillAlive)).anyTimes();
+    for (Task task : imStillAlive) {
+      EasyMock.expect(taskStorage.getStatus(task.getId()))
+              .andReturn(Optional.of(TaskStatus.running(task.getId())))
+              .anyTimes();
+      EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
+    }
+    EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId()))
+            .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId(), "Dummy task status failure err message")));
+    EasyMock.expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of(iHaveFailed)).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(aNewTaskCapture))).andReturn(true);
+    EasyMock.replay(taskStorage);
+    EasyMock.replay(taskQueue);
+
+    supervisor.runInternal();
+    verifyAll();
+
+    Assert.assertNotEquals(iHaveFailed.getId(), aNewTaskCapture.getValue().getId());
+    KafkaIndexTask retriedTask = (KafkaIndexTask) aNewTaskCapture.getValue();
+    Assert.assertEquals(
+        iHaveFailed.getIOConfig().getBaseSequenceName(),
+        retriedTask.getIOConfig().getBaseSequenceName()
+    );
+    Assert.assertEquals(Integer.valueOf(1), retriedTask.getServerPriority());
+  }
+
   @Test
   public void testRequeueAdoptedTaskWhenFailed() throws Exception
   {
@@ -2423,7 +2536,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("id1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         1,
@@ -2431,7 +2545,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("id2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     supervisor.updateCurrentAndLatestOffsets();
     supervisor.runInternal();
@@ -2580,7 +2695,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("id1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         1,
@@ -2588,7 +2704,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("id2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     supervisor.updateCurrentAndLatestOffsets();
     supervisor.runInternal();
@@ -2916,7 +3033,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("id1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
         1,
@@ -2924,7 +3042,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("id2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     supervisor.updateCurrentAndLatestOffsets();
     supervisor.runInternal();
@@ -4369,7 +4488,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -4378,7 +4498,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     EasyMock.expect(taskClient.getMovingAveragesAsync("task1"))
@@ -4411,7 +4532,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -4420,7 +4542,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     ParseExceptionReport exception1 = new ParseExceptionReport(
@@ -4627,15 +4750,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
         tuningConfigBuilder().build()
     );
 
-    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+    SeekableStreamSupervisor<KafkaTopicPartition, Long, KafkaRecordEntity>.TaskGroup taskGroup = supervisor.addTaskGroupToActivelyReadingTaskGroup(
         42,
         singlePartitionMap(topic, 0, 0L, 2, 0L),
         minMessageTime,
         maxMessageTime,
         ImmutableSet.of("id1", "id2", "id3", "id4"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
+    Assert.assertNull(supervisor.computeUnassignedServerPriorities(taskGroup, 3));
+
     DataSchema modifiedDataSchema = getDataSchema("some other datasource");
 
     KafkaSupervisorTuningConfig modifiedTuningConfig = tuningConfigBuilder()
@@ -5029,7 +5155,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   @Test
   public void test_doesTaskMatchSupervisor()
   {
-    supervisor = getTestableSupervisor("supervisorId", 1, 1, true, true, null, new Period("PT1H"), new Period("PT1H"), false, kafkaHost, null);
+    supervisor = getTestableSupervisor("supervisorId", 1, 1, true, true, null, new Period("PT1H"), new Period("PT1H"), false, kafkaHost, null, null);
 
     KafkaIndexTask kafkaTaskMatch = createMock(KafkaIndexTask.class);
     EasyMock.expect(kafkaTaskMatch.getSupervisorId()).andReturn("supervisorId");
@@ -5244,6 +5370,112 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertFalse(supervisor.getPartitionOffsets().isEmpty());
   }
 
+  @Test
+  public void testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorityIsSet()
+  {
+    final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
+    consumerProperties.put("bootstrap.servers", kafkaHost);
+
+    final KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+        topic,
+        null,
+        INPUT_FORMAT,
+        null,
+        1,
+        new Period("PT1H"),
+        consumerProperties,
+        null,
+        null,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        true,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        true,
+        Map.of(
+            10, 2,
+            20, 3
+        )
+    );
+
+    Assert.assertEquals(5, (int) kafkaSupervisorIOConfig.getReplicas());
+
+    final KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null);
+    final KafkaSupervisorSpec spec = new KafkaSupervisorSpec(
+        null,
+        null,
+        dataSchema,
+        KafkaSupervisorTuningConfig.defaultConfig(),
+        kafkaSupervisorIOConfig,
+        null,
+        false,
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        new NoopServiceEmitter(),
+        new DruidMonitorSchedulerConfig(),
+        rowIngestionMetersFactory,
+        new SupervisorStateManagerConfig()
+    );
+
+    supervisor = new TestableKafkaSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        OBJECT_MAPPER,
+        spec,
+        rowIngestionMetersFactory
+    );
+
+    final SeekableStreamSupervisor<KafkaTopicPartition, Long, KafkaRecordEntity>.TaskGroup taskGroup1 =
+        supervisor.addTaskGroupToActivelyReadingTaskGroup(
+            0,
+            ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L),
+            null,
+            null,
+            Set.of(),
+            Set.of(),
+            Map.of()
+        );
+
+    Assert.assertEquals(List.of(20, 20, 20, 10, 10), supervisor.computeUnassignedServerPriorities(taskGroup1, 5));
+
+    final SeekableStreamSupervisor<KafkaTopicPartition, Long, KafkaRecordEntity>.TaskGroup taskGroup2 =
+        supervisor.addTaskGroupToActivelyReadingTaskGroup(
+            1,
+            ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L),
+            null,
+            null,
+            Set.of("task1", "task2"),
+            Set.of(),
+            Map.of("task1", 10, "task2", 10)
+        );
+
+    Assert.assertEquals(List.of(20, 20, 20), supervisor.computeUnassignedServerPriorities(taskGroup2, 3));
+
+    final SeekableStreamSupervisor<KafkaTopicPartition, Long, KafkaRecordEntity>.TaskGroup taskGroup3 =
+        supervisor.addTaskGroupToActivelyReadingTaskGroup(
+            2,
+            ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L),
+            null,
+            null,
+            Set.of("task1", "task2", "task3"),
+            Set.of(),
+            Map.of("task1", 10, "task2", 10, "task3", 20)
+        );
+
+    Assert.assertEquals(List.of(20, 20), supervisor.computeUnassignedServerPriorities(taskGroup3, 2));
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     // create topic manually
@@ -5355,6 +5587,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         earlyMessageRejectionPeriod,
         false,
         kafkaHost,
+        null,
         null
     );
   }
@@ -5381,6 +5614,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         earlyMessageRejectionPeriod,
         suspended,
         kafkaHost,
+        null,
         null
     );
   }
@@ -5407,7 +5641,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         earlyMessageRejectionPeriod,
         suspended,
         kafkaHost,
-        idleConfig
+        idleConfig,
+        null
     );
   }
 
@@ -5422,7 +5657,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
       Period earlyMessageRejectionPeriod,
       boolean suspended,
       String kafkaHost,
-      IdleConfig idleConfig
+      IdleConfig idleConfig,
+      Map<Integer, Integer> serverPriorityToReplicas
   )
   {
     final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
@@ -5449,7 +5685,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         idleConfig,
         null,
-        true
+        true,
+        serverPriorityToReplicas
     );
 
     KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
@@ -5543,7 +5780,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        false
+        false,
+        null
     );
 
     KafkaIndexTaskClientFactory taskClientFactory = new 
KafkaIndexTaskClientFactory(
@@ -5637,7 +5875,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        false
+        false,
+        null
     );
 
     KafkaIndexTaskClientFactory taskClientFactory = new 
KafkaIndexTaskClientFactory(
@@ -5786,7 +6025,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
             Duration.standardHours(2).getStandardMinutes()
         ),
         Collections.emptyMap(),
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        null
     );
   }
 
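The expectations asserted in testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorityIsSet can be reproduced with a small standalone sketch. This is not the Druid implementation: the class name, the method name computeUnassigned, and its signature are hypothetical, and the rule (fill the highest priority first, skipping slots already held by existing tasks) is only inferred from the test assertions in this diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class UnassignedPrioritySketch
{
  /**
   * Hypothetical helper: returns the priorities still to be assigned,
   * highest priority first, or null when no priority map is configured.
   */
  static List<Integer> computeUnassigned(
      Map<Integer, Integer> serverPriorityToReplicas,
      Collection<Integer> assignedPriorities,
      int numTasksToCreate
  )
  {
    if (serverPriorityToReplicas == null || serverPriorityToReplicas.isEmpty()) {
      return null;
    }

    // Remaining replica slots per priority, iterated from highest to lowest priority.
    final TreeMap<Integer, Integer> remaining = new TreeMap<>(Comparator.reverseOrder());
    remaining.putAll(serverPriorityToReplicas);

    // Each existing task consumes one slot of its priority, never going below zero.
    for (Integer assigned : assignedPriorities) {
      remaining.computeIfPresent(assigned, (priority, slots) -> slots > 0 ? slots - 1 : 0);
    }

    final List<Integer> result = new ArrayList<>();
    for (Map.Entry<Integer, Integer> entry : remaining.entrySet()) {
      for (int i = 0; i < entry.getValue() && result.size() < numTasksToCreate; i++) {
        result.add(entry.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    final Map<Integer, Integer> config = Map.of(10, 2, 20, 3);
    System.out.println(computeUnassigned(config, List.of(), 5));           // [20, 20, 20, 10, 10]
    System.out.println(computeUnassigned(config, List.of(10, 10), 3));     // [20, 20, 20]
    System.out.println(computeUnassigned(config, List.of(10, 10, 20), 2)); // [20, 20]
  }
}
```

The three calls in main correspond to taskGroup1, taskGroup2 and taskGroup3 in the test; the null return for a missing map matches the assertNull added to the earlier test in this file.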
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index a09cfcfa5ba..626586c23ac 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -72,7 +72,8 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, Ki
       @JsonProperty("ioConfig") KinesisIndexTaskIOConfig ioConfig,
       @JsonProperty("context") Map<String, Object> context,
       @JsonProperty("useListShards") boolean useListShards,
-      @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig
+      @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE) AWSCredentialsConfig awsCredentialsConfig,
+      @JsonProperty("serverPriority") @Nullable Integer serverPriority
   )
   {
     super(
@@ -83,7 +84,8 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, Ki
         tuningConfig,
         ioConfig,
         context,
-        getFormattedGroupId(Configs.valueOrDefault(supervisorId, dataSchema.getDataSource()), TYPE)
+        getFormattedGroupId(Configs.valueOrDefault(supervisorId, dataSchema.getDataSource()), TYPE),
+        serverPriority
     );
     this.useListShards = useListShards;
     this.awsCredentialsConfig = awsCredentialsConfig;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 884476181fa..08491caa8ff 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -56,8 +56,10 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -157,7 +159,8 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String,
       TreeMap<Integer, Map<String, String>> sequenceOffsets,
       SeekableStreamIndexTaskIOConfig taskIoConfig,
       SeekableStreamIndexTaskTuningConfig taskTuningConfig,
-      RowIngestionMetersFactory rowIngestionMetersFactory
+      RowIngestionMetersFactory rowIngestionMetersFactory,
+      @Nullable List<Integer> serverPrioritiesToAssign
   ) throws JsonProcessingException
   {
     final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
@@ -176,7 +179,8 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
           (KinesisIndexTaskIOConfig) taskIoConfig,
           context,
           spec.getSpec().getTuningConfig().isUseListShards(),
-          awsCredentialsConfig
+          awsCredentialsConfig,
+          CollectionUtils.isNullOrEmpty(serverPrioritiesToAssign) ? null : serverPrioritiesToAssign.get(i)
       ));
     }
     return taskList;
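
The per-replica lookup in the hunk above can be illustrated in isolation. The following is a minimal sketch with hypothetical names (ReplicaPrioritySketch, prioritiesForReplicas); it assumes, as the diff's use of get(i) suggests, that a non-empty list carries at least one entry per replica.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplicaPrioritySketch
{
  /**
   * Hypothetical helper: picks the server priority for each replica index,
   * falling back to null when no priorities were supplied.
   */
  static List<Integer> prioritiesForReplicas(int replicas, List<Integer> serverPrioritiesToAssign)
  {
    final List<Integer> perReplica = new ArrayList<>();
    for (int i = 0; i < replicas; i++) {
      final boolean unset = serverPrioritiesToAssign == null || serverPrioritiesToAssign.isEmpty();
      // Assumption: when set, the list has at least `replicas` entries.
      perReplica.add(unset ? null : serverPrioritiesToAssign.get(i));
    }
    return perReplica;
  }
}
```

With a null or empty list every replica gets a null priority, which keeps the behaviour of tasks created before this change.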
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index ce6eece5af2..6c325bd0744 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -34,6 +34,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
+import java.util.Map;
 
 public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
 {
@@ -77,7 +78,8 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
       @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
-      @JsonProperty("deaggregate") @Deprecated boolean deaggregate
+      @JsonProperty("deaggregate") @Deprecated boolean deaggregate,
+      @Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas
   )
   {
     super(
@@ -96,7 +98,8 @@ public class KinesisSupervisorIOConfig extends 
SeekableStreamSupervisorIOConfig
         LagAggregator.DEFAULT,
         lateMessageRejectionStartDateTime,
         new IdleConfig(null, null),
-        null
+        null,
+        serverPriorityToReplicas
     );
 
     this.endpoint = endpoint != null
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 8a0ec53bb30..84b843958eb 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -126,6 +126,7 @@ public class KinesisIndexTaskSerdeTest
         IO_CONFIG,
         null,
         false,
+        null,
         null
     );
     ObjectMapper objectMapper = createObjectMapper();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 69764a5d800..8a1c1c6dfe1 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2526,7 +2526,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
           ioConfig,
           context,
           false,
-          awsCredentialsConfig
+          awsCredentialsConfig,
+          null
       );
     }
 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index cda3d4c1452..290e16b9756 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -145,7 +145,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
             null,
             null,
             null,
-            false
+            false,
+            null
         ),
         null,
         null,
@@ -225,7 +226,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
             null,
             null,
             null,
-            false
+            false,
+            null
         ),
         null,
         null,
@@ -279,7 +281,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
             null,
             null,
             null,
-            false
+            false,
+            null
         ),
         null,
         null,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 1fd50c13545..0039f730ec2 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -468,7 +468,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        false
+        false,
+        null
     );
     KinesisIndexTaskClientFactory clientFactory = new 
KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
     KinesisSupervisor supervisor = new KinesisSupervisor(
@@ -533,7 +534,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        false
+        false,
+        null
     );
 
     AutoScalerConfig autoscalerConfigNull = 
kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig();
@@ -560,7 +562,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
-        false
+        false,
+        null
     );
 
     AutoScalerConfig autoscalerConfig = 
kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig();
@@ -3738,7 +3741,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -3747,7 +3751,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     
EasyMock.expect(taskClient.getMovingAveragesAsync("task1")).andReturn(Futures.immediateFuture(ImmutableMap.of(
@@ -3977,7 +3982,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         minMessageTime,
         maxMessageTime,
         ImmutableSet.of("id1", "id2", "id3", "id4"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     DataSchema modifiedDataSchema = getDataSchema("some other datasource");
@@ -4215,7 +4221,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
             null,
             null,
             null,
-            false
+            false,
+            null
         ),
         null,
         null,
@@ -5179,7 +5186,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        false
+        false,
+        null
     );
 
     KinesisIndexTaskClientFactory taskClientFactory = new 
KinesisIndexTaskClientFactory(
@@ -5321,7 +5329,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
          autoScalerConfig,
-        false
+        false,
+        null
     );
 
     KinesisIndexTaskClientFactory taskClientFactory = new 
KinesisIndexTaskClientFactory(
@@ -5407,7 +5416,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        false
+        false,
+        null
     );
 
     KinesisIndexTaskClientFactory taskClientFactory = new 
KinesisIndexTaskClientFactory(
@@ -5495,7 +5505,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        false
+        false,
+        null
     );
 
     KinesisIndexTaskClientFactory taskClientFactory = new 
KinesisIndexTaskClientFactory(
@@ -5646,6 +5657,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         ),
         Collections.emptyMap(),
         false,
+        null,
         null
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResource.java
index 94b85d3b29c..870b1c71575 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResource.java
@@ -42,7 +42,7 @@ public class TaskResource
   }
 
   /**
-   * Returns availability group ID of this task. Tasks the same availability group cannot be assigned to the same
+   * Returns availability group ID of this task. Tasks of the same availability group cannot be assigned to the same
    * worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the
    * task ID.
    *
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 5d87ab602fb..6485d6d72da 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -48,6 +48,7 @@ import org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfig
 import org.apache.druid.indexing.common.tasklogs.LogUtils;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
@@ -361,6 +362,14 @@ public class ForkingTaskRunner
                         command.addSystemProperty("druid.task.executor.enableTlsPort", node.isEnableTlsPort());
                         command.addSystemProperty("log4j2.configurationFactory", ConsoleLoggingEnforcementConfigurationFactory.class.getName());
 
+
+                        if (task instanceof SeekableStreamIndexTask) {
+                          final Integer serverPriority = ((SeekableStreamIndexTask) task).getServerPriority();
+                          if (serverPriority != null) {
+                            command.addSystemProperty("druid.server.priority", serverPriority);
+                          }
+                        }
+
                         command.addSystemProperty("druid.indexer.task.baseTaskDir", storageSlot.getDirectory().getAbsolutePath());
                         command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask", storageSlot.getNumBytes());
 
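The ForkingTaskRunner change above only forwards druid.server.priority to the Peon when the task carries a priority. A minimal standalone sketch of that conditional follows; PeonCommandSketch and buildJvmArgs are hypothetical names, and the sketch assumes system properties reach the child JVM as -Dkey=value arguments rather than reproducing Druid's own command builder.

```java
import java.util.ArrayList;
import java.util.List;

final class PeonCommandSketch
{
  /**
   * Hypothetical stand-in for the command assembly in ForkingTaskRunner:
   * adds the priority flag only when a priority was assigned to the task.
   */
  static List<String> buildJvmArgs(Integer serverPriority)
  {
    final List<String> args = new ArrayList<>();
    args.add("java");
    if (serverPriority != null) {
      // Assumption: the property is passed as a plain -D flag.
      args.add("-Ddruid.server.priority=" + serverPriority);
    }
    return args;
  }
}
```

A task without a priority therefore starts with an unchanged command line, so the Peon keeps whatever priority its runtime properties already define.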
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index d7987221824..390fe68cfee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -55,6 +55,7 @@ import org.apache.druid.segment.realtime.ChatHandler;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
+import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
 
 import javax.annotation.Nullable;
@@ -73,12 +74,15 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   protected final LockGranularity lockGranularityToUse;
   protected final TaskLockType lockTypeToUse;
   protected final String supervisorId;
+  @Nullable
+  protected final Integer serverPriority;
 
   // Lazily initialized to avoid calling it on the overlord when tasks are instantiated.
   // See https://github.com/apache/druid/issues/7724 for issues that can cause.
   // By the way, lazily init is synchronized because the runner may be needed in multiple threads.
   private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?>> runnerSupplier;
 
+
   public SeekableStreamIndexTask(
       final String id,
       final @Nullable String supervisorId,
@@ -87,7 +91,8 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
       final SeekableStreamIndexTaskTuningConfig tuningConfig,
       final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig,
       @Nullable final Map<String, Object> context,
-      @Nullable final String groupId
+      @Nullable final String groupId,
+      @Nullable final Integer serverPriority
   )
   {
     super(
@@ -108,6 +113,7 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
                                 : LockGranularity.SEGMENT;
     this.lockTypeToUse = TaskLocks.determineLockTypeForAppend(getContext());
     this.supervisorId = Preconditions.checkNotNull(Configs.valueOrDefault(supervisorId, dataSchema.getDataSource()), "supervisorId");
+    this.serverPriority = serverPriority;
   }
 
   protected static String getFormattedGroupId(String supervisorId, String type)
@@ -143,6 +149,17 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     return supervisorId;
   }
 
+  /**
+   * @return the server priority this task belongs to, if specified. When set, this value
+   * maps to {@link DruidServerMetadata#getPriority()} on the corresponding Peon server.
+   */
+  @JsonProperty
+  @Nullable
+  public Integer getServerPriority()
+  {
+    return serverPriority;
+  }
+
   @JsonProperty
   public SeekableStreamIndexTaskTuningConfig getTuningConfig()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 62c439e12b3..f5128dc3809 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -205,6 +205,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     final ImmutableMap<PartitionIdType, SequenceOffsetType> unfilteredStartingSequencesForSequenceName;
 
     final ConcurrentHashMap<String, TaskData> tasks = new ConcurrentHashMap<>();
+    final ConcurrentHashMap<String, Integer> taskIdToServerPriority = new ConcurrentHashMap<>();
     final DateTime minimumMessageTime;
     final DateTime maximumMessageTime;
     final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
@@ -276,6 +277,28 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       return tasks.keySet();
     }
 
+    /**
+     * Removes a task from {@link #tasks} and {@link #taskIdToServerPriority}.
+     */
+    void removeTask(String taskId)
+    {
+      tasks.remove(taskId);
+      taskIdToServerPriority.remove(taskId);
+    }
+
+    /**
+     * Removes the current task entry from {@link #tasks} using the provided iterator,
+     * and also removes the corresponding entry from {@link #taskIdToServerPriority}.
+     * <p>
+     * Must be called while iterating over {@link #tasks} and relies on
+     * {@link Iterator#remove()} to safely avoid {@link java.util.ConcurrentModificationException}.
+     */
+    void removeTask(Iterator<Entry<String, TaskData>> taskDataIterator, String taskId)
+    {
+      taskDataIterator.remove();
+      taskIdToServerPriority.remove(taskId);
+    }
+
     void setHandoffEarly()
     {
       handoffEarly = true;
@@ -1711,7 +1734,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       @Nullable DateTime minMsgTime,
       @Nullable DateTime maxMsgTime,
       Set<String> tasks,
-      Set<PartitionIdType> exclusiveStartingSequencePartitions
+      Set<PartitionIdType> exclusiveStartingSequencePartitions,
+      @Nullable Map<String, Integer> taskIdToServerPriority
   )
   {
     TaskGroup group = new TaskGroup(
@@ -1723,6 +1747,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         exclusiveStartingSequencePartitions
     );
     group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
+    if (taskIdToServerPriority != null) {
+      group.taskIdToServerPriority.putAll(taskIdToServerPriority);
+    }
     if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
       throw new ISE(
           "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.",
@@ -1739,7 +1766,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       @Nullable DateTime minMsgTime,
       @Nullable DateTime maxMsgTime,
       Set<String> tasks,
-      Set<PartitionIdType> exclusiveStartingSequencePartitions
+      Set<PartitionIdType> exclusiveStartingSequencePartitions,
+      @Nullable Map<String, Integer> taskIdToServerPriority
   )
   {
     TaskGroup group = new TaskGroup(
@@ -1751,6 +1779,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         exclusiveStartingSequencePartitions
     );
     group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData())));
+    if (taskIdToServerPriority != null) {
+      group.taskIdToServerPriority.putAll(taskIdToServerPriority);
+    }
     pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new CopyOnWriteArrayList<>())
                                .add(group);
     return group;
@@ -2316,6 +2347,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                   taskId, taskGroup.groupId, prevTaskData
                               );
                             }
+                            final Integer serverPriority = seekableStreamIndexTask.getServerPriority();
+                            if (serverPriority != null) {
+                              taskGroup.taskIdToServerPriority.putIfAbsent(taskId, serverPriority);
+                            }
                             verifySameSequenceNameForAllTasksInGroup(taskGroupId);
                           }
                         }
@@ -2518,7 +2553,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         stateManager.recordThrowableEvent(e);
         log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
         killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
-        taskGroup.tasks.remove(taskId);
+        taskGroup.removeTask(taskId);
       } else if (checkpointResult.valueOrThrow().isEmpty()) {
         log.warn("Ignoring task [%s], as probably it is not started running yet", taskId);
       } else {
@@ -2641,7 +2676,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               taskGroup.checkpointSequences,
               latestOffsetsFromDb
           );
-          taskGroup.tasks.remove(sequenceCheckpoint.lhs);
+          taskGroup.removeTask(sequenceCheckpoint.lhs);
         }
     );
   }
@@ -2712,6 +2747,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return pendingCompletionTaskGroups.get(groupId);
   }
 
+  @VisibleForTesting
+  protected TaskGroup getActiveTaskGroup(int groupId)
+  {
+    return activelyReadingTaskGroups.get(groupId);
+  }
+
   // Sanity check to ensure that tasks have the same sequence name as their task group
   private void verifySameSequenceNameForAllTasksInGroup(int groupId)
   {
@@ -3542,7 +3583,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           if (task.status.isRunnable()) {
             if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
               killTask(taskId, "Killing task [%s] which hasn't been assigned to a worker", taskId);
-              i.remove();
+              taskGroup.removeTask(i, taskId);
             }
           }
         }
@@ -3579,11 +3620,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                     taskId,
                     pauseException
                 );
-                taskGroup.tasks.remove(taskId);
+                taskGroup.removeTask(taskId);
 
               } else if (input.get(i).valueOrThrow() == null || input.get(i).valueOrThrow().isEmpty()) {
                 killTask(taskId, "Task [%s] returned empty offsets after pause", taskId);
-                taskGroup.tasks.remove(taskId);
+                taskGroup.removeTask(taskId);
               } else { // otherwise build a map of the highest sequences seen
                 for (Entry<PartitionIdType, SequenceOffsetType> sequence : input.get(i).valueOrThrow().entrySet()) {
                   if (!endOffsets.containsKey(sequence.getKey())
@@ -3632,7 +3673,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                       } else {
                         String taskId = setEndOffsetTaskIds.get(i);
                         killTask(taskId, "Failed to set end offsets, killing task");
-                        taskGroup.tasks.remove(taskId);
+                        taskGroup.removeTask(taskId);
                       }
                     }
                   }
@@ -3724,7 +3765,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
           if (taskData.status.isFailure()) {
             stateManager.recordCompletedTaskState(TaskState.FAILED);
-            iTask.remove(); // remove failed task
+            group.removeTask(iTask, taskId); // remove failed task
             if (group.tasks.isEmpty()) {
               // if all tasks in the group have failed, just nuke all task groups with this partition set and restart
               entireTaskGroupFailed = true;
@@ -3820,7 +3861,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         if (!isTaskCurrent(groupId, taskId, activeTaskMap)) {
           log.info("Stopping task[%s] as it does not match the expected sequence range and ingestion spec.", taskId);
           futures.add(stopTask(taskId, false));
-          iTasks.remove();
+          taskGroup.removeTask(iTasks, taskId);
           continue;
         }
 
@@ -3829,7 +3870,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         // remove failed tasks
         if (taskData.status.isFailure()) {
           stateManager.recordCompletedTaskState(TaskState.FAILED);
-          iTasks.remove();
+          taskGroup.removeTask(iTasks, taskId);
           continue;
         }
 
@@ -4206,6 +4247,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private void createTasksForGroup(int groupId, int replicas)
       throws JsonProcessingException
   {
+
     TaskGroup group = activelyReadingTaskGroups.get(groupId);
     Map<PartitionIdType, SequenceOffsetType> startPartitions = group.startingSequences;
     Map<PartitionIdType, SequenceOffsetType> endPartitions = new HashMap<>();
@@ -4230,6 +4272,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         ioConfig
     );
 
+    final List<Integer> unassignedServerPriorities = computeUnassignedServerPriorities(group, replicas);
+
     List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>> taskList = createIndexTasks(
         replicas,
         group.baseSequenceName,
@@ -4237,10 +4281,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         group.checkpointSequences,
         newIoConfig,
         taskTuningConfig,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        unassignedServerPriorities
     );
 
     for (SeekableStreamIndexTask indexTask : taskList) {
+      final String taskId = indexTask.getId();
+      final Integer serverPriority = indexTask.getServerPriority();
+
+      if (serverPriority != null) {
+        log.info("Adding serverPriority[%d] for task[%s] and groupId[%s]", serverPriority, taskId, groupId);
+        group.taskIdToServerPriority.put(taskId, serverPriority);
+      }
+
       Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
       if (taskQueue.isPresent()) {
         try {
@@ -4256,6 +4309,51 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  /**
+   * Computes the remaining unassigned server priorities for the given task group.
+   *
+   * @return the unassigned server priorities, or {@code null} if
+   *         {@link SeekableStreamSupervisorIOConfig#getServerPriorityToReplicas()} is not configured
+   */
+  @Nullable
+  public List<Integer> computeUnassignedServerPriorities(final TaskGroup group, final int replicas)
+  {
+    final Map<Integer, Integer> serverPriorityToReplicas = ioConfig.getServerPriorityToReplicas();
+    if (serverPriorityToReplicas == null) {
+      return null;
+    }
+
+    // Build the full list of all required priorities and sort them from highest to lowest priorities
+    final List<Integer> allRequiredPriorities = new ArrayList<>();
+    for (Map.Entry<Integer, Integer> entry : serverPriorityToReplicas.entrySet()) {
+      for (int i = 0; i < entry.getValue(); i++) {
+        allRequiredPriorities.add(entry.getKey());
+      }
+    }
+    allRequiredPriorities.sort(Collections.reverseOrder());
+
+    // Remove already assigned priorities
+    final List<Integer> assignedPriorities = new ArrayList<>(group.taskIdToServerPriority.values());
+    final List<Integer> unassignedServerPriorities = new ArrayList<>(allRequiredPriorities);
+    for (Integer assigned : assignedPriorities) {
+      unassignedServerPriorities.remove(assigned);
+    }
+
+    log.info(
+        "Server priorities[%s] to be assigned for new tasks in taskGroupId[%d] with replicas[%d]. Task server priorities[%s] have already been assigned to tasks[%s].",
+        unassignedServerPriorities, group.groupId, replicas, group.taskIdToServerPriority.values(), group.taskIds()
+    );
+
+    if (unassignedServerPriorities.size() < replicas) {
+      // Sanity check: if this triggers, this suggests a bug as the invariant should already be validated in the ctr.
+      throw DruidException.defensive(
+          "Found unassignedServerPriorities[%s] of size[%d] < total replicas[%d] for taskGroupId[%d]. Task server priorities[%s] have already been assigned to tasks[%s].",
+          unassignedServerPriorities, unassignedServerPriorities.size(), replicas, group.groupId, assignedPriorities, group.taskIds()
+      );
+    }
+    return unassignedServerPriorities;
+  }
+
   /**
    * monitoring method, fetches current partition offsets and lag in a background reporting thread
    */
@@ -4495,11 +4593,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   );
 
   /**
-   * creates a list of specific kafka/kinesis index tasks using
-   * the given replicas count
+   * Creates a list of {@link SeekableStreamIndexTask}s using the given {@code replicas} and
+   * {@code serverPrioritiesToAssign} for each task, if it is non-null/non-empty.
    *
-   * @return list of specific kafka/kinesis index taksks
-   * @throws JsonProcessingException
+   * @return a list of {@link SeekableStreamIndexTask}s
+   * @throws JsonProcessingException if task serialization fails
    */
   protected abstract List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType>> createIndexTasks(
       int replicas,
@@ -4508,7 +4606,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> sequenceOffsets,
       SeekableStreamIndexTaskIOConfig taskIoConfig,
       SeekableStreamIndexTaskTuningConfig taskTuningConfig,
-      RowIngestionMetersFactory rowIngestionMetersFactory
+      RowIngestionMetersFactory rowIngestionMetersFactory,
+      @Nullable List<Integer> serverPrioritiesToAssign
   ) throws JsonProcessingException;
 
   /**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 0de06a63949..9386ac365f8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -31,6 +31,7 @@ import org.joda.time.Duration;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
+import java.util.Map;
 
 
 public abstract class SeekableStreamSupervisorIOConfig
@@ -51,6 +52,7 @@ public abstract class SeekableStreamSupervisorIOConfig
   @Nullable private final AutoScalerConfig autoScalerConfig;
   @Nullable private final IdleConfig idleConfig;
   @Nullable private final Integer stopTaskCount;
+  @Nullable private final Map<Integer, Integer> serverPriorityToReplicas;
 
   private final LagAggregator lagAggregator;
   private final boolean autoScalerEnabled;
@@ -71,12 +73,12 @@ public abstract class SeekableStreamSupervisorIOConfig
       LagAggregator lagAggregator,
       DateTime lateMessageRejectionStartDateTime,
       @Nullable IdleConfig idleConfig,
-      @Nullable Integer stopTaskCount
+      @Nullable Integer stopTaskCount,
+      @Nullable Map<Integer, Integer> serverPriorityToReplicas
   )
   {
     this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
     this.inputFormat = inputFormat;
-    this.replicas = replicas != null ? replicas : 1;
 
     InvalidInput.conditionalException(
         lagAggregator != null,
@@ -119,6 +121,36 @@ public abstract class SeekableStreamSupervisorIOConfig
     }
 
     this.idleConfig = idleConfig;
+    this.serverPriorityToReplicas = serverPriorityToReplicas;
+
+    if (this.serverPriorityToReplicas != null) {
+      int serverPriorityReplicas = 0;
+      for (Map.Entry<Integer, Integer> entry : this.serverPriorityToReplicas.entrySet()) {
+        final Integer serverReplica = entry.getValue();
+
+        if (serverReplica == null || serverReplica < 0) {
+          throw InvalidInput.exception(
+              "Found invalid server replica[%d] for priority[%d] in serverPriorityToReplicas[%s]. Replicas must be >= 0.",
+              serverReplica, entry.getKey(), serverPriorityToReplicas
+          );
+        }
+
+        serverPriorityReplicas += serverReplica;
+      }
+
+      if (replicas != null && replicas != serverPriorityReplicas) {
+        throw InvalidInput.exception(
+            "Configured replicas[%d] does not match the sum of replicas[%d] specified in serverPriorityToReplicas[%s]."
+            + " To avoid ambiguity, consider removing [ioConfig.replicas] in favor of [ioConfig.serverPriorityToReplicas].",
+            replicas, serverPriorityReplicas, serverPriorityToReplicas
+        );
+      }
+
+      // We also explicitly set replicas since the supervisor logic for replicas is already implemented.
+      this.replicas = serverPriorityReplicas;
+    } else {
+      this.replicas = replicas != null ? replicas : 1;
+    }
   }
 
   private static Duration defaultDuration(final Period period, final String theDefault)
@@ -145,6 +177,13 @@ public abstract class SeekableStreamSupervisorIOConfig
     return replicas;
   }
 
+  @Nullable
+  @JsonProperty
+  public Map<Integer, Integer> getServerPriorityToReplicas()
+  {
+    return serverPriorityToReplicas;
+  }
+
   @Nullable
   @JsonProperty
   public AutoScalerConfig getAutoScalerConfig()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index 4b722be02e5..9b54725293a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -24,20 +24,28 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.indexing.common.TaskStorageDirTracker;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTaskIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
 import org.apache.druid.tasklogs.NoopTaskLogs;
@@ -60,6 +68,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 
@@ -591,6 +600,78 @@ public class ForkingTaskRunnerTest
     assertEquals(TaskState.SUCCESS, status.getStatusCode());
   }
 
+  @Test
+  public void testTaskCommandIncludesServerPriorityIfConfigured() throws Exception
+  {
+    final TaskConfig taskConfig = makeDefaultTaskConfigBuilder().build();
+    final WorkerConfig workerConfig = new WorkerConfig();
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    final Task task = new TestSeekableStreamIndexTask(
+        "id2",
+        null,
+        null,
+        DataSchema.builder()
+                  .withDataSource("foo")
+                  .withTimestamp(new TimestampSpec(null, null, null))
+                  .withDimensions(new DimensionsSpec(List.of()))
+                  .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), List.of()))
+                  .build(),
+        SeekableStreamSupervisorStateTest.createSupervisorTuningConfig().convertToTaskTuningConfig(),
+        new TestSeekableStreamIndexTaskIOConfig(),
+        null,
+        "1",
+        null,
+        null,
+        2
+    );
+
+    final AtomicReference<List<String>> observedCommandRef = new AtomicReference<>(List.of());
+
+    final ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        taskConfig,
+        workerConfig,
+        new Properties(),
+        new NoopTaskLogs(),
+        mapper,
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig(),
+        TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
+      {
+        observedCommandRef.set(command);
+        for (String param : command) {
+          if (param.endsWith(task.getId())) {
+            final String basePath = getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath();
+            File resultFile = Paths.get(basePath, task.getId(), "attempt", "1", "status.json").toFile();
+            mapper.writeValue(resultFile, TaskStatus.success(task.getId()));
+            break;
+          }
+        }
+        MockTestProcess mockTestProcess = new MockTestProcess()
+        {
+          @Override
+          public int waitFor()
+          {
+            return 0;
+          }
+        };
+        return new ForkingTaskRunner.ProcessHolder(mockTestProcess, logFile, taskLocation);
+      }
+    };
+
+
+    forkingTaskRunner.setNumProcessorsPerTask();
+    final TaskStatus status = forkingTaskRunner.run(task).get();
+    Assert.assertNotNull(observedCommandRef);
+    final List<String> observedCommand = observedCommandRef.get();
+    Assert.assertTrue(observedCommand.contains("-Ddruid.server.priority=2"));
+    assertEquals(TaskState.SUCCESS, status.getStatusCode());
+  }
+
   public static TaskConfigBuilder makeDefaultTaskConfigBuilder()
   {
     return new TaskConfigBuilder()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
index 6672b931400..63387ccffbd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.seekablestream;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.data.input.impl.ByteEntity;
-import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
@@ -33,7 +32,6 @@ import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecor
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.AllGranularity;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
@@ -51,7 +49,6 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.server.security.ResourceType;
 import org.easymock.EasyMock;
-import org.joda.time.Duration;
 import org.joda.time.Period;
 import org.junit.Before;
 import org.junit.Rule;
@@ -375,7 +372,7 @@ public class SeekableStreamIndexTaskRunnerAuthTest
         SeekableStreamIndexTaskIOConfig<String, String> ioConfig
     )
     {
-      super(id, null, null, dataSchema, tuningConfig, ioConfig, null, null);
+      super(id, null, null, dataSchema, tuningConfig, ioConfig, null, null, null);
     }
 
     @Override
@@ -397,24 +394,6 @@ public class SeekableStreamIndexTaskRunnerAuthTest
     }
   }
 
-  private static class TestSeekableStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String>
-  {
-    public TestSeekableStreamIndexTaskIOConfig()
-    {
-      super(
-          null,
-          "someSequence",
-          new SeekableStreamStartSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap(), null),
-          new SeekableStreamEndSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap()),
-          false,
-          DateTimes.nowUtc().minusDays(2),
-          DateTimes.nowUtc(),
-          new CsvInputFormat(null, null, true, null, 0, null),
-          Duration.standardHours(2).getStandardMinutes()
-      );
-    }
-  }
-
   /**
    * Usernames used in the tests.
    */
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
index 169bbc4a788..97456165f36 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
@@ -345,6 +345,7 @@ public class SeekableStreamSamplerSpecTest extends EasyMockSupport
           LagAggregator.DEFAULT,
           lateMessageRejectionStartDateTime,
           idleConfig,
+          null,
           null
       );
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
index 985aa7da706..6bd905c57d7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
@@ -63,7 +63,24 @@ public class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String,
       @Nullable RecordSupplier<String, String, ByteEntity> recordSupplier
   )
   {
-    super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId);
+    this(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId, streamingTaskRunner, recordSupplier, null);
+  }
+
+  public TestSeekableStreamIndexTask(
+      String id,
+      @Nullable String supervisorId,
+      @Nullable TaskResource taskResource,
+      DataSchema dataSchema,
+      SeekableStreamIndexTaskTuningConfig tuningConfig,
+      SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
+      @Nullable Map<String, Object> context,
+      @Nullable String groupId,
+      @Nullable SeekableStreamIndexTaskRunner<String, String, ByteEntity> streamingTaskRunner,
+      @Nullable RecordSupplier<String, String, ByteEntity> recordSupplier,
+      @Nullable Integer serverPriority
+  )
+  {
+    super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId, serverPriority);
     this.streamingTaskRunner = streamingTaskRunner;
     this.recordSupplier = recordSupplier;
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java
new file mode 100644
index 00000000000..1fa7993b69a
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.java.util.common.DateTimes;
+import org.joda.time.Duration;
+
+import java.util.Collections;
+
+public class TestSeekableStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String>
+{
+  public TestSeekableStreamIndexTaskIOConfig()
+  {
+    super(
+        null,
+        "someSequence",
+        new SeekableStreamStartSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap(), null),
+        new SeekableStreamEndSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap()),
+        false,
+        DateTimes.nowUtc().minusDays(2),
+        DateTimes.nowUtc(),
+        new CsvInputFormat(null, null, true, null, 0, null),
+        Duration.standardHours(2).getStandardMinutes()
+    );
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
index 56b8904cc18..3974b9bebca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
@@ -21,19 +21,31 @@ package org.apache.druid.indexing.seekablestream.supervisor;
 
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.hamcrest.MatcherAssert;
 import org.joda.time.Duration;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+import java.util.Map;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class SeekableStreamSupervisorIOConfigTest
 {
+
+  private final Map<Integer, Integer> serverPriorityToReplicas = Map.of(
+      1, 2,
+      2, 3
+  );
+
   @Test
   public void testAllDefaults()
   {
@@ -56,6 +68,7 @@ public class SeekableStreamSupervisorIOConfigTest
         lagAggregator,
         null,
         null,
+        null,
         null
     )
     {
@@ -77,6 +90,7 @@ public class SeekableStreamSupervisorIOConfigTest
     Assert.assertNull(config.getStopTaskCount());
     Assert.assertEquals(lagAggregator, config.getLagAggregator());
     Assert.assertEquals(1, config.getMaxAllowedStops());
+    Assert.assertNull(config.getServerPriorityToReplicas());
   }
 
   @Test
@@ -106,6 +120,7 @@ public class SeekableStreamSupervisorIOConfigTest
         lagAggregator,
         null,
         null,
+        null,
         null
     )
     {
@@ -130,6 +145,7 @@ public class SeekableStreamSupervisorIOConfigTest
         lagAggregator,
         null,
         null,
+        null,
         null
     )
     {
@@ -161,6 +177,7 @@ public class SeekableStreamSupervisorIOConfigTest
             lagAggregator,
             DateTimes.nowUtc(),
             null,
+            null,
             null
         )
         {
@@ -195,6 +212,7 @@ public class SeekableStreamSupervisorIOConfigTest
             null,
             null,
             null,
+            null,
             null
         )
         {
@@ -227,6 +245,7 @@ public class SeekableStreamSupervisorIOConfigTest
         lagAggregator,
         null,
         null,
+        null,
         null
     )
     {
@@ -250,7 +269,8 @@ public class SeekableStreamSupervisorIOConfigTest
         lagAggregator,
         null,
         null,
-        3
+        3,
+        null
     )
     {
     };
@@ -285,7 +305,8 @@ public class SeekableStreamSupervisorIOConfigTest
         lagAggregator,
         null,
         null,
-        1
+        1,
+        null
     )
     {
     };
@@ -317,7 +338,8 @@ public class SeekableStreamSupervisorIOConfigTest
         lagAggregator,
         null,
         null,
-        1
+        1,
+        null
     )
     {
     };
@@ -346,6 +368,7 @@ public class SeekableStreamSupervisorIOConfigTest
         lagAggregator,
         null,
         null,
+        null,
         null
     )
     {
@@ -353,4 +376,89 @@ public class SeekableStreamSupervisorIOConfigTest
 
     Assert.assertEquals(10, config3.getMaxAllowedStops());
   }
+
+  @Test
+  public void testReplicasIsSetWhenserverPriorityToReplicas()
+  {
+    final SeekableStreamSupervisorIOConfig config = makeSeekableStreamSupervisorIOConfig(null, serverPriorityToReplicas);
+    Assert.assertEquals(serverPriorityToReplicas, config.getServerPriorityToReplicas());
+    Assert.assertEquals(Integer.valueOf(5), config.getReplicas());
+  }
+
+  @Test
+  public void testReplicasOnlyConfig()
+  {
+    final SeekableStreamSupervisorIOConfig config = makeSeekableStreamSupervisorIOConfig(4, null);
+    Assert.assertEquals(Integer.valueOf(4), config.getReplicas());
+    Assert.assertNull(config.getServerPriorityToReplicas());
+  }
+
+  @Test
+  public void testMatchingReplicasAndServerPriority()
+  {
+    final SeekableStreamSupervisorIOConfig config = makeSeekableStreamSupervisorIOConfig(5, serverPriorityToReplicas);
+    Assert.assertEquals(Integer.valueOf(5), config.getReplicas());
+    Assert.assertEquals(serverPriorityToReplicas, config.getServerPriorityToReplicas());
+  }
+
+  @Test
+  public void testMismatchBetweenReplicasAndServerPriorityReplicasThrowsException()
+  {
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> makeSeekableStreamSupervisorIOConfig(3, serverPriorityToReplicas)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            StringUtils.format(
+                "Configured replicas[3] does not match the sum of replicas[5] specified in serverPriorityToReplicas[%s]."
+                + " To avoid ambiguity, consider removing [ioConfig.replicas] in favor of [ioConfig.serverPriorityToReplicas].",
+                serverPriorityToReplicas
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testNegativeReplicasThrowsException()
+  {
+    final Map<Integer, Integer> invalidServerPriorityToReplicas = Map.of(0, 2, 1, -1);
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> makeSeekableStreamSupervisorIOConfig(null, invalidServerPriorityToReplicas)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            StringUtils.format(
+                "Found invalid server replica[-1] for priority[1] in serverPriorityToReplicas[%s]. Replicas must be >= 0.",
+                invalidServerPriorityToReplicas
+            )
+        )
+    );
+  }
+
+  private SeekableStreamSupervisorIOConfig makeSeekableStreamSupervisorIOConfig(@Nullable Integer replicas, @Nullable Map<Integer, Integer> serverPriorityToReplicas)
+  {
+    return new SeekableStreamSupervisorIOConfig(
+        "stream",
+        null,
+        replicas,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        mock(LagAggregator.class),
+        null,
+        null,
+        null,
+        serverPriorityToReplicas
+    )
+    {
+    };
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index db805c21187..d2660937f8a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -794,6 +794,7 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe
         LagAggregator.DEFAULT,
         null,
         null,
+        null,
         null
     )
     {
@@ -851,6 +852,7 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe
         LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, null),
+        null,
         null
     )
     {
@@ -1221,7 +1223,8 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe
         null,
         null,
         Set.of("dummyTask"),
-        Collections.emptySet()
+        Collections.emptySet(),
+        null
     );
 
     supervisor.start();
@@ -1469,6 +1472,7 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe
           LagAggregator.DEFAULT,
           null,
           null,
+          null,
           null
       )
       {
@@ -1490,6 +1494,7 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe
           LagAggregator.DEFAULT,
           null,
           null,
+          null,
           null
       )
       {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 71e798e7e2f..db37e3014e3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -695,6 +695,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
+        null,
         null
     )
     {
@@ -801,6 +802,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
+        null,
         null
     )
     {
@@ -1100,6 +1102,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
+        null,
         null
     ) {};
 
@@ -1318,7 +1321,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
-        stopTaskCount
+        stopTaskCount,
+        null
     )
     {
     };
@@ -1553,6 +1557,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         LagAggregator.DEFAULT,
         null,
         new IdleConfig(true, 200L),
+        null,
         null
     )
     {
@@ -1889,7 +1894,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -1898,7 +1904,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     Map<String, Map<String, Object>> stats = supervisor.getStats();
 
@@ -1935,7 +1942,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -1944,7 +1952,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
@@ -1995,7 +2004,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
@@ -2042,7 +2052,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task0"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     final SeekableStreamSupervisor.TaskGroup taskGroup1 = supervisor.addTaskGroupToActivelyReadingTaskGroup(
         supervisor.getTaskGroupIdForPartition("1"),
@@ -2050,7 +2061,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
     final SeekableStreamSupervisor.TaskGroup taskGroup2 = supervisor.addTaskGroupToPendingCompletionTaskGroup(
         supervisor.getTaskGroupIdForPartition("2"),
@@ -2058,7 +2070,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     final PendingSegmentRecord pendingSegmentRecord0 = PendingSegmentRecord.create(
@@ -2111,6 +2124,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
             )
         )
     );
+
     EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(SUPERVISOR_ID, new TestSeekableStreamDataSourceMetadata(
         new SeekableStreamEndSequenceNumbers<>(
             "stream",
@@ -2136,7 +2150,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2145,7 +2160,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2154,7 +2170,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task3"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
@@ -2207,7 +2224,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2216,7 +2234,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2225,7 +2244,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task3"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
@@ -2282,7 +2302,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2291,7 +2312,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
@@ -2349,7 +2371,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2358,7 +2381,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
@@ -2392,7 +2416,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -2401,7 +2426,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     verifyAll();
@@ -2431,7 +2457,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -2440,7 +2467,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     verifyAll();
@@ -2481,7 +2509,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task1"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -2490,7 +2519,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null,
         ImmutableSet.of("task2"),
-        ImmutableSet.of()
+        ImmutableSet.of(),
+        null
     );
 
     verifyAll();
@@ -2649,7 +2679,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         null,
         null
     );
-    SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1, autoScalerConfig);
+    SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1, autoScalerConfig, null);
     Assert.assertEquals(
         taskCountMax / DEFAULT_TASKS_PER_WORKER_THREAD,
         SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
@@ -2678,6 +2708,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         LagAggregator.DEFAULT,
         null,
         null,
+        null,
         null
     )
     {
@@ -2741,7 +2772,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         lagAggregator,
         null,
         null,
-        1 // ensure this is overridden
+        1, // ensure this is overridden
+        null
     )
     {
     };
@@ -2773,6 +2805,72 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorityIsSet()
+  {
+    final SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(5, Map.of(10, 2, 20, 3));
+
+    Assert.assertEquals(5, (int) ioConfig.getReplicas());
+
+    EasyMock.reset(spec);
+    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+    replayAll();
+
+    TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    // No tasks assigned yet, need all 5 replicas
+    final SeekableStreamSupervisor<String, String, ByteEntity>.TaskGroup taskGroup1 =
+        supervisor.addTaskGroupToActivelyReadingTaskGroup(
+            0,
+            ImmutableMap.of("0", "0"),
+            null,
+            null,
+            Set.of(),
+            Set.of(),
+            Map.of()
+        );
+
+    Assert.assertEquals(List.of(20, 20, 20, 10, 10), supervisor.computeUnassignedServerPriorities(taskGroup1, 5));
+
+    // Two tasks with priority 10 already assigned, need 3 more replicas
+    final SeekableStreamSupervisor<String, String, ByteEntity>.TaskGroup taskGroup2 =
+        supervisor.addTaskGroupToActivelyReadingTaskGroup(
+            1,
+            ImmutableMap.of("0", "0"),
+            null,
+            null,
+            Set.of("task1", "task2"),
+            Set.of(),
+            Map.of("task1", 10, "task2", 10)
+        );
+
+    Assert.assertEquals(List.of(20, 20, 20), supervisor.computeUnassignedServerPriorities(taskGroup2, 3));
+
+    // Two tasks with priority 10 and one with priority 20 assigned, need 2 more replicas
+    final SeekableStreamSupervisor<String, String, ByteEntity>.TaskGroup taskGroup3 =
+        supervisor.addTaskGroupToActivelyReadingTaskGroup(
+            2,
+            ImmutableMap.of("0", "0"),
+            null,
+            null,
+            Set.of("task1", "task2", "task3"),
+            Set.of(),
+            Map.of("task1", 10, "task2", 10, "task3", 20)
+        );
+
+    Assert.assertEquals(List.of(20, 20), supervisor.computeUnassignedServerPriorities(taskGroup3, 2));
+
+    verifyAll();
+  }
+
   private static DataSchema getDataSchema()
   {
     List<DimensionSchema> dimensions = new ArrayList<>();
@@ -2796,18 +2894,27 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
 
   private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig()
   {
-    return createSupervisorIOConfig(1, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class));
+    return createSupervisorIOConfig(1, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null);
   }
 
   private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig(
       int taskCount,
-      @Nullable AutoScalerConfig autoScalerConfig
+      @Nullable Map<Integer, Integer> serverPriorityToReplicas
+  )
+  {
+    return createSupervisorIOConfig(taskCount, null, serverPriorityToReplicas);
+  }
+
+  private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig(
+      int taskCount,
+      @Nullable AutoScalerConfig autoScalerConfig,
+      @Nullable Map<Integer, Integer> serverPriorityToReplicas
   )
   {
     return new SeekableStreamSupervisorIOConfig(
         "stream",
         new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
-        1,
+        serverPriorityToReplicas == null ? 1 : serverPriorityToReplicas.values().stream().mapToInt(Integer::intValue).sum(),
         taskCount,
         new Period("PT1H"),
         new Period("P1D"),
@@ -2820,7 +2927,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         LagAggregator.DEFAULT,
         null,
         null,
-        null
+        null,
+        serverPriorityToReplicas
     )
     {
     };
@@ -2851,7 +2959,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     return createSupervisorTuningConfigWithWorkerThreads(1);
   }
 
-  private static SeekableStreamSupervisorTuningConfig createSupervisorTuningConfig()
+  public static SeekableStreamSupervisorTuningConfig createSupervisorTuningConfig()
   {
     return createSupervisorTuningConfigWithWorkerThreads(null);
   }
@@ -3020,7 +3128,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         TreeMap<Integer, Map<String, String>> sequenceOffsets,
         SeekableStreamIndexTaskIOConfig taskIoConfig,
         SeekableStreamIndexTaskTuningConfig taskTuningConfig,
-        RowIngestionMetersFactory rowIngestionMetersFactory
+        RowIngestionMetersFactory rowIngestionMetersFactory,
+        @Nullable List<Integer> serverPrioritiesToAssign
     )
     {
       return ImmutableList.of(new TestSeekableStreamIndexTask(
@@ -3375,4 +3484,102 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     {
     };
   }
+
+  @Test
+  public void testDiscoverExistingTasks_withServerPriorities()
+  {
+    final SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(5, Map.of(10, 2, 20, 3));
+
+    Assert.assertEquals(5, (int) ioConfig.getReplicas());
+
+    final SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfigExt(
+        0,
+        Map.of("0", "0"),
+        Map.of("0", "10"),
+        "test",
+        DateTimes.nowUtc(),
+        null,
+        Set.of(),
+        ioConfig
+    );
+
+    final TestSeekableStreamIndexTask task1 = createTestTask("task1", "0", 20, taskIoConfig, recordSupplier);
+    final TestSeekableStreamIndexTask task2 = createTestTask("task2", "0", 20, taskIoConfig, recordSupplier);
+    final TestSeekableStreamIndexTask task3 = createTestTask("task3", "0", 10, taskIoConfig, recordSupplier);
+
+    // Reset the spec mock to return our custom ioConfig
+    EasyMock.reset(spec);
+    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of("0")).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("task1")).andReturn(Optional.of(TaskStatus.running("task1"))).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("task2")).andReturn(Optional.of(TaskStatus.running("task2"))).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("task3")).andReturn(Optional.of(TaskStatus.running("task3"))).anyTimes();
+    EasyMock.expect(taskStorage.getTask("task1")).andReturn(Optional.of(task1)).anyTimes();
+    EasyMock.expect(taskStorage.getTask("task2")).andReturn(Optional.of(task2)).anyTimes();
+    EasyMock.expect(taskStorage.getTask("task3")).andReturn(Optional.of(task3)).anyTimes();
+
+    EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE))
+            .andReturn(Map.of(task1.getId(), task1, task2.getId(), task2, task3.getId(), task3))
+            .anyTimes();
+
+    EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.anyString(), EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(new TreeMap<>()))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.getCurrentOffsetsAsync(EasyMock.anyString(), EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of("0", "5")))
+            .anyTimes();
+    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+    replayAll();
+
+    TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+    supervisor.start();
+    supervisor.runInternal();
+
+    // Verify that tasks were discovered and their server priorities were captured
+    SeekableStreamSupervisor.TaskGroup taskGroup = supervisor.getActiveTaskGroup(0);
+    Assert.assertEquals(0, taskGroup.groupId);
+    Assert.assertEquals(3, taskGroup.tasks.size());
+
+    // Verify server priorities were correctly stored
+    Map<String, Integer> taskIdToServerPriority = taskGroup.taskIdToServerPriority;
+    Assert.assertEquals(Integer.valueOf(20), taskIdToServerPriority.get("task1"));
+    Assert.assertEquals(Integer.valueOf(20), taskIdToServerPriority.get("task2"));
+    Assert.assertEquals(Integer.valueOf(10), taskIdToServerPriority.get("task3"));
+
+    verifyAll();
+  }
+
+
+  private static TestSeekableStreamIndexTask createTestTask(String taskId, String groupId, @Nullable Integer serverPriority, SeekableStreamIndexTaskIOConfig taskIoConfig, RecordSupplier recordSupplier)
+  {
+    return new TestSeekableStreamIndexTask(
+        taskId,
+        null,
+        null,
+        getDataSchema(),
+        getTuningConfig().convertToTaskTuningConfig(),
+        taskIoConfig,
+        null,
+        groupId,
+        null,
+        recordSupplier,
+        serverPriority
+    );
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
index 1d814b63461..35d063e88e1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
@@ -177,7 +177,8 @@ public abstract class SeekableStreamSupervisorTestBase
         TreeMap<Integer, Map<String, String>> sequenceOffsets,
         SeekableStreamIndexTaskIOConfig taskIoConfig,
         SeekableStreamIndexTaskTuningConfig taskTuningConfig,
-        RowIngestionMetersFactory rowIngestionMetersFactory
+        RowIngestionMetersFactory rowIngestionMetersFactory,
+        @Nullable List<Integer> serverPrioritiesToAssign
     )
     {
       return null;
@@ -535,6 +536,7 @@ public abstract class SeekableStreamSupervisorTestBase
         LagAggregator.DEFAULT,
         null,
         null,
+        null,
         null
     )
     {
diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
index 962b716a5b2..de3812a33ee 100644
--- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
+++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
@@ -437,7 +437,7 @@ public class CliPeonTest
     )
     {
 
-      super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId);
+      super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId, null);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
