jihoonson commented on a change in pull request #7267: Logic adjustments to 
SeekableStreamIndexTaskRunner.
URL: https://github.com/apache/incubator-druid/pull/7267#discussion_r265729962
 
 

 ##########
 File path: 
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 ##########
 @@ -313,6 +313,116 @@ public KafkaIndexTaskTest(boolean 
isIncrementalHandoffSupported)
     );
   }
 
+  @Test(timeout = 60_000L)
+  public void testRestoreAtEndOffset() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+
+    records = generateSinglePartitionRecords(topic);
+    maxRowsPerSegment = 2;
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final KafkaIndexTask task1 = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> checkpoint = new 
SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(0, 5L)
+    );
+
+    final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+    // Insert some data, but not enough for the task to finish
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 
5)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    while (task1.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    final Map<Integer, Long> currentOffsets = 
ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(), 
currentOffsets);
+    // Set endOffsets to persist sequences
+    task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false);
+
+    // Stop without publishing segment
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+    unlockAppenderatorBasePersistDirForTask(task1);
+
+    Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+    // Start a new task
+    final KafkaIndexTask task2 = createTask(
+        task1.getId(),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future2 = runTask(task2);
 
 Review comment:
   Would you please add a comment about why task2 reads nothing? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to