This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 60daddedf8b SeekableStreamSupervisor: Use workerExec as the client
connectExec. (#17394)
60daddedf8b is described below
commit 60daddedf8b0a9faf1edeb4603eb4b8e86fd3dc2
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Oct 22 20:21:21 2024 -0700
SeekableStreamSupervisor: Use workerExec as the client connectExec. (#17394)
* SeekableStreamSupervisor: Use workerExec as the client connectExec.
This patch uses the already-existing per-supervisor workerExec as the
connectExec for task clients, rather than using the process-wide default
ServiceClientFactory pool.
This helps prevent callbacks from backlogging on the process-wide pool.
It's especially useful for retries, where callbacks may need to establish
new TCP connections or perform TLS handshakes.
* Fix compilation, tests.
* Fix style.
---
.../RabbitStreamIndexTaskClientFactory.java | 6 +-
.../kafka/KafkaIndexTaskClientFactory.java | 6 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 999 +++++++++++++++------
.../kinesis/KinesisIndexTaskClientFactory.java | 6 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 24 +-
.../SeekableStreamIndexTaskClientFactory.java | 27 +-
.../supervisor/SeekableStreamSupervisor.java | 36 +-
.../SeekableStreamSupervisorStateTest.java | 2 +-
8 files changed, 785 insertions(+), 321 deletions(-)
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java
index 83cdde8191f..088dc6b77d5 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskClientFactory.java
@@ -25,17 +25,17 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
-import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.java.util.http.client.HttpClient;
@LazySingleton
public class RabbitStreamIndexTaskClientFactory extends
SeekableStreamIndexTaskClientFactory<String, Long>
{
@Inject
public RabbitStreamIndexTaskClientFactory(
- @EscalatedGlobal ServiceClientFactory serviceClientFactory,
+ @EscalatedGlobal HttpClient httpClient,
@Json ObjectMapper mapper)
{
- super(serviceClientFactory, mapper);
+ super(httpClient, mapper);
}
@Override
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java
index 73a880b524b..10b57dd4757 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientFactory.java
@@ -26,18 +26,18 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
-import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.java.util.http.client.HttpClient;
@LazySingleton
public class KafkaIndexTaskClientFactory extends
SeekableStreamIndexTaskClientFactory<KafkaTopicPartition, Long>
{
@Inject
public KafkaIndexTaskClientFactory(
- @EscalatedGlobal ServiceClientFactory serviceClientFactory,
+ @EscalatedGlobal HttpClient httpClient,
@Json ObjectMapper mapper
)
{
- super(serviceClientFactory, mapper);
+ super(httpClient, mapper);
}
@Override
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index e436b8cd56a..7926a0568fd 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -127,6 +127,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
@RunWith(Parameterized.class)
public class KafkaSupervisorTest extends EasyMockSupport
@@ -202,10 +203,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
zkServer.getConnectString(),
null,
1,
- ImmutableMap.of("num.partitions",
- String.valueOf(NUM_PARTITIONS),
- "auto.create.topics.enable",
- String.valueOf(false)
+ ImmutableMap.of(
+ "num.partitions",
+ String.valueOf(NUM_PARTITIONS),
+ "auto.create.topics.enable",
+ String.valueOf(false)
)
);
kafkaServer.start();
@@ -267,11 +269,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
final String dataSource,
final TaskInfoProvider taskInfoProvider,
- final int maxNumTasks,
- final SeekableStreamSupervisorTuningConfig tuningConfig
+ final SeekableStreamSupervisorTuningConfig tuningConfig,
+ final ScheduledExecutorService connectExec
)
{
- Assert.assertEquals(replicas * taskCountMax, maxNumTasks);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
return taskClient;
@@ -299,53 +300,53 @@ public class KafkaSupervisorTest extends EasyMockSupport
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new
KafkaSupervisorIOConfig(
- topic,
- null,
- INPUT_FORMAT,
- replicas,
- 1,
- new Period("PT1H"),
- consumerProperties,
- OBJECT_MAPPER.convertValue(autoScalerConfig,
LagBasedAutoScalerConfig.class),
- KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
- new Period("P1D"),
- new Period("PT30S"),
- true,
- new Period("PT30M"),
- null,
- null,
- null,
- null,
- new IdleConfig(true, 1000L),
- 1
+ topic,
+ null,
+ INPUT_FORMAT,
+ replicas,
+ 1,
+ new Period("PT1H"),
+ consumerProperties,
+ OBJECT_MAPPER.convertValue(autoScalerConfig,
LagBasedAutoScalerConfig.class),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ new Period("P1D"),
+ new Period("PT30S"),
+ true,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ new IdleConfig(true, 1000L),
+ 1
);
final KafkaSupervisorTuningConfig tuningConfigOri = new
KafkaSupervisorTuningConfig(
- null,
- 1000,
- null,
- null,
- 50000,
- null,
- new Period("P1Y"),
- null,
- null,
- null,
- false,
- null,
- false,
- null,
- numThreads,
- TEST_CHAT_RETRIES,
- TEST_HTTP_TIMEOUT,
- TEST_SHUTDOWN_TIMEOUT,
- null,
- null,
- null,
- null,
- null,
- null,
- null
+ null,
+ 1000,
+ null,
+ null,
+ 50000,
+ null,
+ new Period("P1Y"),
+ null,
+ null,
+ null,
+ false,
+ null,
+ false,
+ null,
+ numThreads,
+ TEST_CHAT_RETRIES,
+ TEST_HTTP_TIMEOUT,
+ TEST_SHUTDOWN_TIMEOUT,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
@@ -354,31 +355,31 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.replay(ingestionSchema);
SeekableStreamSupervisorSpec testableSupervisorSpec = new
KafkaSupervisorSpec(
- ingestionSchema,
- dataSchema,
- tuningConfigOri,
- kafkaSupervisorIOConfig,
- null,
- false,
- taskStorage,
- taskMaster,
- indexerMetadataStorageCoordinator,
- taskClientFactory,
- OBJECT_MAPPER,
- new NoopServiceEmitter(),
- new DruidMonitorSchedulerConfig(),
- rowIngestionMetersFactory,
- new SupervisorStateManagerConfig()
+ ingestionSchema,
+ dataSchema,
+ tuningConfigOri,
+ kafkaSupervisorIOConfig,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ new NoopServiceEmitter(),
+ new DruidMonitorSchedulerConfig(),
+ rowIngestionMetersFactory,
+ new SupervisorStateManagerConfig()
);
supervisor = new TestableKafkaSupervisor(
- taskStorage,
- taskMaster,
- indexerMetadataStorageCoordinator,
- taskClientFactory,
- OBJECT_MAPPER,
- (KafkaSupervisorSpec) testableSupervisorSpec,
- rowIngestionMetersFactory
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ (KafkaSupervisorSpec) testableSupervisorSpec,
+ rowIngestionMetersFactory
);
SupervisorTaskAutoScaler autoscaler =
testableSupervisorSpec.createAutoscaler(supervisor);
@@ -393,9 +394,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
- new KafkaDataSourceMetadata(
- null
- )
+ new KafkaDataSourceMetadata(
+ null
+ )
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
@@ -425,22 +426,43 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertFalse("maximumMessageTime",
taskConfig.getMaximumMessageTime().isPresent());
Assert.assertEquals(topic,
taskConfig.getStartSequenceNumbers().getStream());
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)));
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)));
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)));
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ );
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ );
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ );
Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
Assert.assertEquals(
- Long.MAX_VALUE,
- (long)
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0))
+ Long.MAX_VALUE,
+ (long) taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
);
Assert.assertEquals(
- Long.MAX_VALUE,
- (long)
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1))
+ Long.MAX_VALUE,
+ (long) taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
);
Assert.assertEquals(
- Long.MAX_VALUE,
- (long)
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2))
+ Long.MAX_VALUE,
+ (long) taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
);
Assert.assertEquals(
Collections.singleton(new ResourceAction(
@@ -543,22 +565,43 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertFalse("maximumMessageTime",
taskConfig.getMaximumMessageTime().isPresent());
Assert.assertEquals(topic,
taskConfig.getStartSequenceNumbers().getStream());
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)));
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)));
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)));
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ );
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ );
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ );
Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
Assert.assertEquals(
Long.MAX_VALUE,
- (long)
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0))
+ (long) taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
);
Assert.assertEquals(
Long.MAX_VALUE,
- (long)
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1))
+ (long) taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
);
Assert.assertEquals(
Long.MAX_VALUE,
- (long)
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2))
+ (long) taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
);
}
@@ -613,19 +656,35 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(2,
task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size());
Assert.assertEquals(
0L,
-
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ task1.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ task1.getIOConfig()
+ .getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
0L,
-
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ task1.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ task1.getIOConfig()
+ .getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
KafkaIndexTask task2 = captured.getValues().get(1);
@@ -633,11 +692,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(1,
task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size());
Assert.assertEquals(
0L,
-
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ task2.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ task2.getIOConfig()
+ .getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
}
@@ -668,15 +735,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(3,
task1.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size());
Assert.assertEquals(
0L,
-
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ task1.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
0L,
-
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ task1.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
0L,
-
task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ task1.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
KafkaIndexTask task2 = captured.getValues().get(1);
@@ -684,15 +763,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(3,
task2.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap().size());
Assert.assertEquals(
0L,
-
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ task2.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
0L,
-
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ task2.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
0L,
-
task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ task2.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
}
@@ -802,15 +893,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTask task = captured.getValue();
Assert.assertEquals(
1101L,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
1101L,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
1101L,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
}
@@ -867,15 +970,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTask task = captured.getValue();
Assert.assertEquals(
10,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
10,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
10,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
addMoreEvents(9, 6);
@@ -899,15 +1014,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
task = newcaptured.getValue();
Assert.assertEquals(
0,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 3)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 3))
+ .longValue()
);
Assert.assertEquals(
0,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 4)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 4))
+ .longValue()
);
Assert.assertEquals(
0,
-
task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 5)).longValue()
+ task.getIOConfig()
+ .getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 5))
+ .longValue()
);
}
@@ -928,7 +1055,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of())
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ )
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
@@ -943,15 +1074,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
10L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
20L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
30L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
}
@@ -991,15 +1131,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
10L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 0)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 0))
+ .longValue()
);
Assert.assertEquals(
20L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 1)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 1))
+ .longValue()
);
Assert.assertEquals(
30L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 2)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 2))
+ .longValue()
);
Assert.assertEquals(3,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
}
@@ -1040,15 +1189,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
0L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 0)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 0))
+ .longValue()
);
Assert.assertEquals(
0L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 1)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 1))
+ .longValue()
);
Assert.assertEquals(
0L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 2)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 2))
+ .longValue()
);
Assert.assertEquals(3,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
}
@@ -1074,7 +1232,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
partitionSequenceNumberMap.putAll(multiTopicPartitionMap("notMatch", 0,
10L, 1, 20L, 2, 30L));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(topicPattern,
partitionSequenceNumberMap.build(), ImmutableSet.of())
+ new SeekableStreamStartSequenceNumbers<>(
+ topicPattern,
+ partitionSequenceNumberMap.build(),
+ ImmutableSet.of()
+ )
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
@@ -1089,15 +1251,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
10L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
20L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
30L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
Assert.assertEquals(3,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
}
@@ -1139,15 +1310,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertEquals(
10L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 0)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 0))
+ .longValue()
);
Assert.assertEquals(
20L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 1)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 1))
+ .longValue()
);
Assert.assertEquals(
30L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(true, topic, 2)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(true, topic, 2))
+ .longValue()
);
Assert.assertEquals(3,
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
}
@@ -1239,7 +1419,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+ ),
null,
null,
tuningConfig
@@ -1258,7 +1441,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id3",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -1272,7 +1459,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 1, 0L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE)),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE)
+ ),
null,
null,
tuningConfig
@@ -1282,7 +1472,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 1, 0L, 2, 0L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>("topic",
singlePartitionMap(topic, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+ ),
null,
null,
tuningConfig
@@ -1443,7 +1636,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+ ),
now,
maxi,
supervisor.getTuningConfig()
@@ -1726,8 +1922,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
Assert.assertEquals(topic,
taskConfig.getStartSequenceNumbers().getStream());
- Assert.assertEquals(10L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)));
- Assert.assertEquals(35L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)));
+ Assert.assertEquals(
+ 10L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ );
+ Assert.assertEquals(
+ 35L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ );
}
}
@@ -1744,7 +1950,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -1825,29 +2035,47 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(topic,
capturedTaskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(
10L,
-
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ capturedTaskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
20L,
-
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ capturedTaskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
30L,
-
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ capturedTaskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
Assert.assertEquals(topic,
capturedTaskConfig.getEndSequenceNumbers().getStream());
Assert.assertEquals(
Long.MAX_VALUE,
-
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ capturedTaskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ capturedTaskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ capturedTaskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
}
@@ -1865,7 +2093,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 2, 0L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+ ),
null,
null,
supervisor.getTuningConfig()
@@ -1936,29 +2167,47 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(topic,
capturedTaskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(
10L,
-
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ capturedTaskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
0L,
-
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ capturedTaskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
30L,
-
capturedTaskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ capturedTaskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
Assert.assertEquals(topic,
capturedTaskConfig.getEndSequenceNumbers().getStream());
Assert.assertEquals(
Long.MAX_VALUE,
-
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ capturedTaskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ capturedTaskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
capturedTaskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ capturedTaskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
}
@@ -1977,7 +2226,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -1991,7 +2244,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id2",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 1L, 1, 2L, 2, 3L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 1L, 1, 2L, 2, 3L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -2407,7 +2664,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
- new SeekableStreamEndSequenceNumbers<KafkaTopicPartition,
Long>(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L))
+ new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>(
+ topic,
+ singlePartitionMap(topic,
+ 0,
+ 2L,
+ 1,
+ 2L,
+ 2,
+ 2L
+ )
+ )
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
@@ -2455,7 +2722,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
- new SeekableStreamEndSequenceNumbers<KafkaTopicPartition,
Long>(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L))
+ new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>(
+ topic,
+ singlePartitionMap(topic,
+ 0,
+ 2L,
+ 1,
+ 2L,
+ 2,
+ 2L
+ )
+ )
)
).anyTimes();
@@ -2484,7 +2761,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testSupervisorIsIdleIfStreamInactiveWhenSuspended() throws
Exception
{
Map<String, String> config = ImmutableMap.of("idleConfig.enabled", "false",
-
"idleConfig.inactiveAfterMillis", "200");
+
"idleConfig.inactiveAfterMillis", "200"
+ );
supervisorConfig = OBJECT_MAPPER.convertValue(config,
SupervisorStateManagerConfig.class);
supervisor = getTestableSupervisorForIdleBehaviour(
1,
@@ -2504,7 +2782,17 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
- new SeekableStreamEndSequenceNumbers<KafkaTopicPartition,
Long>(topic, singlePartitionMap(topic, 0, 2L, 1, 2L, 2, 2L))
+ new SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long>(
+ topic,
+ singlePartitionMap(topic,
+ 0,
+ 2L,
+ 1,
+ 2L,
+ 2,
+ 2L
+ )
+ )
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
@@ -2836,8 +3124,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
for (Task task : captured.getValues()) {
KafkaIndexTaskIOConfig taskConfig = ((KafkaIndexTask)
task).getIOConfig();
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)));
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)));
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ );
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ );
}
}
@@ -2930,8 +3228,18 @@ public class KafkaSupervisorTest extends EasyMockSupport
for (Task task : captured.getValues()) {
KafkaIndexTaskIOConfig taskConfig = ((KafkaIndexTask)
task).getIOConfig();
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)));
- Assert.assertEquals(0L, (long)
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)));
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ );
+ Assert.assertEquals(
+ 0L,
+ (long) taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ );
}
}
@@ -2972,7 +3280,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -2986,7 +3298,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id2",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3000,7 +3316,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id3",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3132,7 +3452,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 1, 1000L, 2, 1000L), ImmutableSet.of())
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 1, 1000L, 2, 1000L),
+ ImmutableSet.of()
+ )
);
KafkaDataSourceMetadata expectedMetadata = new KafkaDataSourceMetadata(
@@ -3249,7 +3573,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3263,7 +3591,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id2",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3277,7 +3609,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id3",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3355,7 +3691,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3369,7 +3709,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id2",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3383,7 +3727,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id3",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3459,7 +3807,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3473,7 +3825,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id2",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3487,7 +3843,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id3",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3517,8 +3877,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
EasyMock.expect(
-
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
KafkaDataSourceMetadata(null)
- ).anyTimes();
+
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+ .andReturn(new KafkaDataSourceMetadata(null)
+ )
+ .anyTimes();
EasyMock.expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(Status.READING));
EasyMock.expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(Status.READING));
EasyMock.expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(Status.READING));
@@ -3574,7 +3936,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3588,7 +3954,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id2",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3602,7 +3972,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id3",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3624,8 +3998,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
EasyMock.expect(
-
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
KafkaDataSourceMetadata(null)
- ).anyTimes();
+
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+ .andReturn(new KafkaDataSourceMetadata(null)
+ )
+ .anyTimes();
replayAll();
@@ -3703,7 +4079,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 0L, 1, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3717,7 +4097,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id2",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3731,7 +4115,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
"id3",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 10L, 1, 20L, 2, 30L),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2,
Long.MAX_VALUE)
@@ -3909,29 +4297,47 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(topic,
taskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(
0L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
0L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
0L,
-
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ taskConfig.getStartSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
Assert.assertEquals(
Long.MAX_VALUE,
-
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 0)).longValue()
+ taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 0))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 1)).longValue()
+ taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 1))
+ .longValue()
);
Assert.assertEquals(
Long.MAX_VALUE,
-
taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(new
KafkaTopicPartition(false, topic, 2)).longValue()
+ taskConfig.getEndSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .get(new KafkaTopicPartition(false, topic, 2))
+ .longValue()
);
}
@@ -4147,7 +4553,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
singlePartitionMap(topic, 0, 0L, 2, 0L),
ImmutableSet.of()
),
- new SeekableStreamEndSequenceNumbers<>("topic",
singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+ ),
null,
null,
supervisor.getTuningConfig()
@@ -4485,67 +4894,80 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.reset(taskClient);
addSomeEvents(100);
- KafkaIndexTask readingTask = createKafkaIndexTask("readingTask",
- DATASOURCE,
- 0,
- new
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L),
Collections.emptySet()),
- new
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 0,
Long.MAX_VALUE)),
- null,
- null,
-
supervisor.getTuningConfig()
- );
-
- KafkaIndexTask publishingTask = createKafkaIndexTask("publishingTask",
- DATASOURCE,
- 1,
- new
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 0, 0L),
Collections.emptySet()),
- new
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 0,
Long.MAX_VALUE)),
- null,
- null,
-
supervisor.getTuningConfig()
- );
-
- KafkaIndexTask pausedTask = createKafkaIndexTask("pausedTask",
- DATASOURCE,
- 1,
- new
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 0L),
Collections.emptySet()),
- new
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1,
Long.MAX_VALUE)),
- null,
- null,
-
supervisor.getTuningConfig()
- );
-
- KafkaIndexTask failsToResumePausedTask =
createKafkaIndexTask("failsToResumePausedTask",
- DATASOURCE,
- 1,
- new
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 1, 0L),
Collections.emptySet()),
- new
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 1,
Long.MAX_VALUE)),
- null,
- null,
-
supervisor.getTuningConfig()
- );
-
- KafkaIndexTask waitingTask = createKafkaIndexTask("waitingTask",
- DATASOURCE,
- 2,
- new
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 0L),
Collections.emptySet()),
- new
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 2,
Long.MAX_VALUE)),
- null,
- null,
-
supervisor.getTuningConfig()
- );
-
- KafkaIndexTask pendingTask = createKafkaIndexTask("pendingTask",
- DATASOURCE,
- 2,
- new
SeekableStreamStartSequenceNumbers<>(topic, singlePartitionMap(topic, 2, 0L),
Collections.emptySet()),
- new
SeekableStreamEndSequenceNumbers<>(topic, singlePartitionMap(topic, 2,
Long.MAX_VALUE)),
- null,
- null,
-
supervisor.getTuningConfig()
- );
-
- List<Task> tasks = ImmutableList.of(readingTask, publishingTask,
pausedTask, failsToResumePausedTask, waitingTask, pendingTask);
+ KafkaIndexTask readingTask = createKafkaIndexTask(
+ "readingTask",
+ DATASOURCE,
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 0L), Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE)),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ KafkaIndexTask publishingTask = createKafkaIndexTask(
+ "publishingTask",
+ DATASOURCE,
+ 1,
+ new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, 0L), Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE)),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ KafkaIndexTask pausedTask = createKafkaIndexTask(
+ "pausedTask",
+ DATASOURCE,
+ 1,
+ new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 1, 0L), Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(topic,
singlePartitionMap(topic, 1, Long.MAX_VALUE)),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ KafkaIndexTask failsToResumePausedTask = createKafkaIndexTask(
+ "failsToResumePausedTask",
+ DATASOURCE,
+ 1,
+ new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 1, 0L), Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(topic,
singlePartitionMap(topic, 1, Long.MAX_VALUE)),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ KafkaIndexTask waitingTask = createKafkaIndexTask(
+ "waitingTask",
+ DATASOURCE,
+ 2,
+ new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 2, 0L), Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(topic,
singlePartitionMap(topic, 2, Long.MAX_VALUE)),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ KafkaIndexTask pendingTask = createKafkaIndexTask(
+ "pendingTask",
+ DATASOURCE,
+ 2,
+ new SeekableStreamStartSequenceNumbers<>(topic,
singlePartitionMap(topic, 2, 0L), Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(topic,
singlePartitionMap(topic, 2, Long.MAX_VALUE)),
+ null,
+ null,
+ supervisor.getTuningConfig()
+ );
+
+ List<Task> tasks = ImmutableList.of(
+ readingTask,
+ publishingTask,
+ pausedTask,
+ failsToResumePausedTask,
+ waitingTask,
+ pendingTask
+ );
Collection taskRunnerWorkItems = ImmutableList.of(
new TestTaskRunnerWorkItem(readingTask, null,
TaskLocation.create("testHost", 1001, -1)),
new TestTaskRunnerWorkItem(publishingTask, null,
TaskLocation.create("testHost", 1002, -1)),
@@ -4641,7 +5063,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskClient.getStartTimeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(startTime));
EasyMock.expect(taskClient.resumeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(true));
-
EasyMock.expect(taskClient.getStartTimeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(startTime));
+
EasyMock.expect(taskClient.getStartTimeAsync(failsToResumePausedTask.getId()))
+ .andReturn(Futures.immediateFuture(startTime));
EasyMock.expect(taskClient.resumeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(false));
Capture<String> shutdownTaskId = EasyMock.newCapture();
@@ -4856,11 +5279,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
String dataSource,
TaskInfoProvider taskInfoProvider,
- int maxNumTasks,
- SeekableStreamSupervisorTuningConfig tuningConfig
+ SeekableStreamSupervisorTuningConfig tuningConfig,
+ ScheduledExecutorService connectExec
)
{
- Assert.assertEquals(replicas * taskCount, maxNumTasks);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
return taskClient;
@@ -4971,11 +5393,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
String dataSource,
TaskInfoProvider taskInfoProvider,
- int maxNumTasks,
- SeekableStreamSupervisorTuningConfig tuningConfig
+ SeekableStreamSupervisorTuningConfig tuningConfig,
+ ScheduledExecutorService connectExec
)
{
- Assert.assertEquals(replicas * taskCount, maxNumTasks);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
return taskClient;
@@ -5089,11 +5510,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
String dataSource,
TaskInfoProvider taskInfoProvider,
- int maxNumTasks,
- SeekableStreamSupervisorTuningConfig tuningConfig
+ SeekableStreamSupervisorTuningConfig tuningConfig,
+ ScheduledExecutorService connectExec
)
{
- Assert.assertEquals(replicas * taskCount, maxNumTasks);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
return taskClient;
@@ -5227,30 +5647,63 @@ public class KafkaSupervisorTest extends EasyMockSupport
OBJECT_MAPPER
);
}
-
+
private static ImmutableMap<KafkaTopicPartition, Long>
singlePartitionMap(String topic, int partition, long offset)
{
return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition),
offset);
}
- private static ImmutableMap<KafkaTopicPartition, Long>
singlePartitionMap(String topic, int partition1, long offset1, int partition2,
long offset2)
+ private static ImmutableMap<KafkaTopicPartition, Long> singlePartitionMap(
+ String topic,
+ int partition1,
+ long offset1,
+ int partition2,
+ long offset2
+ )
{
- return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1),
offset1, new KafkaTopicPartition(false, topic, partition2),
- offset2);
+ return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1),
+ offset1,
+ new KafkaTopicPartition(false, topic, partition2),
+ offset2
+ );
}
- private static ImmutableMap<KafkaTopicPartition, Long>
singlePartitionMap(String topic, int partition1, long offset1,
- int partition2,
long offset2, int partition3, long offset3)
+ private static ImmutableMap<KafkaTopicPartition, Long> singlePartitionMap(
+ String topic,
+ int partition1,
+ long offset1,
+ int partition2,
+ long offset2,
+ int partition3,
+ long offset3
+ )
{
- return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1),
offset1, new KafkaTopicPartition(false, topic, partition2),
- offset2, new KafkaTopicPartition(false, topic,
partition3), offset3);
+ return ImmutableMap.of(new KafkaTopicPartition(false, topic, partition1),
+ offset1,
+ new KafkaTopicPartition(false, topic, partition2),
+ offset2,
+ new KafkaTopicPartition(false, topic, partition3),
+ offset3
+ );
}
- private static ImmutableMap<KafkaTopicPartition, Long>
multiTopicPartitionMap(String topic, int partition1, long offset1,
-
int partition2, long offset2, int partition3, long offset3)
+ private static ImmutableMap<KafkaTopicPartition, Long>
multiTopicPartitionMap(
+ String topic,
+ int partition1,
+ long offset1,
+ int partition2,
+ long offset2,
+ int partition3,
+ long offset3
+ )
{
- return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1),
offset1, new KafkaTopicPartition(true, topic, partition2),
- offset2, new KafkaTopicPartition(true, topic, partition3), offset3);
+ return ImmutableMap.of(new KafkaTopicPartition(true, topic, partition1),
+ offset1,
+ new KafkaTopicPartition(true, topic, partition2),
+ offset2,
+ new KafkaTopicPartition(true, topic, partition3),
+ offset3
+ );
}
private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java
index 2399008688c..599c32b2158 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientFactory.java
@@ -25,18 +25,18 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
-import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.java.util.http.client.HttpClient;
@LazySingleton
public class KinesisIndexTaskClientFactory extends
SeekableStreamIndexTaskClientFactory<String, String>
{
@Inject
public KinesisIndexTaskClientFactory(
- @EscalatedGlobal ServiceClientFactory serviceClientFactory,
+ @EscalatedGlobal HttpClient httpClient,
@Json ObjectMapper mapper
)
{
- super(serviceClientFactory, mapper);
+ super(httpClient, mapper);
}
@Override
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index fe851a183a2..122a8e1c5ae 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -111,6 +111,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
public class KinesisSupervisorTest extends EasyMockSupport
{
@@ -5117,11 +5118,10 @@ public class KinesisSupervisorTest extends
EasyMockSupport
public SeekableStreamIndexTaskClient<String, String> build(
String dataSource,
TaskInfoProvider taskInfoProvider,
- int maxNumTasks,
- SeekableStreamSupervisorTuningConfig tuningConfig
+ SeekableStreamSupervisorTuningConfig tuningConfig,
+ ScheduledExecutorService connectExec
)
{
- Assert.assertEquals(replicas * taskCount, maxNumTasks);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
return taskClient;
@@ -5259,14 +5259,10 @@ public class KinesisSupervisorTest extends
EasyMockSupport
public SeekableStreamIndexTaskClient<String, String> build(
String dataSource,
TaskInfoProvider taskInfoProvider,
- int maxNumTasks,
- SeekableStreamSupervisorTuningConfig tuningConfig
+ SeekableStreamSupervisorTuningConfig tuningConfig,
+ ScheduledExecutorService connectExec
)
{
- Assert.assertEquals(
- replicas * (autoScalerConfig != null ?
autoScalerConfig.getTaskCountMax() : taskCount),
- maxNumTasks
- );
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
return taskClient;
@@ -5348,11 +5344,10 @@ public class KinesisSupervisorTest extends
EasyMockSupport
public SeekableStreamIndexTaskClient<String, String> build(
String dataSource,
TaskInfoProvider taskInfoProvider,
- int maxNumTasks,
- SeekableStreamSupervisorTuningConfig tuningConfig
+ SeekableStreamSupervisorTuningConfig tuningConfig,
+ ScheduledExecutorService connectExec
)
{
- Assert.assertEquals(replicas * taskCount, maxNumTasks);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
return taskClient;
@@ -5436,11 +5431,10 @@ public class KinesisSupervisorTest extends
EasyMockSupport
public SeekableStreamIndexTaskClient<String, String> build(
String dataSource,
TaskInfoProvider taskInfoProvider,
- int maxNumTasks,
- SeekableStreamSupervisorTuningConfig tuningConfig
+ SeekableStreamSupervisorTuningConfig tuningConfig,
+ ScheduledExecutorService connectExec
)
{
- Assert.assertEquals(replicas * taskCount, maxNumTasks);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(),
tuningConfig.getHttpTimeout());
Assert.assertEquals(TEST_CHAT_RETRIES, (long)
tuningConfig.getChatRetries());
return taskClient;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java
index 5bdf8aaac39..c3b4e69f832 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientFactory.java
@@ -21,31 +21,46 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.TaskInfoProvider;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceClientFactoryImpl;
+
+import java.util.concurrent.ScheduledExecutorService;
public abstract class SeekableStreamIndexTaskClientFactory<PartitionIdType,
SequenceOffsetType>
{
private static final Logger log = new
Logger(SeekableStreamIndexTaskClientFactory.class);
- private final ServiceClientFactory serviceClientFactory;
+ private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
protected SeekableStreamIndexTaskClientFactory(
- final ServiceClientFactory serviceClientFactory,
+ final HttpClient httpClient,
final ObjectMapper jsonMapper
)
{
- this.serviceClientFactory = serviceClientFactory;
+ this.httpClient = httpClient;
this.jsonMapper = jsonMapper;
}
+ /**
+ * Creates a task client for a specific supervisor.
+ *
+ * @param dataSource task datasource
+ * @param taskInfoProvider task locator
+ * @param tuningConfig from {@link SeekableStreamSupervisor#tuningConfig}
+ * @param connectExec should generally be {@link
SeekableStreamSupervisor#workerExec}. This is preferable to
+ * the global pool for the default {@link
ServiceClientFactory}, to prevent callbacks from
+ * different supervisors from backlogging each other.
+ */
public SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType>
build(
final String dataSource,
final TaskInfoProvider taskInfoProvider,
- final int maxNumTasks,
- final SeekableStreamSupervisorTuningConfig tuningConfig
+ final SeekableStreamSupervisorTuningConfig tuningConfig,
+ final ScheduledExecutorService connectExec
)
{
log.info(
@@ -57,7 +72,7 @@ public abstract class
SeekableStreamIndexTaskClientFactory<PartitionIdType, Sequ
return new SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType,
SequenceOffsetType>(
dataSource,
- serviceClientFactory,
+ new ServiceClientFactoryImpl(httpClient, connectExec),
taskInfoProvider,
jsonMapper,
tuningConfig.getHttpTimeout(),
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 86c4ba385bd..4cbf0ccfa68 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -33,7 +33,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -65,7 +65,6 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata
import
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
-import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientAsyncImpl;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -86,6 +85,7 @@ import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -837,12 +837,25 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final SeekableStreamIndexTaskTuningConfig taskTuningConfig;
private final String supervisorId;
private final TaskInfoProvider taskInfoProvider;
- private final long futureTimeoutInSeconds; // how long to wait for async
operations to complete
private final RowIngestionMetersFactory rowIngestionMetersFactory;
+ /**
+ * Single-threaded executor for running {@link Notice#handle()} from {@link
#notices}.
+ */
private final ExecutorService exec;
+ /**
+ * Single-threaded scheduled executor for adding periodic {@link RunNotice}
to {@link #notices}.
+ */
private final ScheduledExecutorService scheduledExec;
+ /**
+ * Single-threaded scheduled executor for reporting metrics on notice queue
size, lag, etc.
+ * See {@link #scheduleReporting}.
+ */
private final ScheduledExecutorService reportingExec;
- private final ListeningExecutorService workerExec;
+ /**
+ * Multi-threaded executor for managing communications with workers,
including handling callbacks from worker RPCs.
+ * Also serves as the connectExec for {@link #taskClient}.
+ */
+ private final ListeningScheduledExecutorService workerExec;
private final NoticesQueue<Notice> notices = new NoticesQueue<>();
private final Object stopLock = new Object();
private final Object stateChangeLock = new Object();
@@ -903,21 +916,16 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
int workerThreads;
- int maxNumTasks;
if (autoScalerConfig != null &&
autoScalerConfig.getEnableTaskAutoScaler()) {
log.info("Running Task autoscaler for datasource [%s]", dataSource);
workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
: Math.min(10, autoScalerConfig.getTaskCountMax()));
-
- maxNumTasks = autoScalerConfig.getTaskCountMax() *
this.ioConfig.getReplicas();
} else {
workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
: Math.min(10, this.ioConfig.getTaskCount()));
-
- maxNumTasks = this.ioConfig.getTaskCount() * this.ioConfig.getReplicas();
}
IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
@@ -938,7 +946,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
this.workerExec = MoreExecutors.listeningDecorator(
- Execs.multiThreaded(
+ ScheduledExecutors.fixed(
workerThreads,
StringUtils.encodeForFormat(supervisorId) + "-Worker-%d"
)
@@ -973,13 +981,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
};
- this.futureTimeoutInSeconds = Math.max(
- MINIMUM_FUTURE_TIMEOUT_IN_SECONDS,
- tuningConfig.getChatRetries() *
(tuningConfig.getHttpTimeout().getStandardSeconds()
- +
SeekableStreamIndexTaskClientAsyncImpl.MAX_RETRY_WAIT_SECONDS)
- );
-
- this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider,
maxNumTasks, this.tuningConfig);
+ this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider,
this.tuningConfig, workerExec);
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 40bbe84b623..af66ce3b8b9 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -179,7 +179,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
EasyMock.expect(taskClientFactory.build(
EasyMock.anyString(),
EasyMock.anyObject(),
- EasyMock.anyInt(),
+ EasyMock.anyObject(),
EasyMock.anyObject()
)).andReturn(
indexTaskClient).anyTimes();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]