clintropolis commented on a change in pull request #7267: Logic adjustments to SeekableStreamIndexTaskRunner.
URL: https://github.com/apache/incubator-druid/pull/7267#discussion_r265746339
##########
File path:
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -313,6 +313,116 @@ public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
);
}
+ @Test(timeout = 60_000L)
+ public void testRestoreAtEndOffset() throws Exception
+ {
+ if (!isIncrementalHandoffSupported) {
+ return;
+ }
+
+ records = generateSinglePartitionRecords(topic);
+ maxRowsPerSegment = 2;
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+ consumerProps.put("max.poll.records", "1");
+
+ final KafkaIndexTask task1 = createTask(
+ null,
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence0",
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
+ consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null
+ )
+ );
+
+ final SeekableStreamPartitions<Integer, Long> checkpoint = new SeekableStreamPartitions<>(
+ topic,
+ ImmutableMap.of(0, 5L)
+ );
+
+ final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+ // Insert some data, but not enough for the task to finish
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 5)) {
+ kafkaProducer.send(record).get();
+ }
+ kafkaProducer.commitTransaction();
+ }
+
+ while (task1.getRunner().getStatus() != Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+ Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(), currentOffsets);
+ // Set endOffsets to persist sequences
+ task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false);
+
+ // Stop without publishing segment
+ task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+ unlockAppenderatorBasePersistDirForTask(task1);
+
+ Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+ // Start a new task
+ final KafkaIndexTask task2 = createTask(
+ task1.getId(),
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence0",
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
+ consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null
+ )
+ );
+
+ final ListenableFuture<TaskStatus> future2 = runTask(task2);
Review comment:
The actual bug here was that if a task was given a 'bad' end offset that pointed at a Kafka transactional-topic control record instead of a regular record, and that offset sat immediately after the last good offset read, the task would get stuck in an infinite read loop because of the `continue` statements that this PR removes from the loop. I think this test should _either_ be removed, since this _shouldn't_ happen in practice, or be renamed to something like `testDoesntGetStuckWithTransactionOffset` and perhaps slightly reworked and commented to make this clear.
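To make the failure mode concrete, here is a loose, hypothetical simulation of the hang (this is *not* the actual `SeekableStreamIndexTaskRunner` logic; the class, method, and `deliverable` list are invented for illustration). Kafka never delivers a transaction control marker as a record, so if the loop's only reaction to an undeliverable offset is `continue`, the consumed position can never advance past the marker and the loop spins until something external stops it:

```java
import java.util.List;

// Hypothetical sketch of the stuck-loop scenario described above.
// Offsets 0..4 stand in for real records; offset 5 stands in for a Kafka
// transaction control marker that a consumer will never receive via poll().
public class StuckLoopSketch {
  static final List<Long> deliverable = List.of(0L, 1L, 2L, 3L, 4L);

  // Returns the offset reached; maxIterations caps the loop so the sketch
  // terminates where the real buggy loop would spin forever.
  static long runUntilEndOffset(long endOffset, int maxIterations) {
    long currentOffset = 0;
    int iterations = 0;
    while (currentOffset < endOffset && iterations++ < maxIterations) {
      if (!deliverable.contains(currentOffset)) {
        // Buggy behavior: skip and retry. The control-marker offset is never
        // delivered, so currentOffset never advances and the loop spins.
        continue;
      }
      currentOffset++;
    }
    return currentOffset;
  }

  public static void main(String[] args) {
    // End offset 5 (just past the last real record) completes normally.
    System.out.println(runUntilEndOffset(5, 1_000));
    // End offset 6 (past the control marker) only stops at the iteration cap;
    // without the cap this would loop forever.
    System.out.println(runUntilEndOffset(6, 1_000));
  }
}
```

Removing the `continue` (as this PR does) forces the loop to make progress or finish even when the end offset falls on an undeliverable control offset.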