This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4ba3cf52215 Add test to verify sequence name of Kafka task (#15397)
4ba3cf52215 is described below
commit 4ba3cf52215443e30b7033252321d0ee98118333
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Nov 21 10:17:32 2023 +0530
Add test to verify sequence name of Kafka task (#15397)
* Add test to verify sequence name of Kafka and Kinesis tasks
---
.../kafka/supervisor/KafkaSupervisorTest.java | 108 ++++++++++++++++++++-
.../kinesis/supervisor/KinesisSupervisorTest.java | 77 ++++++++++++++-
.../supervisor/SeekableStreamSupervisor.java | 2 +-
3 files changed, 178 insertions(+), 9 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 2ff8c5bf91e..3087bdcbb1a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -3966,7 +3966,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
DateTime minMessageTime = DateTimes.nowUtc();
DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
- KafkaSupervisor supervisor = getSupervisor(
+ KafkaSupervisor supervisor = createSupervisor(
2,
1,
true,
@@ -4143,7 +4143,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
replayAll();
Assert.assertTrue(supervisor.isTaskCurrent(42, "id0", taskMap));
-
Assert.assertTrue(supervisor.isTaskCurrent(42, "id1", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id2", taskMap));
Assert.assertFalse(supervisor.isTaskCurrent(42, "id3", taskMap));
@@ -4151,6 +4150,106 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testSequenceNameDoesNotChangeWithTaskId()
+ {
+ final DateTime minMessageTime = DateTimes.nowUtc();
+ final DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
+
+ KafkaSupervisor supervisor = createSupervisor(
+ 2,
+ 1,
+ true,
+ "PT1H",
+ new Period("P1D"),
+ new Period("P1D"),
+ false,
+ kafkaHost,
+ dataSchema,
+ new KafkaSupervisorTuningConfig(
+ null,
+ 1000,
+ null,
+ null,
+ 50000,
+ null,
+ new Period("P1Y"),
+ null,
+ null,
+ null,
+ false,
+ null,
+ false,
+ null,
+ numThreads,
+ TEST_CHAT_RETRIES,
+ TEST_HTTP_TIMEOUT,
+ TEST_SHUTDOWN_TIMEOUT,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ );
+
+ // Create task1 with some start and end offsets
+ final KafkaIndexTask task1 = createKafkaIndexTask(
+ "id0",
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, 0L, 2, 0L),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(
+ "topic",
+ singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+ ),
+ minMessageTime,
+ maxMessageTime,
+ dataSchema,
+ supervisor.getTuningConfig()
+ );
+
+ // Create task2 with same offsets
+ final KafkaIndexTask task2 = createKafkaIndexTask(
+ "id1",
+ 0,
+ task1.getIOConfig().getStartSequenceNumbers(),
+ task1.getIOConfig().getEndSequenceNumbers(),
+ task1.getIOConfig().getMinimumMessageTime().get(),
+ task1.getIOConfig().getMaximumMessageTime().get(),
+ dataSchema,
+ supervisor.getTuningConfig()
+ );
+
+ replayAll();
+
+ final String sequenceTask1 = supervisor.generateSequenceName(
+ task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+ task1.getIOConfig().getMinimumMessageTime(),
+ task1.getIOConfig().getMaximumMessageTime(),
+ task1.getDataSchema(),
+ task1.getTuningConfig()
+ );
+ Assert.assertNotNull(sequenceTask1);
+
+ final String sequenceTask2 = supervisor.generateSequenceName(
+ task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+ task2.getIOConfig().getMinimumMessageTime(),
+ task2.getIOConfig().getMaximumMessageTime(),
+ task2.getDataSchema(),
+ task2.getTuningConfig()
+ );
+ Assert.assertNotNull(sequenceTask2);
+
+ Assert.assertNotEquals(task1.getId(), task2.getId());
+ Assert.assertEquals(sequenceTask1, sequenceTask2);
+
+ verifyAll();
+ }
+
@Test
public void testResumeAllActivelyReadingTasks() throws Exception
{
@@ -4711,8 +4810,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
/**
* Use when you don't want generateSequenceNumber overridden
*/
-
- private KafkaSupervisor getSupervisor(
+ private KafkaSupervisor createSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
@@ -4992,7 +5090,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@Override
- protected String generateSequenceName(
+ public String generateSequenceName(
Map<KafkaTopicPartition, Long> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 39d943dbebe..303eb5eff38 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -3922,7 +3922,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
DateTime minMessageTime = DateTimes.nowUtc();
DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
- KinesisSupervisor supervisor = getSupervisor(
+ KinesisSupervisor supervisor = createSupervisor(
1,
1,
true,
@@ -4066,6 +4066,77 @@ public class KinesisSupervisorTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testSequenceNameDoesNotChangeWithTaskId()
+ {
+ final DateTime minMessageTime = DateTimes.nowUtc();
+ final DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
+
+ KinesisSupervisor supervisor = createSupervisor(
+ 1,
+ 1,
+ true,
+ "PT1H",
+ new Period("P1D"),
+ new Period("P1D"),
+ false,
+ 42,
+ 42,
+ dataSchema,
+ tuningConfig
+ );
+
+ // Create task1 with some start and end offsets
+ final KinesisIndexTask task1 = createKinesisIndexTask(
+ "id0",
+ 0,
+ new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "3"), ImmutableSet.of()),
+ new SeekableStreamEndSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
+ ),
+ minMessageTime,
+ maxMessageTime,
+ dataSchema
+ );
+
+ // Create task2 with same offsets
+ final KinesisIndexTask task2 = createKinesisIndexTask(
+ "id1",
+ 0,
+ task1.getIOConfig().getStartSequenceNumbers(),
+ task1.getIOConfig().getEndSequenceNumbers(),
+ task1.getIOConfig().getMinimumMessageTime().get(),
+ task1.getIOConfig().getMaximumMessageTime().get(),
+ dataSchema
+ );
+
+ replayAll();
+
+ final String sequenceTask1 = supervisor.generateSequenceName(
+ task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+ task1.getIOConfig().getMinimumMessageTime(),
+ task1.getIOConfig().getMaximumMessageTime(),
+ task1.getDataSchema(),
+ task1.getTuningConfig()
+ );
+ Assert.assertNotNull(sequenceTask1);
+
+ final String sequenceTask2 = supervisor.generateSequenceName(
+ task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+ task2.getIOConfig().getMinimumMessageTime(),
+ task2.getIOConfig().getMaximumMessageTime(),
+ task2.getDataSchema(),
+ task2.getTuningConfig()
+ );
+ Assert.assertNotNull(sequenceTask2);
+
+ Assert.assertNotEquals(task1.getId(), task2.getId());
+ Assert.assertEquals(sequenceTask1, sequenceTask2);
+
+ verifyAll();
+ }
+
@Test
public void testShardSplit() throws Exception
{
@@ -5317,7 +5388,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
/**
* Use for tests where you don't want generateSequenceName to be overridden out
*/
- private KinesisSupervisor getSupervisor(
+ private KinesisSupervisor createSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
@@ -5572,7 +5643,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Override
- protected String generateSequenceName(
+ public String generateSequenceName(
Map<String, String> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index a0ec6d809eb..906f7155665 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2608,7 +2608,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@VisibleForTesting
- protected String generateSequenceName(
+ public String generateSequenceName(
Map<PartitionIdType, SequenceOffsetType> startPartitions,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]