abhishekrb19 commented on code in PR #13825:
URL: https://github.com/apache/druid/pull/13825#discussion_r1113187325


##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java:
##########
@@ -932,6 +932,63 @@ public void testTimeBasedIncrementalHandOff() throws Exception
     );
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointResetWithSameEndOffsets() throws Exception
+  {
+    final String baseSequenceName = "sequence0";
+    // as soon as any segment hits maxRowsPerSegment or 
intermediateHandoffPeriod, incremental publishing should happen
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    intermediateHandoffPeriod = new Period().withMillis(10);
+
+    // Insert data
+    insertData();
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions = 
new SeekableStreamStartSequenceNumbers<>(
+        topic,
+        ImmutableMap.of(0, 0L, 1, 0L),
+        ImmutableSet.of()
+    );
+    final SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions = new 
SeekableStreamEndSequenceNumbers<>(
+        topic,
+        ImmutableMap.of(0, 2L, 1, 0L)
+    );
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT,
+            null
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // task will pause for checkpointing
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    long currentNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+    final Map<Integer, Long> nextEndOffsets = 
task.getRunner().getLastSequenceMetadata().getStartOffsets();
+    task.getRunner().setEndOffsets(nextEndOffsets, false);
+    long newNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+    Assert.assertTrue(
+        StringUtils.format(
+            "Old checkpoint time: [%s], new checkpoint time: [%s]",

Review Comment:
   Use `%d` instead of `%s` for these placeholders, since the timestamps here are longs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to