This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d2dbb8b2c0 Fix infinite checkpointing between tasks and overlord (#13825)
d2dbb8b2c0 is described below

commit d2dbb8b2c02b3de9c7049d03bd59b03839aa209b
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Wed Feb 22 19:25:59 2023 +0530

    Fix infinite checkpointing between tasks and overlord (#13825)
    
    If the intermediate handoff period is less than the task duration and there
    is no new data in the input topic, the task will continuously checkpoint the
    same offsets again and again. This PR fixes that bug by resetting the
    checkpoint time even when the task receives the same end offset request again.
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 57 ++++++++++++++++++++++
 .../SeekableStreamIndexTaskRunner.java             | 10 +++-
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 54f4523e02..0ba17cdc38 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -932,6 +932,63 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     );
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointResetWithSameEndOffsets() throws Exception
+  {
+    final String baseSequenceName = "sequence0";
+    // as soon as any segment hits maxRowsPerSegment or 
intermediateHandoffPeriod, incremental publishing should happen
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    intermediateHandoffPeriod = new Period().withMillis(10);
+
+    // Insert data
+    insertData();
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions = 
new SeekableStreamStartSequenceNumbers<>(
+        topic,
+        ImmutableMap.of(0, 0L, 1, 0L),
+        ImmutableSet.of()
+    );
+    final SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions = new 
SeekableStreamEndSequenceNumbers<>(
+        topic,
+        ImmutableMap.of(0, 2L, 1, 0L)
+    );
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            INPUT_FORMAT,
+            null
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // task will pause for checkpointing
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    long currentNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+    final Map<Integer, Long> nextEndOffsets = 
task.getRunner().getLastSequenceMetadata().getStartOffsets();
+    task.getRunner().setEndOffsets(nextEndOffsets, false);
+    long newNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+    Assert.assertTrue(
+        StringUtils.format(
+            "Old checkpoint time: [%d], new checkpoint time: [%d]",
+            currentNextCheckpointTime,
+            newNextCheckpointTime),
+        newNextCheckpointTime > currentNextCheckpointTime);
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+  }
+
   DataSourceMetadata newDataSchemaMetadata()
   {
     return 
metadataStorageCoordinator.retrieveDataSourceMetadata(NEW_DATA_SCHEMA.getDataSource());
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e509699d5c..794fab2ec5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1222,7 +1222,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     sequences.add(sequenceMetadata);
   }
 
-  private SequenceMetadata<PartitionIdType, SequenceOffsetType> 
getLastSequenceMetadata()
+  @VisibleForTesting
+  public SequenceMetadata<PartitionIdType, SequenceOffsetType> 
getLastSequenceMetadata()
   {
     Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
     return sequences.get(sequences.size() - 1);
@@ -1651,6 +1652,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
              && !finish)
             || (latestSequence.getEndOffsets().equals(sequenceNumbers) && 
finish)) {
           log.warn("Ignoring duplicate request, end offsets already set for 
sequences [%s]", sequenceNumbers);
+          resetNextCheckpointTime();
           resume();
           return Response.ok(sequenceNumbers).build();
         } else if (latestSequence.isCheckpointed()) {
@@ -1872,6 +1874,12 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return startTime;
   }
 
+  @VisibleForTesting
+  public long getNextCheckpointTime()
+  {
+    return nextCheckpointTime;
+  }
+
   /**
    * This method does two things:
    * <p>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to