This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f17b4f44594 Extend server priorities to realtime indexing tasks for
query isolation (#19040)
f17b4f44594 is described below
commit f17b4f445949c5250524823e10fb78cd58259e45
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Mar 2 22:49:00 2026 -0800
Extend server priorities to realtime indexing tasks for query isolation
(#19040)
Fixes https://github.com/apache/druid/issues/19018
Added serverPriorityToReplicas parameter to the streaming supervisor specs
(kafka/kinesis/rabbit). This allows operators to distribute task replicas
across different server priorities for realtime indexing tasks. Similar to
historical tiering, this enables query isolation for mixed workload scenarios
on the Peons, allowing some task replicas to handle queries of specific
priorities. This config is optional and is compatible with the existing
replicas property if specified. If unspecif [...]
---
docs/ingestion/supervisor.md | 3 +-
.../embedded/indexing/StreamIndexTestBase.java | 3 +-
.../embedded/kinesis/KinesisDataFormatsTest.java | 3 +-
.../rabbitstream/RabbitStreamIndexTask.java | 6 +-
.../supervisor/RabbitStreamSupervisor.java | 8 +-
.../supervisor/RabbitStreamSupervisorIOConfig.java | 6 +-
.../supervisor/RabbitStreamSupervisorTest.java | 9 +-
.../druid/indexing/kafka/KafkaIndexTask.java | 6 +-
.../indexing/kafka/supervisor/KafkaSupervisor.java | 7 +-
.../kafka/supervisor/KafkaSupervisorIOConfig.java | 6 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 3 +-
.../kafka/supervisor/KafkaIOConfigBuilder.java | 1 +
.../supervisor/KafkaSupervisorIOConfigTest.java | 9 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 326 ++++++++++++++++++---
.../druid/indexing/kinesis/KinesisIndexTask.java | 6 +-
.../kinesis/supervisor/KinesisSupervisor.java | 8 +-
.../supervisor/KinesisSupervisorIOConfig.java | 7 +-
.../kinesis/KinesisIndexTaskSerdeTest.java | 1 +
.../indexing/kinesis/KinesisIndexTaskTest.java | 3 +-
.../indexing/kinesis/KinesisSamplerSpecTest.java | 9 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 34 ++-
.../druid/indexing/common/task/TaskResource.java | 2 +-
.../druid/indexing/overlord/ForkingTaskRunner.java | 9 +
.../seekablestream/SeekableStreamIndexTask.java | 19 +-
.../supervisor/SeekableStreamSupervisor.java | 133 +++++++--
.../SeekableStreamSupervisorIOConfig.java | 43 ++-
.../indexing/overlord/ForkingTaskRunnerTest.java | 81 +++++
.../SeekableStreamIndexTaskRunnerAuthTest.java | 23 +-
.../SeekableStreamSamplerSpecTest.java | 1 +
.../TestSeekableStreamIndexTask.java | 19 +-
.../TestSeekableStreamIndexTaskIOConfig.java | 44 +++
.../SeekableStreamSupervisorIOConfigTest.java | 114 ++++++-
.../SeekableStreamSupervisorSpecTest.java | 7 +-
.../SeekableStreamSupervisorStateTest.java | 273 ++++++++++++++---
.../SeekableStreamSupervisorTestBase.java | 4 +-
.../java/org/apache/druid/cli/CliPeonTest.java | 2 +-
36 files changed, 1069 insertions(+), 169 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index c13764acd97..360e6ad86d2 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -56,7 +56,7 @@ For configuration properties specific to Kafka and Kinesis,
see [Kafka I/O confi
|`inputFormat`|Object|The [input
format](../ingestion/data-formats.md#input-format) to define input data
parsing.|Yes||
|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks.
See [Task autoscaler](#task-autoscaler) for more information.|No|null|
|`taskCount`|Integer|The maximum number of reading tasks in a replica set.
Multiply `taskCount` and replicas to measure the maximum number of reading
tasks. The total number of tasks, reading and publishing, is higher than the
maximum number of reading tasks. See [Capacity
planning](../ingestion/supervisor.md#capacity-planning) for more details. When
`taskCount` is greater than the number of Kafka partitions or Kinesis shards,
the actual number of reading tasks is less than the `taskCoun [...]
-|`replicas`|Integer|The number of replica sets, where 1 is a single set of
tasks (no replication). Druid always assigns replicate tasks to different
workers to provide resiliency against process failure.|No|1|
+|`replicas`|Integer|The number of replica sets, where 1 is a single set of
tasks (no replication). Druid always assigns task replicas to different workers
to provide resiliency against process failure. See `serverPriorityToReplicas`
to assign server priorities for task replicas.|No|1|
|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading
and begin publishing segments.|No|`PT1H`|
|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts
managing tasks.|No|`PT5S`|
|`period`|ISO 8601 period|Determines how often the supervisor executes its
management logic. Note that the supervisor also runs in response to certain
events, such as tasks succeeding, failing, and reaching their task duration.
The `period` value specifies the maximum time between iterations.|No|`PT30S`|
@@ -65,6 +65,7 @@ For configuration properties specific to Kafka and Kinesis,
see [Kafka I/O confi
|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject
messages with timestamps earlier than this period before the task was created.
For example, if this property is set to `PT1H` and the supervisor creates a
task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than
`2016-01-01T11:00Z`. This may help prevent concurrency issues if your data
stream has late messages and you have multiple pipelines that need to operate
on the same segments, such as a [...]
|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject
messages with timestamps later than this period after the task reached its task
duration. For example, if this property is set to `PT1H`, the task duration is
set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid
drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes
run past their task duration, such as in cases of supervisor failover.|No||
|`stopTaskCount`|Integer|Limits the number of ingestion tasks Druid can cycle
at any given time. If not set, Druid can cycle all tasks at the same time. If
set to a value less than `taskCount`, your cluster needs fewer available slots
to run the supervisor. You can save costs by scaling down your ingestion tier,
but this can lead to slower cycle times and lag. See
[`stopTaskCount`](#stoptaskcount) for more information.|No|`taskCount` value|
+|`serverPriorityToReplicas`|Object (`Map<Integer, Integer>`)|Map of server
priorities to the number of replicas per priority. When set, each task replica
is assigned a server priority that corresponds to `druid.server.priority` on
the Peon process to enable query isolation for mixed workloads using [query
routing strategies](../configuration/index.md#query-routing). If not
configured, the `replicas` setting applies and all task replicas are assigned a
default priority of 0.<br/><br/>For [...]
#### Task autoscaler
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
index b6743ef5955..ea9dd037bbe 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
@@ -119,7 +119,8 @@ public abstract class StreamIndexTestBase extends
EmbeddedClusterTestBase
true,
Period.seconds(5),
null, null, null, null, null, null, null, null,
- false
+ false,
+ null
),
Map.of(),
false,
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java
index cc6da2169f4..1a76594ec87 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java
@@ -104,7 +104,8 @@ public class KinesisDataFormatsTest extends
StreamIndexDataFormatsTestBase
true,
Period.seconds(5),
null, null, null, null, null, null, null, null,
- false
+ false,
+ null
),
Map.of(),
false,
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
index 4aceebe37be..452ccf5e018 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
@@ -56,7 +56,8 @@ public class RabbitStreamIndexTask extends
SeekableStreamIndexTask<String, Long,
@JsonProperty("tuningConfig") RabbitStreamIndexTaskTuningConfig
tuningConfig,
@JsonProperty("ioConfig") RabbitStreamIndexTaskIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
- @JacksonInject ObjectMapper configMapper
+ @JacksonInject ObjectMapper configMapper,
+ @JsonProperty("serverPriority") @Nullable Integer serverPriority
)
{
super(
@@ -67,7 +68,8 @@ public class RabbitStreamIndexTask extends
SeekableStreamIndexTask<String, Long,
tuningConfig,
ioConfig,
context,
- getFormattedGroupId(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), TYPE)
+ getFormattedGroupId(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), TYPE),
+ serverPriority
);
this.configMapper = configMapper;
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
index 2ced013ef5b..9be4a77a8d4 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
@@ -55,6 +55,7 @@ import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -214,7 +215,9 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
TreeMap<Integer, Map<String, Long>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
- RowIngestionMetersFactory rowIngestionMetersFactory) throws
JsonProcessingException
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ @Nullable List<Integer> serverPrioritiesToAssign
+ ) throws JsonProcessingException
{
final String checkpoints =
sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
@@ -231,7 +234,8 @@ public class RabbitStreamSupervisor extends
SeekableStreamSupervisor<String, Lon
(RabbitStreamIndexTaskTuningConfig) taskTuningConfig,
(RabbitStreamIndexTaskIOConfig) taskIoConfig,
context,
- sortingMapper
+ sortingMapper,
+ CollectionUtils.isNullOrEmpty(serverPrioritiesToAssign) ? null :
serverPrioritiesToAssign.get(i)
));
}
return taskList;
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
index 0538163df61..8aad5b76221 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java
@@ -65,7 +65,8 @@ public class RabbitStreamSupervisorIOConfig extends
SeekableStreamSupervisorIOCo
@JsonProperty("lateMessageRejectionPeriod") Period
lateMessageRejectionPeriod,
@JsonProperty("earlyMessageRejectionPeriod") Period
earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime
lateMessageRejectionStartDateTime,
- @JsonProperty("stopTaskCount") Integer stopTaskCount
+ @JsonProperty("stopTaskCount") Integer stopTaskCount,
+ @Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer,
Integer> serverPriorityToReplicas
)
{
super(
@@ -84,7 +85,8 @@ public class RabbitStreamSupervisorIOConfig extends
SeekableStreamSupervisorIOCo
LagAggregator.DEFAULT,
lateMessageRejectionStartDateTime,
new IdleConfig(null, null),
- stopTaskCount
+ stopTaskCount,
+ serverPriorityToReplicas
);
this.consumerProperties = consumerProperties;
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java
index 32fd2f2d53f..82e0b164471 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java
@@ -211,7 +211,8 @@ public class RabbitStreamSupervisorTest extends
EasyMockSupport
lateMessageRejectionPeriod, // latemessagerejection
earlyMessageRejectionPeriod, // early message rejection
null, // latemessagerejectionstartdatetime
- 1
+ 1,
+ null
);
RabbitStreamIndexTaskClientFactory clientFactory = new
RabbitStreamIndexTaskClientFactory(null,
OBJECT_MAPPER);
@@ -276,7 +277,8 @@ public class RabbitStreamSupervisorTest extends
EasyMockSupport
null, // latemessagerejection
null, // early message rejection
null, // latemessagerejectionstartdatetime
- 1
+ 1,
+ null
);
RabbitStreamIndexTaskClientFactory clientFactory = new
RabbitStreamIndexTaskClientFactory(null,
OBJECT_MAPPER);
@@ -418,7 +420,8 @@ public class RabbitStreamSupervisorTest extends
EasyMockSupport
null, // latemessagerejection
null, // early message rejection
null, // latemessagerejectionstartdatetime
- 1
+ 1,
+ null
)
);
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 40cab634c89..402c6b9144b 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -69,7 +69,8 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<KafkaTopicPartition,
@JsonProperty("tuningConfig") KafkaIndexTaskTuningConfig tuningConfig,
@JsonProperty("ioConfig") KafkaIndexTaskIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
- @JacksonInject ObjectMapper configMapper
+ @JacksonInject ObjectMapper configMapper,
+ @JsonProperty("serverPriority") @Nullable Integer serverPriority
)
{
super(
@@ -80,7 +81,8 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<KafkaTopicPartition,
tuningConfig,
ioConfig,
context,
- getFormattedGroupId(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), TYPE)
+ getFormattedGroupId(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), TYPE),
+ serverPriority
);
this.configMapper = configMapper;
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 68bdb3fb405..e226db37676 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -60,6 +60,7 @@ import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -234,7 +235,8 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
TreeMap<Integer, Map<KafkaTopicPartition, Long>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
- RowIngestionMetersFactory rowIngestionMetersFactory
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ @Nullable List<Integer> serverPrioritiesToAssign
) throws JsonProcessingException
{
final String checkpoints =
sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
@@ -252,7 +254,8 @@ public class KafkaSupervisor extends
SeekableStreamSupervisor<KafkaTopicPartitio
(KafkaIndexTaskTuningConfig) taskTuningConfig,
(KafkaIndexTaskIOConfig) taskIoConfig,
context,
- sortingMapper
+ sortingMapper,
+ CollectionUtils.isNullOrEmpty(serverPrioritiesToAssign) ? null :
serverPrioritiesToAssign.get(i)
));
}
return taskList;
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 4eac3163fe3..992ff292694 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -77,7 +77,8 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
@JsonProperty("idleConfig") IdleConfig idleConfig,
@JsonProperty("stopTaskCount") Integer stopTaskCount,
- @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics
+ @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics,
+ @Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer,
Integer> serverPriorityToReplicas
)
{
super(
@@ -96,7 +97,8 @@ public class KafkaSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
Configs.valueOrDefault(lagAggregator, LagAggregator.DEFAULT),
lateMessageRejectionStartDateTime,
idleConfig,
- stopTaskCount
+ stopTaskCount,
+ serverPriorityToReplicas
);
this.consumerProperties = Preconditions.checkNotNull(consumerProperties,
"consumerProperties");
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index ce84944bb63..a5a3243b5b8 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2982,7 +2982,8 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
tuningConfig,
ioConfig,
context,
- OBJECT_MAPPER
+ OBJECT_MAPPER,
+ null
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
index a649e07915a..24c1656fc7e 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
@@ -92,6 +92,7 @@ public class KafkaIOConfigBuilder extends
SupervisorIOConfigBuilder<KafkaIOConfi
null,
idleConfig,
stopTaskCount,
+ null,
null
);
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 2da46aaf0e2..6295d41937e 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -340,7 +340,8 @@ public class KafkaSupervisorIOConfigTest
null,
null,
null,
- false
+ false,
+ null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 =
mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
@@ -375,7 +376,8 @@ public class KafkaSupervisorIOConfigTest
null,
null,
null,
- false
+ false,
+ null
);
Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue());
@@ -427,7 +429,8 @@ public class KafkaSupervisorIOConfigTest
null,
mapper.convertValue(idleConfig, IdleConfig.class),
null,
- false
+ false,
+ null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 =
mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 4f6a93ce3a1..b5e00bcaab4 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -71,6 +71,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
@@ -133,6 +134,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -482,33 +484,46 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
Duration.standardHours(2).getStandardMinutes()
),
- new KafkaIndexTaskTuningConfig(
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- ),
+ supervisor.getTuningConfig(),
+ null,
null
).get(0);
Assert.assertTrue(indexTask.getRunner() instanceof KafkaIndexTaskRunner);
+ Assert.assertNull(indexTask.getServerPriority());
+ }
+
+ @Test
+ public void testCreateIndexTasksWithServerPrioritiesToAssign() throws
JsonProcessingException
+ {
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
+ final List<SeekableStreamIndexTask<KafkaTopicPartition, Long,
KafkaRecordEntity>> taskList =
+ supervisor.createIndexTasks(
+ 3,
+ "seq",
+ OBJECT_MAPPER,
+ new TreeMap<>(),
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "seq",
+ new SeekableStreamStartSequenceNumbers<>("test",
Collections.emptyMap(), Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>("test",
Collections.emptyMap()),
+ Collections.emptyMap(),
+ null,
+ null,
+ null,
+ null,
+ INPUT_FORMAT,
+ null,
+ Duration.standardHours(2).getStandardMinutes()
+ ),
+ supervisor.getTuningConfig(),
+ null,
+ List.of(10, 20, 20)
+ );
+ Assert.assertEquals(3, taskList.size());
+ Assert.assertEquals(Integer.valueOf(10),
taskList.get(0).getServerPriority());
+ Assert.assertEquals(Integer.valueOf(20),
taskList.get(1).getServerPriority());
+ Assert.assertEquals(Integer.valueOf(20),
taskList.get(2).getServerPriority());
}
@Test
@@ -1613,6 +1628,104 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
}
+ @Test
+ public void testRequeueTaskWithServerPrioritiesWhenFailed() throws Exception
+ {
+ supervisor = getTestableSupervisor(null, 2, 2, true, true, "PT1H", null,
null, false, kafkaHost, null, Map.of(1, 1, 2, 1));
+ addSomeEvents(1);
+
+ Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+ EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+ .anyTimes();
+ EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+ .anyTimes();
+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(4);
+
+ TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints1 = new
TreeMap<>();
+ checkpoints1.put(0, singlePartitionMap(topic, 0, 0L, 2, 0L));
+ TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints2 = new
TreeMap<>();
+ checkpoints2.put(0, singlePartitionMap(topic, 1, 0L));
+
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .anyTimes();
+
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .anyTimes();
+
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+
+ List<Task> tasks = captured.getValues();
+
+ // test that running the main loop again checks the status of the tasks
that were created and does nothing if they
+ // are all still running
+ EasyMock.reset(taskStorage);
+ EasyMock.reset(taskQueue);
+
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(tasks)).anyTimes();
+ final List<Integer> observedServerPriorities = new ArrayList<>();
+ for (Task task : tasks) {
+ EasyMock.expect(taskStorage.getStatus(task.getId()))
+ .andReturn(Optional.of(TaskStatus.running(task.getId())))
+ .anyTimes();
+
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
+ observedServerPriorities.add(((KafkaIndexTask)
task).getServerPriority());
+ }
+ Assert.assertEquals(4, tasks.size());
+ Assert.assertEquals(List.of(2, 1, 2, 1), observedServerPriorities);
+
+ EasyMock.replay(taskStorage);
+ EasyMock.replay(taskQueue);
+
+ supervisor.runInternal();
+ verifyAll();
+
+ // test that a task failing causes a new task to be re-queued with the
same parameters
+ Capture<Task> aNewTaskCapture = Capture.newInstance();
+ List<Task> imStillAlive = tasks.subList(0, 3);
+ KafkaIndexTask iHaveFailed = (KafkaIndexTask) tasks.get(3);
+ EasyMock.reset(taskStorage);
+ EasyMock.reset(taskQueue);
+
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(imStillAlive)).anyTimes();
+ for (Task task : imStillAlive) {
+ EasyMock.expect(taskStorage.getStatus(task.getId()))
+ .andReturn(Optional.of(TaskStatus.running(task.getId())))
+ .anyTimes();
+
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
+ }
+ EasyMock.expect(taskStorage.getStatus(iHaveFailed.getId()))
+ .andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId(),
"Dummy task status failure err message")));
+
EasyMock.expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of(iHaveFailed)).anyTimes();
+
EasyMock.expect(taskQueue.add(EasyMock.capture(aNewTaskCapture))).andReturn(true);
+ EasyMock.replay(taskStorage);
+ EasyMock.replay(taskQueue);
+
+ supervisor.runInternal();
+ verifyAll();
+
+ Assert.assertNotEquals(iHaveFailed.getId(),
aNewTaskCapture.getValue().getId());
+ KafkaIndexTask retriedTask = (KafkaIndexTask) aNewTaskCapture.getValue();
+ Assert.assertEquals(
+ iHaveFailed.getIOConfig().getBaseSequenceName(),
+ retriedTask.getIOConfig().getBaseSequenceName()
+ );
+ Assert.assertEquals(Integer.valueOf(1), retriedTask.getServerPriority());
+ }
+
@Test
public void testRequeueAdoptedTaskWhenFailed() throws Exception
{
@@ -2423,7 +2536,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("id1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
@@ -2431,7 +2545,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("id2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
@@ -2580,7 +2695,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("id1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
@@ -2588,7 +2704,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("id2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
@@ -2916,7 +3033,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("id1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
1,
@@ -2924,7 +3042,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("id2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.updateCurrentAndLatestOffsets();
supervisor.runInternal();
@@ -4369,7 +4488,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -4378,7 +4498,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
EasyMock.expect(taskClient.getMovingAveragesAsync("task1"))
@@ -4411,7 +4532,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -4420,7 +4542,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
ParseExceptionReport exception1 = new ParseExceptionReport(
@@ -4627,15 +4750,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
tuningConfigBuilder().build()
);
- supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ SeekableStreamSupervisor<KafkaTopicPartition, Long,
KafkaRecordEntity>.TaskGroup taskGroup =
supervisor.addTaskGroupToActivelyReadingTaskGroup(
42,
singlePartitionMap(topic, 0, 0L, 2, 0L),
minMessageTime,
maxMessageTime,
ImmutableSet.of("id1", "id2", "id3", "id4"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
+ Assert.assertNull(supervisor.computeUnassignedServerPriorities(taskGroup,
3));
+
DataSchema modifiedDataSchema = getDataSchema("some other datasource");
KafkaSupervisorTuningConfig modifiedTuningConfig = tuningConfigBuilder()
@@ -5029,7 +5155,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void test_doesTaskMatchSupervisor()
{
- supervisor = getTestableSupervisor("supervisorId", 1, 1, true, true, null,
new Period("PT1H"), new Period("PT1H"), false, kafkaHost, null);
+ supervisor = getTestableSupervisor("supervisorId", 1, 1, true, true, null,
new Period("PT1H"), new Period("PT1H"), false, kafkaHost, null, null);
KafkaIndexTask kafkaTaskMatch = createMock(KafkaIndexTask.class);
EasyMock.expect(kafkaTaskMatch.getSupervisorId()).andReturn("supervisorId");
@@ -5244,6 +5370,112 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertFalse(supervisor.getPartitionOffsets().isEmpty());
}
+ @Test
+ public void
testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorityIsSet()
+ {
+ final Map<String, Object> consumerProperties =
KafkaConsumerConfigs.getConsumerProperties();
+ consumerProperties.put("bootstrap.servers", kafkaHost);
+
+ final KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new
KafkaSupervisorIOConfig(
+ topic,
+ null,
+ INPUT_FORMAT,
+ null,
+ 1,
+ new Period("PT1H"),
+ consumerProperties,
+ null,
+ null,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ new Period("P1D"),
+ new Period("PT30S"),
+ true,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ Map.of(
+ 10, 2,
+ 20, 3
+ )
+ );
+
+ Assert.assertEquals(5, (int) kafkaSupervisorIOConfig.getReplicas());
+
+ final KafkaIndexTaskClientFactory taskClientFactory = new
KafkaIndexTaskClientFactory(null, null);
+ final KafkaSupervisorSpec spec = new KafkaSupervisorSpec(
+ null,
+ null,
+ dataSchema,
+ KafkaSupervisorTuningConfig.defaultConfig(),
+ kafkaSupervisorIOConfig,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ new NoopServiceEmitter(),
+ new DruidMonitorSchedulerConfig(),
+ rowIngestionMetersFactory,
+ new SupervisorStateManagerConfig()
+ );
+
+ supervisor = new TestableKafkaSupervisor(
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ spec,
+ rowIngestionMetersFactory
+ );
+
+ final SeekableStreamSupervisor<KafkaTopicPartition, Long,
KafkaRecordEntity>.TaskGroup taskGroup1 =
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 0,
+ ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L),
+ null,
+ null,
+ Set.of(),
+ Set.of(),
+ Map.of()
+ );
+
+ Assert.assertEquals(List.of(20, 20, 20, 10, 10),
supervisor.computeUnassignedServerPriorities(taskGroup1, 5));
+
+ final SeekableStreamSupervisor<KafkaTopicPartition, Long,
KafkaRecordEntity>.TaskGroup taskGroup2 =
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 1,
+ ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L),
+ null,
+ null,
+ Set.of("task1", "task2"),
+ Set.of(),
+ Map.of("task1", 10, "task2", 10)
+ );
+
+ Assert.assertEquals(List.of(20, 20, 20),
supervisor.computeUnassignedServerPriorities(taskGroup2, 3));
+
+ final SeekableStreamSupervisor<KafkaTopicPartition, Long,
KafkaRecordEntity>.TaskGroup taskGroup3 =
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 2,
+ ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 0L),
+ null,
+ null,
+ Set.of("task1", "task2", "task3"),
+ Set.of(),
+ Map.of("task1", 10, "task2", 10, "task3", 20)
+ );
+
+ Assert.assertEquals(List.of(20, 20),
supervisor.computeUnassignedServerPriorities(taskGroup3, 2));
+ }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
// create topic manually
@@ -5355,6 +5587,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
earlyMessageRejectionPeriod,
false,
kafkaHost,
+ null,
null
);
}
@@ -5381,6 +5614,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
earlyMessageRejectionPeriod,
suspended,
kafkaHost,
+ null,
null
);
}
@@ -5407,7 +5641,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
earlyMessageRejectionPeriod,
suspended,
kafkaHost,
- idleConfig
+ idleConfig,
+ null
);
}
@@ -5422,7 +5657,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Period earlyMessageRejectionPeriod,
boolean suspended,
String kafkaHost,
- IdleConfig idleConfig
+ IdleConfig idleConfig,
+ Map<Integer, Integer> serverPriorityToReplicas
)
{
final Map<String, Object> consumerProperties =
KafkaConsumerConfigs.getConsumerProperties();
@@ -5449,7 +5685,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
idleConfig,
null,
- true
+ true,
+ serverPriorityToReplicas
);
KafkaIndexTaskClientFactory taskClientFactory = new
KafkaIndexTaskClientFactory(
@@ -5543,7 +5780,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
);
KafkaIndexTaskClientFactory taskClientFactory = new
KafkaIndexTaskClientFactory(
@@ -5637,7 +5875,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
);
KafkaIndexTaskClientFactory taskClientFactory = new
KafkaIndexTaskClientFactory(
@@ -5786,7 +6025,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Duration.standardHours(2).getStandardMinutes()
),
Collections.emptyMap(),
- OBJECT_MAPPER
+ OBJECT_MAPPER,
+ null
);
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index a09cfcfa5ba..626586c23ac 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -72,7 +72,8 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, Ki
@JsonProperty("ioConfig") KinesisIndexTaskIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("useListShards") boolean useListShards,
- @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE)
AWSCredentialsConfig awsCredentialsConfig
+ @JacksonInject @Named(KinesisIndexingServiceModule.AWS_SCOPE)
AWSCredentialsConfig awsCredentialsConfig,
+ @JsonProperty("serverPriority") @Nullable Integer serverPriority
)
{
super(
@@ -83,7 +84,8 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, Ki
tuningConfig,
ioConfig,
context,
- getFormattedGroupId(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), TYPE)
+ getFormattedGroupId(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), TYPE),
+ serverPriority
);
this.useListShards = useListShards;
this.awsCredentialsConfig = awsCredentialsConfig;
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 884476181fa..08491caa8ff 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -56,8 +56,10 @@ import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -157,7 +159,8 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
TreeMap<Integer, Map<String, String>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
- RowIngestionMetersFactory rowIngestionMetersFactory
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ @Nullable List<Integer> serverPrioritiesToAssign
) throws JsonProcessingException
{
final String checkpoints =
sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
@@ -176,7 +179,8 @@ public class KinesisSupervisor extends
SeekableStreamSupervisor<String, String,
(KinesisIndexTaskIOConfig) taskIoConfig,
context,
spec.getSpec().getTuningConfig().isUseListShards(),
- awsCredentialsConfig
+ awsCredentialsConfig,
+ CollectionUtils.isNullOrEmpty(serverPrioritiesToAssign) ? null :
serverPrioritiesToAssign.get(i)
));
}
return taskList;
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index ce6eece5af2..6c325bd0744 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -34,6 +34,7 @@ import org.joda.time.DateTime;
import org.joda.time.Period;
import javax.annotation.Nullable;
+import java.util.Map;
public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
{
@@ -77,7 +78,8 @@ public class KinesisSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
@Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig
autoScalerConfig,
- @JsonProperty("deaggregate") @Deprecated boolean deaggregate
+ @JsonProperty("deaggregate") @Deprecated boolean deaggregate,
+ @Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer,
Integer> serverPriorityToReplicas
)
{
super(
@@ -96,7 +98,8 @@ public class KinesisSupervisorIOConfig extends
SeekableStreamSupervisorIOConfig
LagAggregator.DEFAULT,
lateMessageRejectionStartDateTime,
new IdleConfig(null, null),
- null
+ null,
+ serverPriorityToReplicas
);
this.endpoint = endpoint != null
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 8a0ec53bb30..84b843958eb 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -126,6 +126,7 @@ public class KinesisIndexTaskSerdeTest
IO_CONFIG,
null,
false,
+ null,
null
);
ObjectMapper objectMapper = createObjectMapper();
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 69764a5d800..8a1c1c6dfe1 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2526,7 +2526,8 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
ioConfig,
context,
false,
- awsCredentialsConfig
+ awsCredentialsConfig,
+ null
);
}
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index cda3d4c1452..290e16b9756 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -145,7 +145,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
),
null,
null,
@@ -225,7 +226,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
),
null,
null,
@@ -279,7 +281,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
),
null,
null,
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 1fd50c13545..0039f730ec2 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -468,7 +468,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
);
KinesisIndexTaskClientFactory clientFactory = new
KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
KinesisSupervisor supervisor = new KinesisSupervisor(
@@ -533,7 +534,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
);
AutoScalerConfig autoscalerConfigNull =
kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig();
@@ -560,7 +562,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
- false
+ false,
+ null
);
AutoScalerConfig autoscalerConfig =
kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig();
@@ -3738,7 +3741,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -3747,7 +3751,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
EasyMock.expect(taskClient.getMovingAveragesAsync("task1")).andReturn(Futures.immediateFuture(ImmutableMap.of(
@@ -3977,7 +3982,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
minMessageTime,
maxMessageTime,
ImmutableSet.of("id1", "id2", "id3", "id4"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
DataSchema modifiedDataSchema = getDataSchema("some other datasource");
@@ -4215,7 +4221,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
),
null,
null,
@@ -5179,7 +5186,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
);
KinesisIndexTaskClientFactory taskClientFactory = new
KinesisIndexTaskClientFactory(
@@ -5321,7 +5329,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
autoScalerConfig,
- false
+ false,
+ null
);
KinesisIndexTaskClientFactory taskClientFactory = new
KinesisIndexTaskClientFactory(
@@ -5407,7 +5416,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
);
KinesisIndexTaskClientFactory taskClientFactory = new
KinesisIndexTaskClientFactory(
@@ -5495,7 +5505,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- false
+ false,
+ null
);
KinesisIndexTaskClientFactory taskClientFactory = new
KinesisIndexTaskClientFactory(
@@ -5646,6 +5657,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
),
Collections.emptyMap(),
false,
+ null,
null
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResource.java
index 94b85d3b29c..870b1c71575 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResource.java
@@ -42,7 +42,7 @@ public class TaskResource
}
/**
- * Returns availability group ID of this task. Tasks the same availability
group cannot be assigned to the same
+ * Returns availability group ID of this task. Tasks of the same
availability group cannot be assigned to the same
* worker. If tasks do not have this restriction, a common convention is to
set the availability group ID to the
* task ID.
*
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 5d87ab602fb..6485d6d72da 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -48,6 +48,7 @@ import
org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfig
import org.apache.druid.indexing.common.tasklogs.LogUtils;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
@@ -361,6 +362,14 @@ public class ForkingTaskRunner
command.addSystemProperty("druid.task.executor.enableTlsPort",
node.isEnableTlsPort());
command.addSystemProperty("log4j2.configurationFactory",
ConsoleLoggingEnforcementConfigurationFactory.class.getName());
+
+ if (task instanceof SeekableStreamIndexTask) {
+ final Integer serverPriority =
((SeekableStreamIndexTask) task).getServerPriority();
+ if (serverPriority != null) {
+ command.addSystemProperty("druid.server.priority",
serverPriority);
+ }
+ }
+
command.addSystemProperty("druid.indexer.task.baseTaskDir",
storageSlot.getDirectory().getAbsolutePath());
command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask",
storageSlot.getNumBytes());
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index d7987221824..390fe68cfee 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -55,6 +55,7 @@ import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
+import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import javax.annotation.Nullable;
@@ -73,12 +74,15 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
protected final LockGranularity lockGranularityToUse;
protected final TaskLockType lockTypeToUse;
protected final String supervisorId;
+ @Nullable
+ protected final Integer serverPriority;
// Lazily initialized to avoid calling it on the overlord when tasks are
instantiated.
// See https://github.com/apache/druid/issues/7724 for issues that can cause.
// By the way, lazily init is synchronized because the runner may be needed
in multiple threads.
private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType,
SequenceOffsetType, ?>> runnerSupplier;
+
public SeekableStreamIndexTask(
final String id,
final @Nullable String supervisorId,
@@ -87,7 +91,8 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
final SeekableStreamIndexTaskTuningConfig tuningConfig,
final SeekableStreamIndexTaskIOConfig<PartitionIdType,
SequenceOffsetType> ioConfig,
@Nullable final Map<String, Object> context,
- @Nullable final String groupId
+ @Nullable final String groupId,
+ @Nullable final Integer serverPriority
)
{
super(
@@ -108,6 +113,7 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
: LockGranularity.SEGMENT;
this.lockTypeToUse = TaskLocks.determineLockTypeForAppend(getContext());
this.supervisorId =
Preconditions.checkNotNull(Configs.valueOrDefault(supervisorId,
dataSchema.getDataSource()), "supervisorId");
+ this.serverPriority = serverPriority;
}
protected static String getFormattedGroupId(String supervisorId, String type)
@@ -143,6 +149,17 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
return supervisorId;
}
+ /**
+ * @return the server priority this task belongs to, if specified. When set,
this value
+ * maps to {@link DruidServerMetadata#getPriority()} on the corresponding
Peon server.
+ */
+ @JsonProperty
+ @Nullable
+ public Integer getServerPriority()
+ {
+ return serverPriority;
+ }
+
@JsonProperty
public SeekableStreamIndexTaskTuningConfig getTuningConfig()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 62c439e12b3..f5128dc3809 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -205,6 +205,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final ImmutableMap<PartitionIdType, SequenceOffsetType>
unfilteredStartingSequencesForSequenceName;
final ConcurrentHashMap<String, TaskData> tasks = new
ConcurrentHashMap<>();
+ final ConcurrentHashMap<String, Integer> taskIdToServerPriority = new
ConcurrentHashMap<>();
final DateTime minimumMessageTime;
final DateTime maximumMessageTime;
final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
@@ -276,6 +277,28 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return tasks.keySet();
}
+ /**
+ * Removes a task from {@link #tasks} and {@link #taskIdToServerPriority}.
+ */
+ void removeTask(String taskId)
+ {
+ tasks.remove(taskId);
+ taskIdToServerPriority.remove(taskId);
+ }
+
+ /**
+ * Removes the current task entry from {@link #tasks} using the provided
iterator,
+ * and also removes the corresponding entry from {@link
#taskIdToServerPriority}.
+ * <p>
+ * Must be called while iterating over {@link #tasks} and relies on
+ * {@link Iterator#remove()} to safely avoid {@link
java.util.ConcurrentModificationException}.
+ */
+ void removeTask(Iterator<Entry<String, TaskData>> taskDataIterator, String
taskId)
+ {
+ taskDataIterator.remove();
+ taskIdToServerPriority.remove(taskId);
+ }
+
void setHandoffEarly()
{
handoffEarly = true;
@@ -1711,7 +1734,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Nullable DateTime minMsgTime,
@Nullable DateTime maxMsgTime,
Set<String> tasks,
- Set<PartitionIdType> exclusiveStartingSequencePartitions
+ Set<PartitionIdType> exclusiveStartingSequencePartitions,
+ @Nullable Map<String, Integer> taskIdToServerPriority
)
{
TaskGroup group = new TaskGroup(
@@ -1723,6 +1747,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
exclusiveStartingSequencePartitions
);
group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x ->
new TaskData())));
+ if (taskIdToServerPriority != null) {
+ group.taskIdToServerPriority.putAll(taskIdToServerPriority);
+ }
if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) {
throw new ISE(
"trying to add taskGroup with id [%s] to actively reading task
groups, but group already exists.",
@@ -1739,7 +1766,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Nullable DateTime minMsgTime,
@Nullable DateTime maxMsgTime,
Set<String> tasks,
- Set<PartitionIdType> exclusiveStartingSequencePartitions
+ Set<PartitionIdType> exclusiveStartingSequencePartitions,
+ @Nullable Map<String, Integer> taskIdToServerPriority
)
{
TaskGroup group = new TaskGroup(
@@ -1751,6 +1779,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
exclusiveStartingSequencePartitions
);
group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x ->
new TaskData())));
+ if (taskIdToServerPriority != null) {
+ group.taskIdToServerPriority.putAll(taskIdToServerPriority);
+ }
pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new
CopyOnWriteArrayList<>())
.add(group);
return group;
@@ -2316,6 +2347,10 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId, taskGroup.groupId, prevTaskData
);
}
+ final Integer serverPriority =
seekableStreamIndexTask.getServerPriority();
+ if (serverPriority != null) {
+
taskGroup.taskIdToServerPriority.putIfAbsent(taskId, serverPriority);
+ }
verifySameSequenceNameForAllTasksInGroup(taskGroupId);
}
}
@@ -2518,7 +2553,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
stateManager.recordThrowableEvent(e);
log.error(e, "Problem while getting checkpoints for task [%s], killing
the task", taskId);
killTask(taskId, "Exception[%s] while getting checkpoints",
e.getClass());
- taskGroup.tasks.remove(taskId);
+ taskGroup.removeTask(taskId);
} else if (checkpointResult.valueOrThrow().isEmpty()) {
log.warn("Ignoring task [%s], as probably it is not started running
yet", taskId);
} else {
@@ -2641,7 +2676,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskGroup.checkpointSequences,
latestOffsetsFromDb
);
- taskGroup.tasks.remove(sequenceCheckpoint.lhs);
+ taskGroup.removeTask(sequenceCheckpoint.lhs);
}
);
}
@@ -2712,6 +2747,12 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return pendingCompletionTaskGroups.get(groupId);
}
+ @VisibleForTesting
+ protected TaskGroup getActiveTaskGroup(int groupId)
+ {
+ return activelyReadingTaskGroups.get(groupId);
+ }
+
// Sanity check to ensure that tasks have the same sequence name as their
task group
private void verifySameSequenceNameForAllTasksInGroup(int groupId)
{
@@ -3542,7 +3583,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (task.status.isRunnable()) {
if
(taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
killTask(taskId, "Killing task [%s] which hasn't been assigned
to a worker", taskId);
- i.remove();
+ taskGroup.removeTask(i, taskId);
}
}
}
@@ -3579,11 +3620,11 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId,
pauseException
);
- taskGroup.tasks.remove(taskId);
+ taskGroup.removeTask(taskId);
} else if (input.get(i).valueOrThrow() == null ||
input.get(i).valueOrThrow().isEmpty()) {
killTask(taskId, "Task [%s] returned empty offsets after
pause", taskId);
- taskGroup.tasks.remove(taskId);
+ taskGroup.removeTask(taskId);
} else { // otherwise build a map of the highest sequences seen
for (Entry<PartitionIdType, SequenceOffsetType> sequence :
input.get(i).valueOrThrow().entrySet()) {
if (!endOffsets.containsKey(sequence.getKey())
@@ -3632,7 +3673,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
} else {
String taskId = setEndOffsetTaskIds.get(i);
killTask(taskId, "Failed to set end offsets, killing
task");
- taskGroup.tasks.remove(taskId);
+ taskGroup.removeTask(taskId);
}
}
}
@@ -3724,7 +3765,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (taskData.status.isFailure()) {
stateManager.recordCompletedTaskState(TaskState.FAILED);
- iTask.remove(); // remove failed task
+ group.removeTask(iTask, taskId); // remove failed task
if (group.tasks.isEmpty()) {
// if all tasks in the group have failed, just nuke all task
groups with this partition set and restart
entireTaskGroupFailed = true;
@@ -3820,7 +3861,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (!isTaskCurrent(groupId, taskId, activeTaskMap)) {
log.info("Stopping task[%s] as it does not match the expected
sequence range and ingestion spec.", taskId);
futures.add(stopTask(taskId, false));
- iTasks.remove();
+ taskGroup.removeTask(iTasks, taskId);
continue;
}
@@ -3829,7 +3870,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// remove failed tasks
if (taskData.status.isFailure()) {
stateManager.recordCompletedTaskState(TaskState.FAILED);
- iTasks.remove();
+ taskGroup.removeTask(iTasks, taskId);
continue;
}
@@ -4206,6 +4247,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private void createTasksForGroup(int groupId, int replicas)
throws JsonProcessingException
{
+
TaskGroup group = activelyReadingTaskGroups.get(groupId);
Map<PartitionIdType, SequenceOffsetType> startPartitions =
group.startingSequences;
Map<PartitionIdType, SequenceOffsetType> endPartitions = new HashMap<>();
@@ -4230,6 +4272,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
ioConfig
);
+ final List<Integer> unassignedServerPriorities =
computeUnassignedServerPriorities(group, replicas);
+
List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType,
RecordType>> taskList = createIndexTasks(
replicas,
group.baseSequenceName,
@@ -4237,10 +4281,19 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
group.checkpointSequences,
newIoConfig,
taskTuningConfig,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ unassignedServerPriorities
);
for (SeekableStreamIndexTask indexTask : taskList) {
+ final String taskId = indexTask.getId();
+ final Integer serverPriority = indexTask.getServerPriority();
+
+ if (serverPriority != null) {
+ log.info("Adding serverPriority[%d] for task[%s] and groupId[%s]",
serverPriority, taskId, groupId);
+ group.taskIdToServerPriority.put(taskId, serverPriority);
+ }
+
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
try {
@@ -4256,6 +4309,51 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ /**
+ * Computes the remaining unassigned server priorities for the given task
group.
+ *
+ * @return the unassigned server priorities, or {@code null} if
+ * {@link
SeekableStreamSupervisorIOConfig#getServerPriorityToReplicas()} is not
configured
+ */
+ @Nullable
+ public List<Integer> computeUnassignedServerPriorities(final TaskGroup
group, final int replicas)
+ {
+ final Map<Integer, Integer> serverPriorityToReplicas =
ioConfig.getServerPriorityToReplicas();
+ if (serverPriorityToReplicas == null) {
+ return null;
+ }
+
+ // Build the full list of all required priorities and sort them from
highest to lowest priorities
+ final List<Integer> allRequiredPriorities = new ArrayList<>();
+ for (Map.Entry<Integer, Integer> entry :
serverPriorityToReplicas.entrySet()) {
+ for (int i = 0; i < entry.getValue(); i++) {
+ allRequiredPriorities.add(entry.getKey());
+ }
+ }
+ allRequiredPriorities.sort(Collections.reverseOrder());
+
+ // Remove already assigned priorities
+ final List<Integer> assignedPriorities = new
ArrayList<>(group.taskIdToServerPriority.values());
+ final List<Integer> unassignedServerPriorities = new
ArrayList<>(allRequiredPriorities);
+ for (Integer assigned : assignedPriorities) {
+ unassignedServerPriorities.remove(assigned);
+ }
+
+ log.info(
+ "Server priorities[%s] to be assigned for new tasks in taskGroupId[%d]
with replicas[%d]. Task server priorities[%s] have already been assigned to
tasks[%s].",
+ unassignedServerPriorities, group.groupId, replicas,
group.taskIdToServerPriority.values(), group.taskIds()
+ );
+
+ if (unassignedServerPriorities.size() < replicas) {
+ // Sanity check: if this triggers, this suggests a bug as the invariant
should already be validated in the ctr.
+ throw DruidException.defensive(
+ "Found unassignedServerPriorities[%s] of size[%d] < total
replicas[%d] for taskGroupId[%d]. Task server priorities[%s] have already been
assigned to tasks[%s].",
+ unassignedServerPriorities, unassignedServerPriorities.size(),
replicas, group.groupId, assignedPriorities, group.taskIds()
+ );
+ }
+ return unassignedServerPriorities;
+ }
+
/**
* monitoring method, fetches current partition offsets and lag in a
background reporting thread
*/
@@ -4495,11 +4593,11 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
/**
- * creates a list of specific kafka/kinesis index tasks using
- * the given replicas count
+ * Creates a list of {@link SeekableStreamIndexTask}s using the given {@code
replicas} and
+ * {@code serverPrioritiesToAssign} for each task, if it is
non-null/non-empty.
*
- * @return list of specific kafka/kinesis index taksks
- * @throws JsonProcessingException
+ * @return a list of {@link SeekableStreamIndexTask}s
+ * @throws JsonProcessingException if task serialization fails
*/
protected abstract List<SeekableStreamIndexTask<PartitionIdType,
SequenceOffsetType, RecordType>> createIndexTasks(
int replicas,
@@ -4508,7 +4606,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>
sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
- RowIngestionMetersFactory rowIngestionMetersFactory
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ @Nullable List<Integer> serverPrioritiesToAssign
) throws JsonProcessingException;
/**
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 0de06a63949..9386ac365f8 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -31,6 +31,7 @@ import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
+import java.util.Map;
public abstract class SeekableStreamSupervisorIOConfig
@@ -51,6 +52,7 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable private final AutoScalerConfig autoScalerConfig;
@Nullable private final IdleConfig idleConfig;
@Nullable private final Integer stopTaskCount;
+ @Nullable private final Map<Integer, Integer> serverPriorityToReplicas;
private final LagAggregator lagAggregator;
private final boolean autoScalerEnabled;
@@ -71,12 +73,12 @@ public abstract class SeekableStreamSupervisorIOConfig
LagAggregator lagAggregator,
DateTime lateMessageRejectionStartDateTime,
@Nullable IdleConfig idleConfig,
- @Nullable Integer stopTaskCount
+ @Nullable Integer stopTaskCount,
+ @Nullable Map<Integer, Integer> serverPriorityToReplicas
)
{
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
this.inputFormat = inputFormat;
- this.replicas = replicas != null ? replicas : 1;
InvalidInput.conditionalException(
lagAggregator != null,
@@ -119,6 +121,36 @@ public abstract class SeekableStreamSupervisorIOConfig
}
this.idleConfig = idleConfig;
+ this.serverPriorityToReplicas = serverPriorityToReplicas;
+
+ if (this.serverPriorityToReplicas != null) {
+ int serverPriorityReplicas = 0;
+ for (Map.Entry<Integer, Integer> entry :
this.serverPriorityToReplicas.entrySet()) {
+ final Integer serverReplica = entry.getValue();
+
+ if (serverReplica == null || serverReplica < 0) {
+ throw InvalidInput.exception(
+ "Found invalid server replica[%d] for priority[%d] in
serverPriorityToReplicas[%s]. Replicas must be >= 0.",
+ serverReplica, entry.getKey(), serverPriorityToReplicas
+ );
+ }
+
+ serverPriorityReplicas += serverReplica;
+ }
+
+ if (replicas != null && replicas != serverPriorityReplicas) {
+ throw InvalidInput.exception(
+ "Configured replicas[%d] does not match the sum of replicas[%d]
specified in serverPriorityToReplicas[%s]."
+ + " To avoid ambiguity, consider removing [ioConfig.replicas] in
favor of [ioConfig.serverPriorityToReplicas].",
+ replicas, serverPriorityReplicas, serverPriorityToReplicas
+ );
+ }
+
+ // We also explicitly set replicas since the supervisor logic for
replicas is already implemented.
+ this.replicas = serverPriorityReplicas;
+ } else {
+ this.replicas = replicas != null ? replicas : 1;
+ }
}
private static Duration defaultDuration(final Period period, final String
theDefault)
@@ -145,6 +177,13 @@ public abstract class SeekableStreamSupervisorIOConfig
return replicas;
}
+ @Nullable
+ @JsonProperty
+ public Map<Integer, Integer> getServerPriorityToReplicas()
+ {
+ return serverPriorityToReplicas;
+ }
+
@Nullable
@JsonProperty
public AutoScalerConfig getAutoScalerConfig()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index 4b722be02e5..9b54725293a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -24,20 +24,28 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
+import org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTask;
+import
org.apache.druid.indexing.seekablestream.TestSeekableStreamIndexTaskIOConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateTest;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.tasklogs.NoopTaskLogs;
@@ -60,6 +68,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
@@ -591,6 +600,78 @@ public class ForkingTaskRunnerTest
assertEquals(TaskState.SUCCESS, status.getStatusCode());
}
+ @Test
+ public void testTaskCommandIncludesServerPriorityIfConfigured() throws
Exception
+ {
+ final TaskConfig taskConfig = makeDefaultTaskConfigBuilder().build();
+ final WorkerConfig workerConfig = new WorkerConfig();
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ final Task task = new TestSeekableStreamIndexTask(
+ "id2",
+ null,
+ null,
+ DataSchema.builder()
+ .withDataSource("foo")
+ .withTimestamp(new TimestampSpec(null, null, null))
+ .withDimensions(new DimensionsSpec(List.of()))
+ .withGranularity(new ArbitraryGranularitySpec(new
AllGranularity(), List.of()))
+ .build(),
+
SeekableStreamSupervisorStateTest.createSupervisorTuningConfig().convertToTaskTuningConfig(),
+ new TestSeekableStreamIndexTaskIOConfig(),
+ null,
+ "1",
+ null,
+ null,
+ 2
+ );
+
+ final AtomicReference<List<String>> observedCommandRef = new
AtomicReference<>(List.of());
+
+ final ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+ new ForkingTaskRunnerConfig(),
+ taskConfig,
+ workerConfig,
+ new Properties(),
+ new NoopTaskLogs(),
+ mapper,
+ new DruidNode("middleManager", "host", false, 8091, null, true, false),
+ new StartupLoggingConfig(),
+ TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
+ )
+ {
+ @Override
+ ProcessHolder runTaskProcess(List<String> command, File logFile,
TaskLocation taskLocation) throws IOException
+ {
+ observedCommandRef.set(command);
+ for (String param : command) {
+ if (param.endsWith(task.getId())) {
+ final String basePath =
getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath();
+ File resultFile = Paths.get(basePath, task.getId(), "attempt",
"1", "status.json").toFile();
+ mapper.writeValue(resultFile, TaskStatus.success(task.getId()));
+ break;
+ }
+ }
+ MockTestProcess mockTestProcess = new MockTestProcess()
+ {
+ @Override
+ public int waitFor()
+ {
+ return 0;
+ }
+ };
+ return new ForkingTaskRunner.ProcessHolder(mockTestProcess, logFile,
taskLocation);
+ }
+ };
+
+
+ forkingTaskRunner.setNumProcessorsPerTask();
+ final TaskStatus status = forkingTaskRunner.run(task).get();
+ Assert.assertNotNull(observedCommandRef);
+ final List<String> observedCommand = observedCommandRef.get();
+ Assert.assertTrue(observedCommand.contains("-Ddruid.server.priority=2"));
+ assertEquals(TaskState.SUCCESS, status.getStatusCode());
+ }
+
public static TaskConfigBuilder makeDefaultTaskConfigBuilder()
{
return new TaskConfigBuilder()
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
index 6672b931400..63387ccffbd 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.ByteEntity;
-import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec;
@@ -33,7 +32,6 @@ import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecor
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
@@ -51,7 +49,6 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceType;
import org.easymock.EasyMock;
-import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Rule;
@@ -375,7 +372,7 @@ public class SeekableStreamIndexTaskRunnerAuthTest
SeekableStreamIndexTaskIOConfig<String, String> ioConfig
)
{
- super(id, null, null, dataSchema, tuningConfig, ioConfig, null, null);
+ super(id, null, null, dataSchema, tuningConfig, ioConfig, null, null,
null);
}
@Override
@@ -397,24 +394,6 @@ public class SeekableStreamIndexTaskRunnerAuthTest
}
}
- private static class TestSeekableStreamIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<String, String>
- {
- public TestSeekableStreamIndexTaskIOConfig()
- {
- super(
- null,
- "someSequence",
- new SeekableStreamStartSequenceNumbers<>("abc", "def",
Collections.emptyMap(), Collections.emptyMap(), null),
- new SeekableStreamEndSequenceNumbers<>("abc", "def",
Collections.emptyMap(), Collections.emptyMap()),
- false,
- DateTimes.nowUtc().minusDays(2),
- DateTimes.nowUtc(),
- new CsvInputFormat(null, null, true, null, 0, null),
- Duration.standardHours(2).getStandardMinutes()
- );
- }
- }
-
/**
* Usernames used in the tests.
*/
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
index 169bbc4a788..97456165f36 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java
@@ -345,6 +345,7 @@ public class SeekableStreamSamplerSpecTest extends
EasyMockSupport
LagAggregator.DEFAULT,
lateMessageRejectionStartDateTime,
idleConfig,
+ null,
null
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
index 985aa7da706..6bd905c57d7 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTask.java
@@ -63,7 +63,24 @@ public class TestSeekableStreamIndexTask extends
SeekableStreamIndexTask<String,
@Nullable RecordSupplier<String, String, ByteEntity> recordSupplier
)
{
- super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig,
context, groupId);
+ this(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig,
context, groupId, streamingTaskRunner, recordSupplier, null);
+ }
+
+ public TestSeekableStreamIndexTask(
+ String id,
+ @Nullable String supervisorId,
+ @Nullable TaskResource taskResource,
+ DataSchema dataSchema,
+ SeekableStreamIndexTaskTuningConfig tuningConfig,
+ SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
+ @Nullable Map<String, Object> context,
+ @Nullable String groupId,
+ @Nullable SeekableStreamIndexTaskRunner<String, String, ByteEntity>
streamingTaskRunner,
+ @Nullable RecordSupplier<String, String, ByteEntity> recordSupplier,
+ @Nullable Integer serverPriority
+ )
+ {
+ super(id, supervisorId, taskResource, dataSchema, tuningConfig, ioConfig,
context, groupId, serverPriority);
this.streamingTaskRunner = streamingTaskRunner;
this.recordSupplier = recordSupplier;
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java
new file mode 100644
index 00000000000..1fa7993b69a
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamIndexTaskIOConfig.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.java.util.common.DateTimes;
+import org.joda.time.Duration;
+
+import java.util.Collections;
+
+public class TestSeekableStreamIndexTaskIOConfig extends
SeekableStreamIndexTaskIOConfig<String, String>
+{
+ public TestSeekableStreamIndexTaskIOConfig()
+ {
+ super(
+ null,
+ "someSequence",
+ new SeekableStreamStartSequenceNumbers<>("abc", "def",
Collections.emptyMap(), Collections.emptyMap(), null),
+ new SeekableStreamEndSequenceNumbers<>("abc", "def",
Collections.emptyMap(), Collections.emptyMap()),
+ false,
+ DateTimes.nowUtc().minusDays(2),
+ DateTimes.nowUtc(),
+ new CsvInputFormat(null, null, true, null, 0, null),
+ Duration.standardHours(2).getStandardMinutes()
+ );
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
index 56b8904cc18..3974b9bebca 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java
@@ -21,19 +21,31 @@ package org.apache.druid.indexing.seekablestream.supervisor;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.hamcrest.MatcherAssert;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
+import javax.annotation.Nullable;
+import java.util.Map;
+
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SeekableStreamSupervisorIOConfigTest
{
+
+ private final Map<Integer, Integer> serverPriorityToReplicas = Map.of(
+ 1, 2,
+ 2, 3
+ );
+
@Test
public void testAllDefaults()
{
@@ -56,6 +68,7 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
null,
null,
+ null,
null
)
{
@@ -77,6 +90,7 @@ public class SeekableStreamSupervisorIOConfigTest
Assert.assertNull(config.getStopTaskCount());
Assert.assertEquals(lagAggregator, config.getLagAggregator());
Assert.assertEquals(1, config.getMaxAllowedStops());
+ Assert.assertNull(config.getServerPriorityToReplicas());
}
@Test
@@ -106,6 +120,7 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
null,
null,
+ null,
null
)
{
@@ -130,6 +145,7 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
null,
null,
+ null,
null
)
{
@@ -161,6 +177,7 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
DateTimes.nowUtc(),
null,
+ null,
null
)
{
@@ -195,6 +212,7 @@ public class SeekableStreamSupervisorIOConfigTest
null,
null,
null,
+ null,
null
)
{
@@ -227,6 +245,7 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
null,
null,
+ null,
null
)
{
@@ -250,7 +269,8 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
null,
null,
- 3
+ 3,
+ null
)
{
};
@@ -285,7 +305,8 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
null,
null,
- 1
+ 1,
+ null
)
{
};
@@ -317,7 +338,8 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
null,
null,
- 1
+ 1,
+ null
)
{
};
@@ -346,6 +368,7 @@ public class SeekableStreamSupervisorIOConfigTest
lagAggregator,
null,
null,
+ null,
null
)
{
@@ -353,4 +376,89 @@ public class SeekableStreamSupervisorIOConfigTest
Assert.assertEquals(10, config3.getMaxAllowedStops());
}
+
+ @Test
+ public void testReplicasIsSetWhenserverPriorityToReplicas()
+ {
+ final SeekableStreamSupervisorIOConfig config =
makeSeekableStreamSupervisorIOConfig(null, serverPriorityToReplicas);
+ Assert.assertEquals(serverPriorityToReplicas,
config.getServerPriorityToReplicas());
+ Assert.assertEquals(Integer.valueOf(5), config.getReplicas());
+ }
+
+ @Test
+ public void testReplicasOnlyConfig()
+ {
+ final SeekableStreamSupervisorIOConfig config =
makeSeekableStreamSupervisorIOConfig(4, null);
+ Assert.assertEquals(Integer.valueOf(4), config.getReplicas());
+ Assert.assertNull(config.getServerPriorityToReplicas());
+ }
+
+ @Test
+ public void testMatchingReplicasAndServerPriority()
+ {
+ final SeekableStreamSupervisorIOConfig config =
makeSeekableStreamSupervisorIOConfig(5, serverPriorityToReplicas);
+ Assert.assertEquals(Integer.valueOf(5), config.getReplicas());
+ Assert.assertEquals(serverPriorityToReplicas,
config.getServerPriorityToReplicas());
+ }
+
+ @Test
+ public void
testMismatchBetweenReplicasAndServerPriorityReplicasThrowsException()
+ {
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> makeSeekableStreamSupervisorIOConfig(3,
serverPriorityToReplicas)
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ StringUtils.format(
+ "Configured replicas[3] does not match the sum of replicas[5]
specified in serverPriorityToReplicas[%s]."
+ + " To avoid ambiguity, consider removing [ioConfig.replicas]
in favor of [ioConfig.serverPriorityToReplicas].",
+ serverPriorityToReplicas
+ )
+ )
+ );
+ }
+
+ @Test
+ public void testNegativeReplicasThrowsException()
+ {
+ final Map<Integer, Integer> invalidServerPriorityToReplicas = Map.of(0, 2,
1, -1);
+ MatcherAssert.assertThat(
+ Assert.assertThrows(
+ DruidException.class,
+ () -> makeSeekableStreamSupervisorIOConfig(null,
invalidServerPriorityToReplicas)
+ ),
+ DruidExceptionMatcher.invalidInput().expectMessageIs(
+ StringUtils.format(
+ "Found invalid server replica[-1] for priority[1] in
serverPriorityToReplicas[%s]. Replicas must be >= 0.",
+ invalidServerPriorityToReplicas
+ )
+ )
+ );
+ }
+
+ private SeekableStreamSupervisorIOConfig
makeSeekableStreamSupervisorIOConfig(@Nullable Integer replicas, @Nullable
Map<Integer, Integer> serverPriorityToReplicas)
+ {
+ return new SeekableStreamSupervisorIOConfig(
+ "stream",
+ null,
+ replicas,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ mock(LagAggregator.class),
+ null,
+ null,
+ null,
+ serverPriorityToReplicas
+ )
+ {
+ };
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index db805c21187..d2660937f8a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -794,6 +794,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagAggregator.DEFAULT,
null,
null,
+ null,
null
)
{
@@ -851,6 +852,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagAggregator.DEFAULT,
null,
new IdleConfig(true, null),
+ null,
null
)
{
@@ -1221,7 +1223,8 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
null,
null,
Set.of("dummyTask"),
- Collections.emptySet()
+ Collections.emptySet(),
+ null
);
supervisor.start();
@@ -1469,6 +1472,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagAggregator.DEFAULT,
null,
null,
+ null,
null
)
{
@@ -1490,6 +1494,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
LagAggregator.DEFAULT,
null,
null,
+ null,
null
)
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 71e798e7e2f..db37e3014e3 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -695,6 +695,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
+ null,
null
)
{
@@ -801,6 +802,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
+ null,
null
)
{
@@ -1100,6 +1102,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
+ null,
null
) {};
@@ -1318,7 +1321,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
- stopTaskCount
+ stopTaskCount,
+ null
)
{
};
@@ -1553,6 +1557,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
LagAggregator.DEFAULT,
null,
new IdleConfig(true, 200L),
+ null,
null
)
{
@@ -1889,7 +1894,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -1898,7 +1904,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
Map<String, Map<String, Object>> stats = supervisor.getStats();
@@ -1935,7 +1942,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -1944,7 +1952,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
@@ -1995,7 +2004,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
final DataSourceMetadata resetMetadata = new
TestSeekableStreamDataSourceMetadata(
@@ -2042,7 +2052,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task0"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
final SeekableStreamSupervisor.TaskGroup taskGroup1 =
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
@@ -2050,7 +2061,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
final SeekableStreamSupervisor.TaskGroup taskGroup2 =
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
@@ -2058,7 +2070,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
final PendingSegmentRecord pendingSegmentRecord0 =
PendingSegmentRecord.create(
@@ -2111,6 +2124,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
)
)
);
+
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(SUPERVISOR_ID,
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
"stream",
@@ -2136,7 +2150,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2145,7 +2160,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2154,7 +2170,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task3"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
final DataSourceMetadata resetMetadata = new
TestSeekableStreamDataSourceMetadata(
@@ -2207,7 +2224,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2216,7 +2234,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2225,7 +2244,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task3"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
final DataSourceMetadata resetMetadata = new
TestSeekableStreamDataSourceMetadata(
@@ -2282,7 +2302,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2291,7 +2312,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
final DataSourceMetadata resetMetadata = new
TestSeekableStreamDataSourceMetadata(
@@ -2349,7 +2371,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -2358,7 +2381,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
final DataSourceMetadata resetMetadata = new
TestSeekableStreamDataSourceMetadata(
@@ -2392,7 +2416,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -2401,7 +2426,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
verifyAll();
@@ -2431,7 +2457,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -2440,7 +2467,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
verifyAll();
@@ -2481,7 +2509,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task1"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
@@ -2490,7 +2519,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null,
ImmutableSet.of("task2"),
- ImmutableSet.of()
+ ImmutableSet.of(),
+ null
);
verifyAll();
@@ -2649,7 +2679,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
null,
null
);
- SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1,
autoScalerConfig);
+ SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1,
autoScalerConfig, null);
Assert.assertEquals(
taskCountMax / DEFAULT_TASKS_PER_WORKER_THREAD,
SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
@@ -2678,6 +2708,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
LagAggregator.DEFAULT,
null,
null,
+ null,
null
)
{
@@ -2741,7 +2772,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
lagAggregator,
null,
null,
- 1 // ensure this is overridden
+ 1, // ensure this is overridden
+ null
)
{
};
@@ -2773,6 +2805,72 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
verifyAll();
}
+ @Test
+ public void
testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorityIsSet()
+ {
+ final SeekableStreamSupervisorIOConfig ioConfig =
createSupervisorIOConfig(5, Map.of(10, 2, 20, 3));
+
+ Assert.assertEquals(5, (int) ioConfig.getReplicas());
+
+ EasyMock.reset(spec);
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+ replayAll();
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor();
+
+ // No tasks assigned yet, need all 5 replicas
+ final SeekableStreamSupervisor<String, String, ByteEntity>.TaskGroup
taskGroup1 =
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 0,
+ ImmutableMap.of("0", "0"),
+ null,
+ null,
+ Set.of(),
+ Set.of(),
+ Map.of()
+ );
+
+ Assert.assertEquals(List.of(20, 20, 20, 10, 10),
supervisor.computeUnassignedServerPriorities(taskGroup1, 5));
+
+ // Two tasks with priority 10 already assigned, need 3 more replicas
+ final SeekableStreamSupervisor<String, String, ByteEntity>.TaskGroup
taskGroup2 =
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 1,
+ ImmutableMap.of("0", "0"),
+ null,
+ null,
+ Set.of("task1", "task2"),
+ Set.of(),
+ Map.of("task1", 10, "task2", 10)
+ );
+
+ Assert.assertEquals(List.of(20, 20, 20),
supervisor.computeUnassignedServerPriorities(taskGroup2, 3));
+
+ // Two tasks with priority 10 and one with priority 20 assigned, need 2
more replicas
+ final SeekableStreamSupervisor<String, String, ByteEntity>.TaskGroup
taskGroup3 =
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 2,
+ ImmutableMap.of("0", "0"),
+ null,
+ null,
+ Set.of("task1", "task2", "task3"),
+ Set.of(),
+ Map.of("task1", 10, "task2", 10, "task3", 20)
+ );
+
+ Assert.assertEquals(List.of(20, 20),
supervisor.computeUnassignedServerPriorities(taskGroup3, 2));
+
+ verifyAll();
+ }
+
private static DataSchema getDataSchema()
{
List<DimensionSchema> dimensions = new ArrayList<>();
@@ -2796,18 +2894,27 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig()
{
- return createSupervisorIOConfig(1,
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class));
+ return createSupervisorIOConfig(1,
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null);
}
private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig(
int taskCount,
- @Nullable AutoScalerConfig autoScalerConfig
+ @Nullable Map<Integer, Integer> serverPriorityToReplicas
+ )
+ {
+ return createSupervisorIOConfig(taskCount, null, serverPriorityToReplicas);
+ }
+
+ private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig(
+ int taskCount,
+ @Nullable AutoScalerConfig autoScalerConfig,
+ @Nullable Map<Integer, Integer> serverPriorityToReplicas
)
{
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
- 1,
+ serverPriorityToReplicas == null ? 1 :
serverPriorityToReplicas.values().stream().mapToInt(Integer::intValue).sum(),
taskCount,
new Period("PT1H"),
new Period("P1D"),
@@ -2820,7 +2927,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
LagAggregator.DEFAULT,
null,
null,
- null
+ null,
+ serverPriorityToReplicas
)
{
};
@@ -2851,7 +2959,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
return createSupervisorTuningConfigWithWorkerThreads(1);
}
- private static SeekableStreamSupervisorTuningConfig
createSupervisorTuningConfig()
+ public static SeekableStreamSupervisorTuningConfig
createSupervisorTuningConfig()
{
return createSupervisorTuningConfigWithWorkerThreads(null);
}
@@ -3020,7 +3128,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
TreeMap<Integer, Map<String, String>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
- RowIngestionMetersFactory rowIngestionMetersFactory
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ @Nullable List<Integer> serverPrioritiesToAssign
)
{
return ImmutableList.of(new TestSeekableStreamIndexTask(
@@ -3375,4 +3484,102 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
{
};
}
+
+ @Test
+ public void testDiscoverExistingTasks_withServerPriorities()
+ {
+ final SeekableStreamSupervisorIOConfig ioConfig =
createSupervisorIOConfig(5, Map.of(10, 2, 20, 3));
+
+ Assert.assertEquals(5, (int) ioConfig.getReplicas());
+
+ final SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfigExt(
+ 0,
+ Map.of("0", "0"),
+ Map.of("0", "10"),
+ "test",
+ DateTimes.nowUtc(),
+ null,
+ Set.of(),
+ ioConfig
+ );
+
+ final TestSeekableStreamIndexTask task1 = createTestTask("task1", "0", 20,
taskIoConfig, recordSupplier);
+ final TestSeekableStreamIndexTask task2 = createTestTask("task2", "0", 20,
taskIoConfig, recordSupplier);
+ final TestSeekableStreamIndexTask task3 = createTestTask("task3", "0", 10,
taskIoConfig, recordSupplier);
+
+ // Reset the spec mock to return our custom ioConfig
+ EasyMock.reset(spec);
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of("0")).anyTimes();
+
EasyMock.expect(taskStorage.getStatus("task1")).andReturn(Optional.of(TaskStatus.running("task1"))).anyTimes();
+
EasyMock.expect(taskStorage.getStatus("task2")).andReturn(Optional.of(TaskStatus.running("task2"))).anyTimes();
+
EasyMock.expect(taskStorage.getStatus("task3")).andReturn(Optional.of(TaskStatus.running("task3"))).anyTimes();
+
EasyMock.expect(taskStorage.getTask("task1")).andReturn(Optional.of(task1)).anyTimes();
+
EasyMock.expect(taskStorage.getTask("task2")).andReturn(Optional.of(task2)).anyTimes();
+
EasyMock.expect(taskStorage.getTask("task3")).andReturn(Optional.of(task3)).anyTimes();
+
+ EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE))
+ .andReturn(Map.of(task1.getId(), task1, task2.getId(), task2,
task3.getId(), task3))
+ .anyTimes();
+
+ EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.anyString(),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(new TreeMap<>()))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync(EasyMock.anyString()))
+
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.getStartTimeAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+ .anyTimes();
+
EasyMock.expect(indexTaskClient.getCurrentOffsetsAsync(EasyMock.anyString(),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of("0", "5")))
+ .anyTimes();
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();
+
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+ replayAll();
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor();
+ supervisor.start();
+ supervisor.runInternal();
+
+ // Verify that tasks were discovered and their server priorities were
captured
+ SeekableStreamSupervisor.TaskGroup taskGroup =
supervisor.getActiveTaskGroup(0);
+ Assert.assertEquals(0, taskGroup.groupId);
+ Assert.assertEquals(3, taskGroup.tasks.size());
+
+ // Verify server priorities were correctly stored
+ Map<String, Integer> taskIdToServerPriority =
taskGroup.taskIdToServerPriority;
+ Assert.assertEquals(Integer.valueOf(20),
taskIdToServerPriority.get("task1"));
+ Assert.assertEquals(Integer.valueOf(20),
taskIdToServerPriority.get("task2"));
+ Assert.assertEquals(Integer.valueOf(10),
taskIdToServerPriority.get("task3"));
+
+ verifyAll();
+ }
+
+
+ private static TestSeekableStreamIndexTask createTestTask(String taskId,
String groupId, @Nullable Integer serverPriority,
SeekableStreamIndexTaskIOConfig taskIoConfig, RecordSupplier recordSupplier)
+ {
+ return new TestSeekableStreamIndexTask(
+ taskId,
+ null,
+ null,
+ getDataSchema(),
+ getTuningConfig().convertToTaskTuningConfig(),
+ taskIoConfig,
+ null,
+ groupId,
+ null,
+ recordSupplier,
+ serverPriority
+ );
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
index 1d814b63461..35d063e88e1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
@@ -177,7 +177,8 @@ public abstract class SeekableStreamSupervisorTestBase
TreeMap<Integer, Map<String, String>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
- RowIngestionMetersFactory rowIngestionMetersFactory
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ @Nullable List<Integer> serverPrioritiesToAssign
)
{
return null;
@@ -535,6 +536,7 @@ public abstract class SeekableStreamSupervisorTestBase
LagAggregator.DEFAULT,
null,
null,
+ null,
null
)
{
diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
index 962b716a5b2..de3812a33ee 100644
--- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
+++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java
@@ -437,7 +437,7 @@ public class CliPeonTest
)
{
- super(id, supervisorId, taskResource, dataSchema, tuningConfig,
ioConfig, context, groupId);
+ super(id, supervisorId, taskResource, dataSchema, tuningConfig,
ioConfig, context, groupId, null);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]