This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d2dbb8b2c0 Fix infinite checkpointing between tasks and overlord
(#13825)
d2dbb8b2c0 is described below
commit d2dbb8b2c02b3de9c7049d03bd59b03839aa209b
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Wed Feb 22 19:25:59 2023 +0530
Fix infinite checkpointing between tasks and overlord (#13825)
If the intermediate handoff period is less than the task duration and there
is no new data in the input topic, the task will continuously checkpoint the
same offsets again and again. This PR fixes that bug by resetting the
checkpoint time even when the task receives the same end offset request again.
---
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 57 ++++++++++++++++++++++
.../SeekableStreamIndexTaskRunner.java | 10 +++-
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 54f4523e02..0ba17cdc38 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -932,6 +932,63 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
);
}
+ @Test(timeout = 60_000L)
+ public void testCheckpointResetWithSameEndOffsets() throws Exception
+ {
+ final String baseSequenceName = "sequence0";
+ // as soon as any segment hits maxRowsPerSegment or
intermediateHandoffPeriod, incremental publishing should happen
+ maxRowsPerSegment = Integer.MAX_VALUE;
+ intermediateHandoffPeriod = new Period().withMillis(10);
+
+ // Insert data
+ insertData();
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+ consumerProps.put("max.poll.records", "1");
+
+ final SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions =
new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ ImmutableMap.of(0, 0L, 1, 0L),
+ ImmutableSet.of()
+ );
+ final SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions = new
SeekableStreamEndSequenceNumbers<>(
+ topic,
+ ImmutableMap.of(0, 2L, 1, 0L)
+ );
+ final KafkaIndexTask task = createTask(
+ null,
+ new KafkaIndexTaskIOConfig(
+ 0,
+ baseSequenceName,
+ startPartitions,
+ endPartitions,
+ consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null,
+ INPUT_FORMAT,
+ null
+ )
+ );
+ final ListenableFuture<TaskStatus> future = runTask(task);
+
+ // task will pause for checkpointing
+ while (task.getRunner().getStatus() != Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ long currentNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+ final Map<Integer, Long> nextEndOffsets =
task.getRunner().getLastSequenceMetadata().getStartOffsets();
+ task.getRunner().setEndOffsets(nextEndOffsets, false);
+ long newNextCheckpointTime = task.getRunner().getNextCheckpointTime();
+ Assert.assertTrue(
+ StringUtils.format(
+ "Old checkpoint time: [%d], new checkpoint time: [%d]",
+ currentNextCheckpointTime,
+ newNextCheckpointTime),
+ newNextCheckpointTime > currentNextCheckpointTime);
+ Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+ }
+
DataSourceMetadata newDataSchemaMetadata()
{
return
metadataStorageCoordinator.retrieveDataSourceMetadata(NEW_DATA_SCHEMA.getDataSource());
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e509699d5c..794fab2ec5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1222,7 +1222,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
sequences.add(sequenceMetadata);
}
- private SequenceMetadata<PartitionIdType, SequenceOffsetType>
getLastSequenceMetadata()
+ @VisibleForTesting
+ public SequenceMetadata<PartitionIdType, SequenceOffsetType>
getLastSequenceMetadata()
{
Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
return sequences.get(sequences.size() - 1);
@@ -1651,6 +1652,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
&& !finish)
|| (latestSequence.getEndOffsets().equals(sequenceNumbers) &&
finish)) {
log.warn("Ignoring duplicate request, end offsets already set for
sequences [%s]", sequenceNumbers);
+ resetNextCheckpointTime();
resume();
return Response.ok(sequenceNumbers).build();
} else if (latestSequence.isCheckpointed()) {
@@ -1872,6 +1874,12 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return startTime;
}
+ @VisibleForTesting
+ public long getNextCheckpointTime()
+ {
+ return nextCheckpointTime;
+ }
+
/**
* This method does two things:
* <p>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]