This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new d0abf5c fix kafka index task doesn't resume when receive duplicate
request (#6990)
d0abf5c is described below
commit d0abf5c20a4c7e3a8f5ade03ca4efec2bc742094
Author: Mingming Qiu <[email protected]>
AuthorDate: Wed Feb 13 05:24:28 2019 +0800
fix kafka index task doesn't resume when receive duplicate request (#6990)
* fix kafka index task doesn't resume when receive duplicate request
* add unit test
---
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 43 ++++++++++++++++++++++
.../SeekableStreamIndexTaskRunner.java | 1 +
2 files changed, 44 insertions(+)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 04d3802..b7b3896 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2127,6 +2127,49 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1",
desc2));
}
+ @Test(timeout = 60_000L)
+ public void testRunWithDuplicateRequest() throws Exception
+ {
+ // Insert data
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
+ for (ProducerRecord<byte[], byte[]> record : records) {
+ kafkaProducer.send(record).get();
+ }
+ }
+
+ final KafkaIndexTask task = createTask(
+ null,
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence0",
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)),
+ new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)),
+ kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null,
+ false
+ )
+ );
+
+ runTask(task);
+
+ while (!task.getRunner().getStatus().equals(Status.READING)) {
+ Thread.sleep(20);
+ }
+
+ // first setEndOffsets request
+ task.getRunner().pause();
+ task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true);
+ Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+
+ // duplicate setEndOffsets request
+ task.getRunner().pause();
+ task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true);
+ Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+ }
+
private ListenableFuture<TaskStatus> runTask(final Task task)
{
try {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 8572649..c86a2b5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1403,6 +1403,7 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusivePartitions) && !finish) ||
(latestSequence.getEndOffsets().equals(sequenceNumbers) &&
finish)) {
log.warn("Ignoring duplicate request, end sequences already set for
sequences [%s]", sequenceNumbers);
+ resume();
return Response.ok(sequenceNumbers).build();
} else if (latestSequence.isCheckpointed()) {
return Response.status(Response.Status.BAD_REQUEST)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]