This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d0abf5c  fix kafka index task doesn't resume when recieve duplicate 
request (#6990)
d0abf5c is described below

commit d0abf5c20a4c7e3a8f5ade03ca4efec2bc742094
Author: Mingming Qiu <[email protected]>
AuthorDate: Wed Feb 13 05:24:28 2019 +0800

    fix kafka index task doesn't resume when recieve duplicate request (#6990)
    
    * fix kafka index task doesn't resume when recieve duplicate request
    
    * add unit test
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 43 ++++++++++++++++++++++
 .../SeekableStreamIndexTaskRunner.java             |  1 +
 2 files changed, 44 insertions(+)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 04d3802..b7b3896 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2127,6 +2127,49 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", 
desc2));
   }
 
+  @Test(timeout = 60_000L)
+  public void testRunWithDuplicateRequest() throws Exception
+  {
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)),
+            kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+
+    runTask(task);
+
+    while (!task.getRunner().getStatus().equals(Status.READING)) {
+      Thread.sleep(20);
+    }
+
+    // first setEndOffsets request
+    task.getRunner().pause();
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true);
+    Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+
+    // duplicate setEndOffsets request
+    task.getRunner().pause();
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true);
+    Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 8572649..c86a2b5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1403,6 +1403,7 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             exclusivePartitions) && !finish) ||
             (latestSequence.getEndOffsets().equals(sequenceNumbers) && 
finish)) {
           log.warn("Ignoring duplicate request, end sequences already set for 
sequences [%s]", sequenceNumbers);
+          resume();
           return Response.ok(sequenceNumbers).build();
         } else if (latestSequence.isCheckpointed()) {
           return Response.status(Response.Status.BAD_REQUEST)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to