This is an automated email from the ASF dual-hosted git repository.
mkwhit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git
The following commit(s) were added to refs/heads/master by this push:
new 21cb425 CRUNCH-680: Kafka Source should split very large partitions
new 9b8849c Merge pull request #21 from noslowerdna/CRUNCH-680
21cb425 is described below
commit 21cb425610676c0ec05627d21f8dc01d51904589
Author: Andrew Olson <[email protected]>
AuthorDate: Fri Feb 22 13:34:32 2019 -0600
CRUNCH-680: Kafka Source should split very large partitions
---
.../crunch/kafka/record/KafkaInputFormat.java | 30 +++-
.../crunch/kafka/record/KafkaRecordReader.java | 36 ++++-
.../apache/crunch/kafka/record/KafkaSource.java | 3 +
.../crunch/kafka/record/KafkaInputFormatTest.java | 141 +++++++++++++++++
.../crunch/kafka/record/KafkaRecordReaderTest.java | 169 +++++++++++++++++++++
.../apache/crunch/kafka/record/KafkaSourceIT.java | 46 +++++-
6 files changed, 416 insertions(+), 9 deletions(-)
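For illustration (not code from the patch): the new cap is driven by a single property, KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT ("org.apache.crunch.kafka.split.max"). A minimal sketch of lowering it for a job follows, assuming a populated consumerProps and an offsets map as in the KafkaSourceIT test further down; MyDriver is a placeholder class name.

    import java.util.Map;
    import java.util.Properties;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.ReadableSource;
    import org.apache.crunch.kafka.record.KafkaInputFormat;
    import org.apache.crunch.kafka.record.KafkaSource;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class MyDriver { // placeholder driver class, not part of the patch
      public static void run(Properties consumerProps, Map<TopicPartition, Pair<Long, Long>> offsets) {
        // Cap each input split at 1M records instead of the 5M default.
        Configuration conf = new Configuration();
        conf.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 1000000L);
        Pipeline pipeline = new MRPipeline(MyDriver.class, conf);
        ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> source = new KafkaSource(consumerProps, offsets);
        PCollection<ConsumerRecord<BytesWritable, BytesWritable>> records = pipeline.read(source);
        // ... process 'records', then pipeline.done();
      }
    }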
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
index 4e378ce..2f1d139 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
@@ -89,6 +89,17 @@ public class KafkaInputFormat extends InputFormat<ConsumerRecord<BytesWritable,
private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties";
/**
+ * Configuration property for the maximum number of records per input split. Partitions with more qualifying records than this
+ * limit will be divided into multiple splits.
+ */
+ public static final String KAFKA_MAX_RECORDS_PER_SPLIT = "org.apache.crunch.kafka.split.max";
+
+ /**
+ * Default value for {@link #KAFKA_MAX_RECORDS_PER_SPLIT}
+ */
+ public static final long DEFAULT_KAFKA_MAX_RECORDS_PER_SPLIT = 5000000L;
+
+ /**
* Regex to discover all of the defined Kafka connection properties which should be passed to the ConsumerConfig.
*/
private static final Pattern CONNECTION_PROPERTY_REGEX = Pattern
@@ -98,16 +109,27 @@ public class KafkaInputFormat extends InputFormat<ConsumerRecord<BytesWritable,
@Override
public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
- Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
+ Configuration conf = getConf();
+ long maxRecordsPerSplit = conf.getLong(KAFKA_MAX_RECORDS_PER_SPLIT, DEFAULT_KAFKA_MAX_RECORDS_PER_SPLIT);
+ if (maxRecordsPerSplit < 1L) {
+ throw new IllegalArgumentException("Invalid " + KAFKA_MAX_RECORDS_PER_SPLIT + " value [" + maxRecordsPerSplit + "]");
+ }
+
+ Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(conf);
List<InputSplit> splits = new LinkedList<>();
for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
long start = entry.getValue().first();
long end = entry.getValue().second();
- if (start != end) {
- splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), entry.getValue().first(),
- entry.getValue().second()));
+
+ // Chop up any excessively large partitions into multiple splits for more balanced map task durations. This will
+ // also exclude any partitions with no records to read (where the start offset equals the end offset).
+ long splitStart = start;
+ while (splitStart < end) {
+ long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
+ splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), splitStart, splitEnd));
+ splitStart = splitEnd;
}
}
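To make the chopping arithmetic concrete, the added loop is pulled out below into a standalone sketch; the class, method name and long[] pairs are illustrative only, not part of the patch. With the default cap of 5,000,000, a partition covering offsets [0, 12,000,000) becomes three splits, [0, 5M), [5M, 10M) and [10M, 12M), while a partition with start == end yields none, matching the old behaviour. Per partition the split count is the ceiling of (end - start) / cap, which is what the new KafkaInputFormatTest asserts further down with expressions like 700/2 + 700%2.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative mirror of the loop added to KafkaInputFormat.getSplits above.
    public class SplitChopSketch {
      // Returns {splitStart, splitEnd} pairs; an empty partition (start == end) produces none.
      static List<long[]> chopPartition(long start, long end, long maxRecordsPerSplit) {
        List<long[]> splits = new ArrayList<>();
        long splitStart = start;
        while (splitStart < end) {
          long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
          splits.add(new long[] { splitStart, splitEnd });
          splitStart = splitEnd;
        }
        return splits; // e.g. chopPartition(0L, 12000000L, 5000000L) -> [0,5M), [5M,10M), [10M,12M)
      }
    }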
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
index 754c9ce..3b7ac88 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -75,7 +75,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
kafkaConnectionProperties = filterConnectionProperties(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
- consumer = new KafkaConsumer<>(kafkaConnectionProperties);
+ consumer = buildConsumer(kafkaConnectionProperties);
KafkaInputSplit split = (KafkaInputSplit) inputSplit;
topicPartition = split.getTopicPartition();
@@ -102,6 +102,26 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
}
+ /**
+ * Builds a new kafka consumer
+ *
+ * @param properties
+ * the properties to configure the consumer
+ * @return a new kafka consumer
+ */
+ // Visible for testing
+ protected KafkaConsumer<K, V> buildConsumer(Properties properties) {
+ return new KafkaConsumer<>(properties);
+ }
+
+ /**
+ * @return the current offset for the reader
+ */
+ // Visible for testing
+ protected long getCurrentOffset() {
+ return currentOffset;
+ }
+
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (hasPendingData()) {
@@ -173,9 +193,18 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
}
/**
+ * @return the record iterator used by the reader
+ */
+ // Visible for testing
+ protected Iterator<ConsumerRecord<K, V>> getRecordIterator() {
+ return recordIterator;
+ }
+
+ /**
* Loads new records into the record iterator
*/
- private void loadRecords() {
+ // Visible for testing
+ protected void loadRecords() {
if ((recordIterator == null) || !recordIterator.hasNext()) {
ConsumerRecords<K, V> records = null;
int numTries = 0;
@@ -212,7 +241,8 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
if ((records == null) || records.isEmpty()){
LOG.info("No records retrieved from Kafka partition {} therefore
nothing to iterate over", topicPartition);
} else{
- LOG.info("Retrieved records {} from Kafka partition {} to iterate
over", records.count(), topicPartition);
+ LOG.info("Retrieved {} records from Kafka partition {} to iterate over
starting from offset {}", new Object[] {
+ records.count(), topicPartition,
records.iterator().next().offset()});
}
recordIterator = records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
index 4d59edd..98964d8 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
@@ -163,6 +163,9 @@ public class KafkaSource
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+ //disable automatic committing of consumer offsets
+ props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false));
+
return props;
}
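Each split now covers a fixed offset range chosen at planning time, so Kafka-side offset commits play no role in progress tracking; the added property simply makes that explicit. A small sketch of what the new line amounts to in plain consumer-config terms (not code from the patch):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class AutoCommitSketch {
      // ENABLE_AUTO_COMMIT_CONFIG is the constant for "enable.auto.commit",
      // so the consumer no longer commits offsets on its own.
      static Properties withAutoCommitDisabled(Properties props) {
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false));
        return props;
      }
    }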
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java
new file mode 100644
index 0000000..2062058
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class KafkaInputFormatTest {
+
+ @Test
+ public void generateConnectionPropertyKeyTest() {
+ String propertyName = "some.property";
+ String actual = KafkaInputFormat.generateConnectionPropertyKey(propertyName);
+ String expected = "org.apache.crunch.kafka.connection.properties.some.property";
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void getConnectionPropertyFromKeyTest() {
+ String prefixedConnectionProperty = "org.apache.crunch.kafka.connection.properties.some.property";
+ String actual = KafkaInputFormat.getConnectionPropertyFromKey(prefixedConnectionProperty);
+ String expected = "some.property";
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void writeConnectionPropertiesToBundleTest() {
+ FormatBundle<KafkaInputFormat> actual = FormatBundle.forInput(KafkaInputFormat.class);
+ Properties connectionProperties = new Properties();
+ connectionProperties.put("key1", "value1");
+ connectionProperties.put("key2", "value2");
+ KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, actual);
+
+ FormatBundle<KafkaInputFormat> expected = FormatBundle.forInput(KafkaInputFormat.class);
+ expected.set("org.apache.crunch.kafka.connection.properties.key1", "value1");
+ expected.set("org.apache.crunch.kafka.connection.properties.key2", "value2");
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void filterConnectionPropertiesTest() {
+ Properties props = new Properties();
+ props.put("org.apache.crunch.kafka.connection.properties.key1", "value1");
+ props.put("org.apache.crunch.kafka.connection.properties.key2", "value2");
+ props.put("org_apache_crunch_kafka_connection_properties.key3", "value3");
+ props.put("org.apache.crunch.another.prefix.properties.key4", "value4");
+
+ Properties actual = KafkaInputFormat.filterConnectionProperties(props);
+ Properties expected = new Properties();
+ expected.put("key1", "value1");
+ expected.put("key2", "value2");
+
+ assertEquals(expected, actual);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void getSplitsInvalidMaxRecords() throws IOException, InterruptedException {
+ KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+ Configuration conf = new Configuration(false);
+ conf.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 0L);
+ kafkaInputFormat.setConf(conf);
+ kafkaInputFormat.getSplits(mock(JobContext.class));
+ }
+
+ @Test
+ public void getSplitsConfiguredMaxRecords() throws IOException, InterruptedException {
+ KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+ Configuration conf = new Configuration(false);
+ conf.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 2L);
+
+ conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0,1");
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 300L);
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 1000L);
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start", 30L);
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end", 100L);
+
+ conf.set("org.apache.crunch.kafka.offsets.topic.xyz.partitions", "0");
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.xyz.partitions.0.start", 3L);
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.xyz.partitions.0.end", 10L);
+
+ kafkaInputFormat.setConf(conf);
+ List<InputSplit> splits = kafkaInputFormat.getSplits(mock(JobContext.class));
+ assertThat(splits.size(), is((700/2 + 700%2) + (70/2 + 70%2) + (7/2 + 7%2)));
+ }
+
+ @Test
+ public void getSplitsDefaultMaxRecords() throws IOException, InterruptedException {
+ KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+ Configuration conf = new Configuration(false);
+
+ conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0");
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 0L);
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 5234567L);
+
+ kafkaInputFormat.setConf(conf);
+ List<InputSplit> splits = kafkaInputFormat.getSplits(mock(JobContext.class));
+ assertThat(splits.size(), is(2));
+ }
+
+ @Test
+ public void getSplitsNoRecords() throws IOException, InterruptedException {
+ KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+ Configuration conf = new Configuration(false);
+
+ conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0");
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 5L);
+ conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 5L);
+
+ kafkaInputFormat.setConf(conf);
+ List<InputSplit> splits = kafkaInputFormat.getSplits(mock(JobContext.class));
+ assertThat(splits.size(), is(0));
+ }
+}
\ No newline at end of file
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java
new file mode 100644
index 0000000..fd8cd8e
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the
+ * Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordReaderTest {
+
+ @Mock
+ private KafkaConsumer<String, String> consumer;
+
+ @Mock
+ private TaskAttemptContext taskAttemptContext;
+
+ private TopicPartition topicPartition;
+ private long startOffset;
+ private long endOffset;
+ private KafkaInputSplit inputSplit;
+
+ private ConsumerRecords<String, String> records;
+
+ private KafkaRecordReader<String, String> reader;
+
+ @Before
+ public void before() throws IOException, InterruptedException {
+ when(taskAttemptContext.getConfiguration()).thenReturn(new Configuration(false));
+ startOffset = 0L;
+ endOffset = 100L;
+ topicPartition = new TopicPartition("topic", 0);
+
+ inputSplit = new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), startOffset, endOffset);
+
+ when(consumer.beginningOffsets(Collections.singleton(inputSplit.getTopicPartition()))).thenReturn(
+ Collections.singletonMap(inputSplit.getTopicPartition(), 0L));
+
+ records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+ Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, "key", "value"))));
+
+ when(consumer.poll(anyLong())).thenReturn(records);
+
+ reader = new KafkaRecordReaderTester();
+ reader.initialize(inputSplit, taskAttemptContext);
+ }
+
+ @Test
+ public void getRecords_consumerPollThrowsException_thenReturnsMessage() {
+ // DisconnectException is retriable
+ when(consumer.poll(anyLong())).thenThrow(new DisconnectException()).thenReturn(records);
+
+ reader.loadRecords();
+ Iterator<ConsumerRecord<String, String>> iterator = reader.getRecordIterator();
+ assertThat(iterator.hasNext(), is(true));
+ assertThat(iterator.next(), is(records.records(topicPartition).get(0)));
+ }
+
+ @Test
+ public void getRecords_consumerPollEmpty_thenReturnsMessage() {
+ // DisconnectException is retriable
+ when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
+ List<ConsumerRecord<String, String>>> emptyMap())).thenReturn(records);
+
+ reader.loadRecords();
+ Iterator<ConsumerRecord<String, String>> iterator = reader.getRecordIterator();
+ assertThat(iterator.hasNext(), is(true));
+ assertThat(iterator.next(), is(records.records(topicPartition).get(0)));
+ }
+
+ @Test
+ public void nextKeyValue() throws IOException, InterruptedException {
+ assertThat(reader.nextKeyValue(), is(true));
+ assertThat(reader.getCurrentKey(), is(records.records(topicPartition).get(0)));
+ assertThat(reader.getCurrentOffset(), is(0L));
+ }
+
+ @Test
+ public void nextKeyValue_recordOffsetAheadOfExpected() throws IOException, InterruptedException {
+ records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+ Collections.singletonList(new ConsumerRecord<>("topic", 0, 10L, "key", "value"))));
+
+ when(consumer.poll(anyLong())).thenReturn(records);
+
+ assertThat(reader.nextKeyValue(), is(true));
+ assertThat(reader.getCurrentKey(), is(records.records(topicPartition).get(0)));
+ assertThat(reader.getCurrentOffset(), is(10L));
+ }
+
+ @Test
+ public void nextKeyValue_noRecord_emptyPartition() throws IOException, InterruptedException {
+ when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
+ List<ConsumerRecord<String, String>>> emptyMap()));
+
+ when(consumer.beginningOffsets(Collections.singletonList(topicPartition))).thenReturn(
+ Collections.singletonMap(topicPartition, endOffset));
+
+ assertThat(reader.nextKeyValue(), is(false));
+ }
+
+ @Test(expected = IOException.class)
+ public void nextKeyValue_noRecord_nonEmptyPartition() throws IOException, InterruptedException {
+ when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
+ List<ConsumerRecord<String, String>>> emptyMap()));
+
+ reader.nextKeyValue();
+ }
+
+ @Test
+ public void nextKeyValue_recordIsBeyondEndOffset() throws IOException, InterruptedException {
+ records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+ Collections.singletonList(new ConsumerRecord<>("topic", 0, 100L, "key", "value"))));
+
+ when(consumer.poll(anyLong())).thenReturn(records);
+
+ assertThat(reader.nextKeyValue(), is(false));
+ }
+
+ @Test
+ public void getEarliestOffset_noOffsetFound() {
+ when(consumer.beginningOffsets(Collections.singletonList(inputSplit.getTopicPartition()))).thenReturn(
+ Collections.<TopicPartition, Long> emptyMap());
+ assertThat(reader.getEarliestOffset(), is(0L));
+ }
+
+ @Test
+ public void getEarliestOffset() {
+ when(consumer.beginningOffsets(Collections.singletonList(inputSplit.getTopicPartition()))).thenReturn(
+ Collections.singletonMap(inputSplit.getTopicPartition(), 100L));
+ assertThat(reader.getEarliestOffset(), is(100L));
+ }
+
+ private class KafkaRecordReaderTester extends KafkaRecordReader<String, String> {
+ @Override
+ protected KafkaConsumer<String, String> buildConsumer(Properties properties) {
+ return consumer;
+ }
+ }
+}
\ No newline at end of file
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
index 56241ae..af03d64 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
@@ -27,8 +27,8 @@ import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.From;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.To;
-import org.apache.crunch.kafka.*;
-import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaUtils;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
@@ -197,6 +197,48 @@ public class KafkaSourceIT {
pipeline.done();
}
+ @Test
+ public void sourceReadDataThroughPipelineMultipleSplitsPerPartition() {
+ Configuration config = ClusterTest.getConf();
+
+ config.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 7L);
+
+ List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+ Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+ Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+ Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+ Long endingOffset = endOffsets.get(entry.getKey());
+ offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+ }
+
+ Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+ pipeline.enableDebug();
+
+ ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+ PCollection<ConsumerRecord<BytesWritable, BytesWritable>> read = pipeline.read(kafkaSource);
+ Path out = path.getPath("out");
+ read.parallelDo(new KafkaSourceIT.SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
+
+ pipeline.run();
+
+ PCollection<String> persistedKeys = pipeline.read(From.textFile(out));
+
+ Set<String> keysRead = new HashSet<>();
+ int numRecordsFound = 0;
+ for (String value : persistedKeys.materialize()) {
+ assertThat(keys, hasItem(value));
+ numRecordsFound++;
+ keysRead.add(value);
+ }
+
+ assertThat(numRecordsFound, is(keys.size()));
+ assertThat(keysRead.size(), is(keys.size()));
+
+ pipeline.done();
+ }
private static class SimpleConvertFn extends MapFn<ConsumerRecord<BytesWritable, BytesWritable>, String> {
@Override