This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ba3cf52215 Add test to verify sequence name of Kafka task (#15397)
4ba3cf52215 is described below

commit 4ba3cf52215443e30b7033252321d0ee98118333
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Nov 21 10:17:32 2023 +0530

    Add test to verify sequence name of Kafka task (#15397)
    
    * Add test to verify sequence name of Kafka and Kinesis tasks
---
 .../kafka/supervisor/KafkaSupervisorTest.java      | 108 ++++++++++++++++++++-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  77 ++++++++++++++-
 .../supervisor/SeekableStreamSupervisor.java       |   2 +-
 3 files changed, 178 insertions(+), 9 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 2ff8c5bf91e..3087bdcbb1a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -3966,7 +3966,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     DateTime minMessageTime = DateTimes.nowUtc();
     DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
 
-    KafkaSupervisor supervisor = getSupervisor(
+    KafkaSupervisor supervisor = createSupervisor(
         2,
         1,
         true,
@@ -4143,7 +4143,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
     replayAll();
 
     Assert.assertTrue(supervisor.isTaskCurrent(42, "id0", taskMap));
-
     Assert.assertTrue(supervisor.isTaskCurrent(42, "id1", taskMap));
     Assert.assertFalse(supervisor.isTaskCurrent(42, "id2", taskMap));
     Assert.assertFalse(supervisor.isTaskCurrent(42, "id3", taskMap));
@@ -4151,6 +4150,106 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testSequenceNameDoesNotChangeWithTaskId()
+  {
+    final DateTime minMessageTime = DateTimes.nowUtc();
+    final DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
+
+    KafkaSupervisor supervisor = createSupervisor(
+        2,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        kafkaHost,
+        dataSchema,
+        new KafkaSupervisorTuningConfig(
+            null,
+            1000,
+            null,
+            null,
+            50000,
+            null,
+            new Period("P1Y"),
+            null,
+            null,
+            null,
+            false,
+            null,
+            false,
+            null,
+            numThreads,
+            TEST_CHAT_RETRIES,
+            TEST_HTTP_TIMEOUT,
+            TEST_SHUTDOWN_TIMEOUT,
+            null,
+            null,
+            null,
+            null,
+            null
+        )
+    );
+
+    // Create task1 with some start and end offsets
+    final KafkaIndexTask task1 = createKafkaIndexTask(
+        "id0",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            singlePartitionMap(topic, 0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema,
+        supervisor.getTuningConfig()
+    );
+
+    // Create task2 with same offsets
+    final KafkaIndexTask task2 = createKafkaIndexTask(
+        "id1",
+        0,
+        task1.getIOConfig().getStartSequenceNumbers(),
+        task1.getIOConfig().getEndSequenceNumbers(),
+        task1.getIOConfig().getMinimumMessageTime().get(),
+        task1.getIOConfig().getMaximumMessageTime().get(),
+        dataSchema,
+        supervisor.getTuningConfig()
+    );
+
+    replayAll();
+
+    final String sequenceTask1 = supervisor.generateSequenceName(
+        task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        task1.getIOConfig().getMinimumMessageTime(),
+        task1.getIOConfig().getMaximumMessageTime(),
+        task1.getDataSchema(),
+        task1.getTuningConfig()
+    );
+    Assert.assertNotNull(sequenceTask1);
+
+    final String sequenceTask2 = supervisor.generateSequenceName(
+        task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        task2.getIOConfig().getMinimumMessageTime(),
+        task2.getIOConfig().getMaximumMessageTime(),
+        task2.getDataSchema(),
+        task2.getTuningConfig()
+    );
+    Assert.assertNotNull(sequenceTask2);
+
+    Assert.assertNotEquals(task1.getId(), task2.getId());
+    Assert.assertEquals(sequenceTask1, sequenceTask2);
+
+    verifyAll();
+  }
+
   @Test
   public void testResumeAllActivelyReadingTasks() throws Exception
   {
@@ -4711,8 +4810,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   /**
    * Use when you don't want generateSequenceNumber overridden
    */
-
-  private KafkaSupervisor getSupervisor(
+  private KafkaSupervisor createSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -4992,7 +5090,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     }
 
     @Override
-    protected String generateSequenceName(
+    public String generateSequenceName(
         Map<KafkaTopicPartition, Long> startPartitions,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 39d943dbebe..303eb5eff38 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -3922,7 +3922,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     DateTime minMessageTime = DateTimes.nowUtc();
     DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
 
-    KinesisSupervisor supervisor = getSupervisor(
+    KinesisSupervisor supervisor = createSupervisor(
         1,
         1,
         true,
@@ -4066,6 +4066,77 @@ public class KinesisSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testSequenceNameDoesNotChangeWithTaskId()
+  {
+    final DateTime minMessageTime = DateTimes.nowUtc();
+    final DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
+
+    KinesisSupervisor supervisor = createSupervisor(
+        1,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        42,
+        42,
+        dataSchema,
+        tuningConfig
+    );
+
+    // Create task1 with some start and end offsets
+    final KinesisIndexTask task1 = createKinesisIndexTask(
+        "id0",
+        0,
+        new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "3"), ImmutableSet.of()),
+        new SeekableStreamEndSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema
+    );
+
+    // Create task2 with same offsets
+    final KinesisIndexTask task2 = createKinesisIndexTask(
+        "id1",
+        0,
+        task1.getIOConfig().getStartSequenceNumbers(),
+        task1.getIOConfig().getEndSequenceNumbers(),
+        task1.getIOConfig().getMinimumMessageTime().get(),
+        task1.getIOConfig().getMaximumMessageTime().get(),
+        dataSchema
+    );
+
+    replayAll();
+
+    final String sequenceTask1 = supervisor.generateSequenceName(
+        task1.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        task1.getIOConfig().getMinimumMessageTime(),
+        task1.getIOConfig().getMaximumMessageTime(),
+        task1.getDataSchema(),
+        task1.getTuningConfig()
+    );
+    Assert.assertNotNull(sequenceTask1);
+
+    final String sequenceTask2 = supervisor.generateSequenceName(
+        task2.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        task2.getIOConfig().getMinimumMessageTime(),
+        task2.getIOConfig().getMaximumMessageTime(),
+        task2.getDataSchema(),
+        task2.getTuningConfig()
+    );
+    Assert.assertNotNull(sequenceTask2);
+
+    Assert.assertNotEquals(task1.getId(), task2.getId());
+    Assert.assertEquals(sequenceTask1, sequenceTask2);
+
+    verifyAll();
+  }
+
   @Test
   public void testShardSplit() throws Exception
   {
@@ -5317,7 +5388,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
   /**
    * Use for tests where you don't want generateSequenceName to be overridden out
    */
-  private KinesisSupervisor getSupervisor(
+  private KinesisSupervisor createSupervisor(
       int replicas,
       int taskCount,
       boolean useEarliestOffset,
@@ -5572,7 +5643,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     }
 
     @Override
-    protected String generateSequenceName(
+    public String generateSequenceName(
         Map<String, String> startPartitions,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index a0ec6d809eb..906f7155665 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2608,7 +2608,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   }
 
   @VisibleForTesting
-  protected String generateSequenceName(
+  public String generateSequenceName(
       Map<PartitionIdType, SequenceOffsetType> startPartitions,
       Optional<DateTime> minimumMessageTime,
       Optional<DateTime> maximumMessageTime,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to