This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new acdda58  Fix KafkaRecordSupplier assign (#7260) (#7265)
acdda58 is described below

commit acdda58d232668d06cb0e32ef08d20f9c24fa284
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Mar 14 17:11:57 2019 -0700

    Fix KafkaRecordSupplier assign (#7260) (#7265)
    
    * Fix KafkaRecordSupplier assign
    
    * TeamCity fix
---
 .../druid/indexing/kafka/KafkaRecordSupplier.java  |  1 -
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 59 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 935404c..60aea3c 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -68,7 +68,6 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
                         .stream()
                         .map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
                         .collect(Collectors.toSet()));
-    seekToEarliest(streamPartitions);
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a2224e5..d2f7373 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2295,6 +2295,65 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
   }
 
+  @Test(timeout = 60_000L)
+  public void testCanStartFromLaterThanEarliestOffset() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+    final String baseSequenceName = "sequence0";
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    maxTotalRows = null;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            0L,
+            1,
+            1L
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            10L,
+            1,
+            2L
+        )
+    );
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to