This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c020272 Fix KafkaRecordSupplier assign (#7260)
c020272 is described below
commit c020272adda784dd02c8ac1b108a57d6d2045d1c
Author: Jonathan Wei <[email protected]>
AuthorDate: Wed Mar 13 23:36:14 2019 -0700
Fix KafkaRecordSupplier assign (#7260)
* Fix KafkaRecordSupplier assign
* TeamCity fix
---
.../druid/indexing/kafka/KafkaRecordSupplier.java | 1 -
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 63 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 1 deletion(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 6c3d053..be25c49 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -69,7 +69,6 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
.stream()
.map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
.collect(Collectors.toSet()));
- seekToEarliest(streamPartitions);
}
@Override
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 070396f..461ac9e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2330,6 +2330,69 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4));
}
+ @Test(timeout = 60_000L)
+ public void testCanStartFromLaterThanEarliestOffset() throws Exception
+ {
+ if (!isIncrementalHandoffSupported) {
+ return;
+ }
+ final String baseSequenceName = "sequence0";
+ maxRowsPerSegment = Integer.MAX_VALUE;
+ maxTotalRows = null;
+
+ // Insert data
+ int numToAdd = records.size();
+
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ for (ProducerRecord<byte[], byte[]> record : records) {
+ kafkaProducer.send(record).get();
+ }
+ kafkaProducer.commitTransaction();
+ }
+
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+ consumerProps.put("max.poll.records", "1");
+
+ final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
+ topic,
+ ImmutableMap.of(
+ 0,
+ 0L,
+ 1,
+ 1L
+ )
+ );
+
+ final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
+ topic,
+ ImmutableMap.of(
+ 0,
+ 10L,
+ 1,
+ 2L
+ )
+ );
+
+ final KafkaIndexTask task = createTask(
+ null,
+ new KafkaIndexTaskIOConfig(
+ 0,
+ baseSequenceName,
+ startPartitions,
+ endPartitions,
+ consumerProps,
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null
+ )
+ );
+ final ListenableFuture<TaskStatus> future = runTask(task);
+ Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+ }
+
private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
{
ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]