This is an automated email from the ASF dual-hosted git repository.

mkwhit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git


The following commit(s) were added to refs/heads/master by this push:
     new 21cb425  CRUNCH-680: Kafka Source should split very large partitions
     new 9b8849c  Merge pull request #21 from noslowerdna/CRUNCH-680
21cb425 is described below

commit 21cb425610676c0ec05627d21f8dc01d51904589
Author: Andrew Olson <[email protected]>
AuthorDate: Fri Feb 22 13:34:32 2019 -0600

    CRUNCH-680: Kafka Source should split very large partitions
---
 .../crunch/kafka/record/KafkaInputFormat.java      |  30 +++-
 .../crunch/kafka/record/KafkaRecordReader.java     |  36 ++++-
 .../apache/crunch/kafka/record/KafkaSource.java    |   3 +
 .../crunch/kafka/record/KafkaInputFormatTest.java  | 141 +++++++++++++++++
 .../crunch/kafka/record/KafkaRecordReaderTest.java | 169 +++++++++++++++++++++
 .../apache/crunch/kafka/record/KafkaSourceIT.java  |  46 +++++-
 6 files changed, 416 insertions(+), 9 deletions(-)

diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
index 4e378ce..2f1d139 100644
--- 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
+++ 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
@@ -89,6 +89,17 @@ public class KafkaInputFormat extends 
InputFormat<ConsumerRecord<BytesWritable,
   private static final String KAFKA_CONNECTION_PROPERTY_BASE = 
"org.apache.crunch.kafka.connection.properties";
 
   /**
+   * Configuration property for the maximum number of records per input split. 
Partitions with more qualifying records than this
+   * limit will be divided into multiple splits.
+   */
+  public static final String KAFKA_MAX_RECORDS_PER_SPLIT = 
"org.apache.crunch.kafka.split.max";
+
+  /**
+   * Default value for {@link #KAFKA_MAX_RECORDS_PER_SPLIT}
+   */
+  public static final long DEFAULT_KAFKA_MAX_RECORDS_PER_SPLIT = 5000000L;
+
+  /**
    * Regex to discover all of the defined Kafka connection properties which 
should be passed to the ConsumerConfig.
    */
   private static final Pattern CONNECTION_PROPERTY_REGEX = Pattern
@@ -98,16 +109,27 @@ public class KafkaInputFormat extends 
InputFormat<ConsumerRecord<BytesWritable,
 
   @Override
   public List<InputSplit> getSplits(JobContext jobContext) throws IOException, 
InterruptedException {
-    Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
+    Configuration conf = getConf();
+    long maxRecordsPerSplit = conf.getLong(KAFKA_MAX_RECORDS_PER_SPLIT, 
DEFAULT_KAFKA_MAX_RECORDS_PER_SPLIT);
+    if (maxRecordsPerSplit < 1L) {
+      throw new IllegalArgumentException("Invalid " + 
KAFKA_MAX_RECORDS_PER_SPLIT + " value [" + maxRecordsPerSplit + "]");
+    }
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(conf);
     List<InputSplit> splits = new LinkedList<>();
     for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : 
offsets.entrySet()) {
       TopicPartition topicPartition = entry.getKey();
 
       long start = entry.getValue().first();
       long end = entry.getValue().second();
-      if (start != end) {
-        splits.add(new KafkaInputSplit(topicPartition.topic(), 
topicPartition.partition(), entry.getValue().first(),
-            entry.getValue().second()));
+
+      // Chop up any excessively large partitions into multiple splits for 
more balanced map task durations. This will
+      // also exclude any partitions with no records to read (where the start 
offset equals the end offset).
+      long splitStart = start;
+      while (splitStart < end) {
+        long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
+        splits.add(new KafkaInputSplit(topicPartition.topic(), 
topicPartition.partition(), splitStart, splitEnd));
+        splitStart = splitEnd;
       }
     }
 
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
index 754c9ce..3b7ac88 100644
--- 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
+++ 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -75,7 +75,7 @@ public class KafkaRecordReader<K, V> extends 
RecordReader<ConsumerRecord<K, V>,
 
     kafkaConnectionProperties = 
filterConnectionProperties(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
 
-    consumer = new KafkaConsumer<>(kafkaConnectionProperties);
+    consumer = buildConsumer(kafkaConnectionProperties);
 
     KafkaInputSplit split = (KafkaInputSplit) inputSplit;
     topicPartition = split.getTopicPartition();
@@ -102,6 +102,26 @@ public class KafkaRecordReader<K, V> extends 
RecordReader<ConsumerRecord<K, V>,
     maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, 
KAFKA_RETRY_ATTEMPTS_DEFAULT);
   }
 
+  /**
+   * Builds a new kafka consumer
+   *
+   * @param properties
+   *      the properties to configure the consumer
+   * @return a new kafka consumer
+   */
+  // Visible for testing
+  protected KafkaConsumer<K, V> buildConsumer(Properties properties) {
+    return new KafkaConsumer<>(properties);
+  }
+
+  /**
+   * @return the current offset for the reader
+   */
+  // Visible for testing
+  protected long getCurrentOffset() {
+    return currentOffset;
+  }
+
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
     if (hasPendingData()) {
@@ -173,9 +193,18 @@ public class KafkaRecordReader<K, V> extends 
RecordReader<ConsumerRecord<K, V>,
   }
 
   /**
+   * @return the record iterator used by the reader
+   */
+  // Visble for testing
+  protected Iterator<ConsumerRecord<K, V>> getRecordIterator() {
+    return recordIterator;
+  }
+
+  /**
    * Loads new records into the record iterator
    */
-  private void loadRecords() {
+  // Visible for testing
+  protected void loadRecords() {
     if ((recordIterator == null) || !recordIterator.hasNext()) {
       ConsumerRecords<K, V> records = null;
       int numTries = 0;
@@ -212,7 +241,8 @@ public class KafkaRecordReader<K, V> extends 
RecordReader<ConsumerRecord<K, V>,
       if ((records == null) || records.isEmpty()){
         LOG.info("No records retrieved from Kafka partition {} therefore 
nothing to iterate over", topicPartition);
       } else{
-        LOG.info("Retrieved records {} from Kafka partition {} to iterate 
over", records.count(), topicPartition);
+        LOG.info("Retrieved {} records from Kafka partition {} to iterate over 
starting from offset {}", new Object[] {
+            records.count(), topicPartition, 
records.iterator().next().offset()});
       }
 
       recordIterator = records != null ? records.iterator() : 
ConsumerRecords.<K, V>empty().iterator();
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
index 4d59edd..98964d8 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
@@ -163,6 +163,9 @@ public class KafkaSource
     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
BytesDeserializer.class.getName());
     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
BytesDeserializer.class.getName());
 
+    //disable automatic committing of consumer offsets
+    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.toString(false));
+
     return props;
   }
 
diff --git 
a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java
 
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java
new file mode 100644
index 0000000..2062058
--- /dev/null
+++ 
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class KafkaInputFormatTest {
+
+  @Test
+  public void generateConnectionPropertyKeyTest() {
+    String propertyName = "some.property";
+    String actual = 
KafkaInputFormat.generateConnectionPropertyKey(propertyName);
+    String expected = 
"org.apache.crunch.kafka.connection.properties.some.property";
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void getConnectionPropertyFromKeyTest() {
+    String prefixedConnectionProperty = 
"org.apache.crunch.kafka.connection.properties.some.property";
+    String actual = 
KafkaInputFormat.getConnectionPropertyFromKey(prefixedConnectionProperty);
+    String expected = "some.property";
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void writeConnectionPropertiesToBundleTest() {
+    FormatBundle<KafkaInputFormat> actual = 
FormatBundle.forInput(KafkaInputFormat.class);
+    Properties connectionProperties = new Properties();
+    connectionProperties.put("key1", "value1");
+    connectionProperties.put("key2", "value2");
+    KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, 
actual);
+
+    FormatBundle<KafkaInputFormat> expected = 
FormatBundle.forInput(KafkaInputFormat.class);
+    expected.set("org.apache.crunch.kafka.connection.properties.key1", 
"value1");
+    expected.set("org.apache.crunch.kafka.connection.properties.key2", 
"value2");
+
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void filterConnectionPropertiesTest() {
+    Properties props = new Properties();
+    props.put("org.apache.crunch.kafka.connection.properties.key1", "value1");
+    props.put("org.apache.crunch.kafka.connection.properties.key2", "value2");
+    props.put("org_apache_crunch_kafka_connection_properties.key3", "value3");
+    props.put("org.apache.crunch.another.prefix.properties.key4", "value4");
+
+    Properties actual = KafkaInputFormat.filterConnectionProperties(props);
+    Properties expected = new Properties();
+    expected.put("key1", "value1");
+    expected.put("key2", "value2");
+
+    assertEquals(expected, actual);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getSplitsInvalidMaxRecords() throws IOException, 
InterruptedException {
+    KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+    Configuration conf = new Configuration(false);
+    conf.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 0L);
+    kafkaInputFormat.setConf(conf);
+    kafkaInputFormat.getSplits(mock(JobContext.class));
+  }
+
+  @Test
+  public void getSplitsConfiguredMaxRecords() throws IOException, 
InterruptedException {
+    KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+    Configuration conf = new Configuration(false);
+    conf.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 2L);
+
+    conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0,1");
+    
conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 
300L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 
1000L);
+    
conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start", 
30L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end", 
100L);
+
+    conf.set("org.apache.crunch.kafka.offsets.topic.xyz.partitions", "0");
+    
conf.setLong("org.apache.crunch.kafka.offsets.topic.xyz.partitions.0.start", 
3L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.xyz.partitions.0.end", 
10L);
+
+    kafkaInputFormat.setConf(conf);
+    List<InputSplit> splits = 
kafkaInputFormat.getSplits(mock(JobContext.class));
+    assertThat(splits.size(), is((700/2 + 700%2) + (70/2 + 70%2) + (7/2 + 
7%2)));
+  }
+
+  @Test
+  public void getSplitsDefaultMaxRecords() throws IOException, 
InterruptedException {
+    KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+    Configuration conf = new Configuration(false);
+
+    conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0");
+    
conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 
0L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 
5234567L);
+
+    kafkaInputFormat.setConf(conf);
+    List<InputSplit> splits = 
kafkaInputFormat.getSplits(mock(JobContext.class));
+    assertThat(splits.size(), is(2));
+  }
+
+  @Test
+  public void getSplitsNoRecords() throws IOException, InterruptedException {
+    KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+    Configuration conf = new Configuration(false);
+
+    conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0");
+    
conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 
5L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 
5L);
+
+    kafkaInputFormat.setConf(conf);
+    List<InputSplit> splits = 
kafkaInputFormat.getSplits(mock(JobContext.class));
+    assertThat(splits.size(), is(0));
+  }
+}
\ No newline at end of file
diff --git 
a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java
 
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java
new file mode 100644
index 0000000..fd8cd8e
--- /dev/null
+++ 
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information regarding copyright 
ownership.  The ASF licenses this file to you under the
+ * Apache License, Version 2.0 (the "License"); you may not use this file 
except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordReaderTest {
+
+  @Mock
+  private KafkaConsumer<String, String> consumer;
+
+  @Mock
+  private TaskAttemptContext taskAttemptContext;
+
+  private TopicPartition topicPartition;
+  private long startOffset;
+  private long endOffset;
+  private KafkaInputSplit inputSplit;
+
+  private ConsumerRecords<String, String> records;
+
+  private KafkaRecordReader<String, String> reader;
+
+  @Before
+  public void before() throws IOException, InterruptedException {
+    when(taskAttemptContext.getConfiguration()).thenReturn(new 
Configuration(false));
+    startOffset = 0L;
+    endOffset = 100L;
+    topicPartition = new TopicPartition("topic", 0);
+
+    inputSplit = new KafkaInputSplit(topicPartition.topic(), 
topicPartition.partition(), startOffset, endOffset);
+
+    
when(consumer.beginningOffsets(Collections.singleton(inputSplit.getTopicPartition()))).thenReturn(
+        Collections.singletonMap(inputSplit.getTopicPartition(), 0L));
+
+    records = new 
ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+        Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, "key", 
"value"))));
+
+    when(consumer.poll(anyLong())).thenReturn(records);
+
+    reader = new KafkaRecordReaderTester();
+    reader.initialize(inputSplit, taskAttemptContext);
+  }
+
+  @Test
+  public void getRecords_consumerPollThrowsException_thenReturnsMessage() {
+    // DisconnectException is retriable
+    when(consumer.poll(anyLong())).thenThrow(new 
DisconnectException()).thenReturn(records);
+
+    reader.loadRecords();
+    Iterator<ConsumerRecord<String, String>> iterator = 
reader.getRecordIterator();
+    assertThat(iterator.hasNext(), is(true));
+    assertThat(iterator.next(), is(records.records(topicPartition).get(0)));
+  }
+
+  @Test
+  public void getRecords_consumerPollEmpty_thenReturnsMessage() {
+    // DisconnectException is retriable
+    when(consumer.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.<TopicPartition,
+        List<ConsumerRecord<String, String>>> emptyMap())).thenReturn(records);
+
+    reader.loadRecords();
+    Iterator<ConsumerRecord<String, String>> iterator = 
reader.getRecordIterator();
+    assertThat(iterator.hasNext(), is(true));
+    assertThat(iterator.next(), is(records.records(topicPartition).get(0)));
+  }
+
+  @Test
+  public void nextKeyValue() throws IOException, InterruptedException {
+    assertThat(reader.nextKeyValue(), is(true));
+    assertThat(reader.getCurrentKey(), 
is(records.records(topicPartition).get(0)));
+    assertThat(reader.getCurrentOffset(), is(0L));
+  }
+
+  @Test
+  public void nextKeyValue_recordOffsetAheadOfExpected() throws IOException, 
InterruptedException {
+    records = new 
ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+        Collections.singletonList(new ConsumerRecord<>("topic", 0, 10L, "key", 
"value"))));
+
+    when(consumer.poll(anyLong())).thenReturn(records);
+
+    assertThat(reader.nextKeyValue(), is(true));
+    assertThat(reader.getCurrentKey(), 
is(records.records(topicPartition).get(0)));
+    assertThat(reader.getCurrentOffset(), is(10L));
+  }
+
+  @Test
+  public void nextKeyValue_noRecord_emptyPartition() throws IOException, 
InterruptedException {
+    when(consumer.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.<TopicPartition,
+        List<ConsumerRecord<String, String>>> emptyMap()));
+
+    
when(consumer.beginningOffsets(Collections.singletonList(topicPartition))).thenReturn(
+        Collections.singletonMap(topicPartition, endOffset));
+
+    assertThat(reader.nextKeyValue(), is(false));
+  }
+
+  @Test(expected = IOException.class)
+  public void nextKeyValue_noRecord_nonEmptyPartition() throws IOException, 
InterruptedException {
+    when(consumer.poll(anyLong())).thenReturn(new 
ConsumerRecords<>(Collections.<TopicPartition,
+        List<ConsumerRecord<String, String>>> emptyMap()));
+
+    reader.nextKeyValue();
+  }
+
+  @Test
+  public void nextKeyValue_recordIsBeyondEndOffset() throws IOException, 
InterruptedException {
+    records = new 
ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+        Collections.singletonList(new ConsumerRecord<>("topic", 0, 100L, 
"key", "value"))));
+
+    when(consumer.poll(anyLong())).thenReturn(records);
+
+    assertThat(reader.nextKeyValue(), is(false));
+  }
+
+  @Test
+  public void getEarliestOffset_noOffsetFound() {
+    
when(consumer.beginningOffsets(Collections.singletonList(inputSplit.getTopicPartition()))).thenReturn(
+        Collections.<TopicPartition, Long> emptyMap());
+    assertThat(reader.getEarliestOffset(), is(0L));
+  }
+
+  @Test
+  public void getEarliestOffset() {
+    
when(consumer.beginningOffsets(Collections.singletonList(inputSplit.getTopicPartition()))).thenReturn(
+        Collections.singletonMap(inputSplit.getTopicPartition(), 100L));
+    assertThat(reader.getEarliestOffset(), is(100L));
+  }
+
+  private class KafkaRecordReaderTester extends KafkaRecordReader<String, 
String> {
+    @Override
+    protected KafkaConsumer<String, String> buildConsumer(Properties 
properties) {
+      return consumer;
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java 
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
index 56241ae..af03d64 100644
--- 
a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
+++ 
b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
@@ -27,8 +27,8 @@ import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.To;
-import org.apache.crunch.kafka.*;
-import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
@@ -197,6 +197,48 @@ public class KafkaSourceIT {
     pipeline.done();
   }
 
+  @Test
+  public void sourceReadDataThroughPipelineMultipleSplitsPerPartition() {
+    Configuration config = ClusterTest.getConf();
+
+    config.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 7L);
+
+    List<String> keys = 
ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 
10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, 
OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, 
OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+    pipeline.enableDebug();
+
+    ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> kafkaSource = 
new KafkaSource(consumerProps, offsets);
+
+    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> read = 
pipeline.read(kafkaSource);
+    Path out = path.getPath("out");
+    read.parallelDo(new KafkaSourceIT.SimpleConvertFn(), 
Avros.strings()).write(To.textFile(out));
+
+    pipeline.run();
+
+    PCollection<String> persistedKeys = pipeline.read(From.textFile(out));
+
+    Set<String> keysRead = new HashSet<>();
+    int numRecordsFound = 0;
+    for (String value : persistedKeys.materialize()) {
+      assertThat(keys, hasItem(value));
+      numRecordsFound++;
+      keysRead.add(value);
+    }
+
+    assertThat(numRecordsFound, is(keys.size()));
+    assertThat(keysRead.size(), is(keys.size()));
+
+    pipeline.done();
+  }
 
   private static class SimpleConvertFn extends 
MapFn<ConsumerRecord<BytesWritable, BytesWritable>, String> {
     @Override

Reply via email to