CRUNCH-653: Created KafkaSource that provides ConsumerRecord messages

Signed-off-by: Josh Wills <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f4734781
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f4734781
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f4734781

Branch: refs/heads/master
Commit: f473478141144dea5ce422309aec37e8212a9f1e
Parents: 28ab199
Author: Bryan Baugher <[email protected]>
Authored: Wed Aug 16 16:19:42 2017 -0500
Committer: Josh Wills <[email protected]>
Committed: Thu Oct 26 21:01:19 2017 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaSource.java    |   3 +
 .../kafka/record/ConsumerRecordHelper.java      | 166 ++++++++
 .../apache/crunch/kafka/record/KafkaData.java   |  62 +++
 .../crunch/kafka/record/KafkaInputFormat.java   | 318 ++++++++++++++
 .../crunch/kafka/record/KafkaInputSplit.java    | 121 ++++++
 .../crunch/kafka/record/KafkaRecordReader.java  | 235 +++++++++++
 .../kafka/record/KafkaRecordsIterable.java      | 292 +++++++++++++
 .../apache/crunch/kafka/record/KafkaSource.java | 214 ++++++++++
 .../kafka/record/KafkaSourceConverter.java      |  70 ++++
 .../org/apache/crunch/kafka/ClusterTest.java    |   3 +
 .../crunch/kafka/KafkaRecordsIterableIT.java    |   1 -
 .../kafka/record/ConsumerRecordHelperTest.java  | 101 +++++
 .../apache/crunch/kafka/record/KafkaDataIT.java | 118 ++++++
 .../kafka/record/KafkaRecordsIterableIT.java    | 416 +++++++++++++++++++
 .../crunch/kafka/record/KafkaSourceIT.java      | 207 +++++++++
 15 files changed, 2326 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
index bdcb7a9..1614053 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
@@ -58,7 +58,10 @@ import java.util.Properties;
  * The values retrieved from Kafka are returned as raw bytes inside of a 
{@link BytesWritable}.  If callers
  * need specific parsing logic based on the topic then consumers are 
encouraged to use multiple Kafka Sources
  * for each topic and use special {@link DoFn} to parse the payload.
+ *
+ * @deprecated Use {@link org.apache.crunch.kafka.record.KafkaSource} instead
  */
+@Deprecated
 public class KafkaSource
     implements TableSource<BytesWritable, BytesWritable>, 
ReadableSource<Pair<BytesWritable, BytesWritable>> {
 

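For callers moving off the deprecated table-style source, a minimal usage sketch
of the new record-based source added by this patch (the pipeline instance, broker
address, topic name, and offset values below are illustrative assumptions):

  import java.util.HashMap;
  import java.util.Map;
  import java.util.Properties;
  import org.apache.crunch.PCollection;
  import org.apache.crunch.Pair;
  import org.apache.crunch.kafka.record.KafkaSource;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.common.TopicPartition;

  // connection properties for the Kafka cluster (hypothetical broker address)
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

  // read partition 0 of topic "abc" over the half-open offset range [0, 100)
  Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
  offsets.put(new TopicPartition("abc", 0), Pair.of(0L, 100L));

  // 'pipeline' is an existing org.apache.crunch.Pipeline
  PCollection<ConsumerRecord<BytesWritable, BytesWritable>> records =
      pipeline.read(new KafkaSource(props, offsets));
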
http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/ConsumerRecordHelper.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/ConsumerRecordHelper.java
 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/ConsumerRecordHelper.java
new file mode 100644
index 0000000..0520ed8
--- /dev/null
+++ 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/ConsumerRecordHelper.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Serializer/De-Serializer for Kafka's {@link ConsumerRecord}
+ */
+public class ConsumerRecordHelper {
+
+  /**
+   * PType for {@link ConsumerRecord}
+   */
+  @SuppressWarnings("unchecked")
+  public static final PType<ConsumerRecord<BytesWritable, BytesWritable>> 
CONSUMER_RECORD_P_TYPE = Writables
+      .derived((Class<ConsumerRecord<BytesWritable, BytesWritable>>) (Object) 
ConsumerRecord.class,
+          new ConsumerRecordHelper.BytesToConsumerRecord(), new 
ConsumerRecordHelper.ConsumerRecordToBytes(),
+          Writables.writables(BytesWritable.class));
+
+  /**
+   * Serializes the record into {@code byte[]}s
+   *
+   * @param record the record to serialize
+   * @return the record in {@code byte[]}s
+   * @throws IllegalArgumentException if record is {@code null}
+   * @throws IOException              if there is an issue during serialization
+   */
+  public static byte[] serialize(ConsumerRecord<BytesWritable, BytesWritable> 
record) throws IOException {
+    if (record == null)
+      throw new IllegalArgumentException("record cannot be null");
+
+    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
+
+    try (DataOutputStream dataOut = new DataOutputStream(byteOut)) {
+      dataOut.writeUTF(record.topic());
+      dataOut.writeInt(record.partition());
+      dataOut.writeLong(record.offset());
+      dataOut.writeLong(record.timestamp());
+      dataOut.writeUTF(record.timestampType().name);
+      dataOut.writeLong(record.checksum());
+      dataOut.writeInt(record.serializedKeySize());
+      dataOut.writeInt(record.serializedValueSize());
+
+      if (record.key() == null) {
+        dataOut.writeInt(-1);
+      } else {
+        byte[] keyBytes = record.key().getBytes();
+        dataOut.writeInt(keyBytes.length);
+        dataOut.write(keyBytes);
+      }
+
+      if (record.value() == null) {
+        dataOut.writeInt(-1);
+      } else {
+        byte[] valueBytes = record.value().getBytes();
+        dataOut.writeInt(valueBytes.length);
+        dataOut.write(valueBytes);
+      }
+
+      return byteOut.toByteArray();
+    }
+  }
+
+  /**
+   * De-serializes the bytes into a {@link ConsumerRecord}
+   *
+   * @param bytes the bytes of a {@link ConsumerRecord}
+   * @return a {@link ConsumerRecord} from the bytes
+   * @throws IllegalArgumentException if bytes is {@code null}
+   * @throws IOException              if there is an issue de-serializing the 
bytes
+   */
+  public static ConsumerRecord<BytesWritable, BytesWritable> 
deserialize(byte[] bytes) throws IOException {
+    if (bytes == null)
+      throw new IllegalArgumentException("bytes cannot be null");
+
+    try (DataInputStream dataIn = new DataInputStream(new 
ByteArrayInputStream(bytes))) {
+      String topic = dataIn.readUTF();
+      int partition = dataIn.readInt();
+      long offset = dataIn.readLong();
+      long timestamp = dataIn.readLong();
+      String timestampTypeName = dataIn.readUTF();
+      long checksum = dataIn.readLong();
+      int serializedKeySize = dataIn.readInt();
+      int serializedValueSize = dataIn.readInt();
+
+      BytesWritable key = null;
+      int keySize = dataIn.readInt();
+      if (keySize != -1) {
+        byte[] keyBytes = new byte[keySize];
+        dataIn.readFully(keyBytes);
+        key = new BytesWritable(keyBytes);
+      }
+
+      BytesWritable value = null;
+      int valueSize = dataIn.readInt();
+      if (valueSize != -1) {
+        byte[] valueBytes = new byte[valueSize];
+        dataIn.readFully(valueBytes);
+        value = new BytesWritable(valueBytes);
+      }
+
+      return new ConsumerRecord<>(topic, partition, offset, timestamp, 
TimestampType.forName(timestampTypeName), checksum,
+          serializedKeySize, serializedValueSize, key, value);
+    }
+  }
+
+  /**
+   * {@link MapFn} to convert {@link ConsumerRecord} to {@link BytesWritable}
+   */
+  public static class ConsumerRecordToBytes extends 
MapFn<ConsumerRecord<BytesWritable, BytesWritable>, BytesWritable> {
+    private static final long serialVersionUID = -6821080008375335537L;
+
+    @Override
+    public BytesWritable map(ConsumerRecord<BytesWritable, BytesWritable> 
record) {
+      try {
+        return new BytesWritable(ConsumerRecordHelper.serialize(record));
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error serializing consumer record " 
+ record, e);
+      }
+    }
+  }
+
+  /**
+   * {@link MapFn} to convert {@link BytesWritable} to {@link ConsumerRecord}
+   */
+  public static class BytesToConsumerRecord extends MapFn<BytesWritable, 
ConsumerRecord<BytesWritable, BytesWritable>> {
+    private static final long serialVersionUID = -6545017910063252322L;
+
+    @Override
+    public ConsumerRecord<BytesWritable, BytesWritable> map(BytesWritable 
bytesWritable) {
+      try {
+        return ConsumerRecordHelper.deserialize(bytesWritable.getBytes());
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error deserializing consumer 
record", e);
+      }
+    }
+  }
+}

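A small round-trip sketch for the helper above; the topic name and payloads are
illustrative, and the long ConsumerRecord constructor mirrors the one used in
deserialize (both serialize and deserialize throw IOException):

  import java.nio.charset.StandardCharsets;
  import org.apache.crunch.kafka.record.ConsumerRecordHelper;
  import org.apache.hadoop.io.BytesWritable;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.common.record.TimestampType;

  BytesWritable key = new BytesWritable("k1".getBytes(StandardCharsets.UTF_8));
  BytesWritable value = new BytesWritable("v1".getBytes(StandardCharsets.UTF_8));

  ConsumerRecord<BytesWritable, BytesWritable> record = new ConsumerRecord<>(
      "abc", 0, 42L, System.currentTimeMillis(), TimestampType.CREATE_TIME,
      0L, key.getLength(), value.getLength(), key, value);

  byte[] bytes = ConsumerRecordHelper.serialize(record);
  ConsumerRecord<BytesWritable, BytesWritable> copy = ConsumerRecordHelper.deserialize(bytes);
  // copy now carries the same topic, partition, offset, key, and value as the original
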
http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaData.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaData.java 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaData.java
new file mode 100644
index 0000000..7cd9ee1
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaData.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+class KafkaData<K, V> implements ReadableData<ConsumerRecord<K, V>> {
+
+  private static final long serialVersionUID = -6582212311361579556L;
+
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+  private final Properties props;
+
+  KafkaData(Properties connectionProperties, Map<TopicPartition, Pair<Long, 
Long>> offsets) {
+    this.props = connectionProperties;
+    this.offsets = offsets;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return null;
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    //no-op
+  }
+
+  @Override
+  public Iterable<ConsumerRecord<K, V>> read(TaskInputOutputContext<?, ?, ?, 
?> context) throws IOException {
+    Consumer<K, V> consumer = new KafkaConsumer<>(props);
+    return new KafkaRecordsIterable<>(consumer, offsets, props);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
new file mode 100644
index 0000000..4e378ce
--- /dev/null
+++ 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * Basic input format for reading data from Kafka.  Data is read and provided 
as {@link ConsumerRecord} with the data left in its
+ * raw byte form.
+ * <p>
+ * Populating the configuration of the input format is handled with the 
convenience method of
+ * {@link #writeOffsetsToConfiguration(Map, Configuration)}.  This should be 
done to ensure
+ * the Kafka offset information is available when the input format {@link 
#getSplits(JobContext) creates its splits}
+ * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}.
+ * <p>
+ * To allow for suppression of warnings referring to unknown configs in the 
Kafka {@code ConsumerConfig}, properties containing
+ * Kafka connection information will be prefixed using {@link 
#generateConnectionPropertyKey(String) generateConnectionPropertyKey}
+ * and will be written to the {@code FormatBundle} using
+ * {@link #writeConnectionPropertiesToBundle(Properties, FormatBundle) 
writeConnectionPropertiesToBundle}. These properties can then
+ * be retrieved using {@link #filterConnectionProperties(Properties) 
filterConnectionProperties}.
+ */
+public class KafkaInputFormat extends 
InputFormat<ConsumerRecord<BytesWritable, BytesWritable>, Void> implements 
Configurable {
+
+  /**
+   * Constant for constructing configuration keys for the input format.
+   */
+  private static final String KAFKA_INPUT_OFFSETS_BASE = 
"org.apache.crunch.kafka.offsets.topic";
+
+  /**
+   * Constant used for building configuration keys and specifying partitions.
+   */
+  private static final String PARTITIONS = "partitions";
+
+  /**
+   * Constant used for building configuration keys and specifying the start of 
a partition.
+   */
+  private static final String START = "start";
+
+  /**
+   * Constant used for building configuration keys and specifying the end of a 
partition.
+   */
+  private static final String END = "end";
+
+  /**
+   * Regex to discover all of the defined partitions which should be consumed 
by the input format.
+   */
+  private static final String TOPIC_KEY_REGEX = 
Pattern.quote(KAFKA_INPUT_OFFSETS_BASE) + "\\..*\\." + PARTITIONS + "$";
+
+  /**
+   * Constant for constructing configuration keys for the Kafka connection 
properties.
+   */
+  private static final String KAFKA_CONNECTION_PROPERTY_BASE = 
"org.apache.crunch.kafka.connection.properties";
+
+  /**
+   * Regex to discover all of the defined Kafka connection properties which 
should be passed to the ConsumerConfig.
+   */
+  private static final Pattern CONNECTION_PROPERTY_REGEX = Pattern
+      .compile(Pattern.quote(KAFKA_CONNECTION_PROPERTY_BASE) + "\\..*$");
+
+  private Configuration configuration;
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext) throws IOException, 
InterruptedException {
+    Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
+    List<InputSplit> splits = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : 
offsets.entrySet()) {
+      TopicPartition topicPartition = entry.getKey();
+
+      long start = entry.getValue().first();
+      long end = entry.getValue().second();
+      if (start != end) {
+        splits.add(new KafkaInputSplit(topicPartition.topic(), 
topicPartition.partition(), entry.getValue().first(),
+            entry.getValue().second()));
+      }
+    }
+
+    return splits;
+  }
+
+  @Override
+  public RecordReader<ConsumerRecord<BytesWritable, BytesWritable>, Void> 
createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException, 
InterruptedException {
+    return new KafkaRecordReader<>();
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return configuration;
+  }
+
+  //The following methods are used for reading and writing Kafka partition offset information into Hadoop's
+  //Configuration objects and into Crunch's FormatBundle.  A specific Kafka topic may have one or many partitions,
+  //and each partition needs a start and end offset.  Assuming a topic "abc" with 2 partitions, the configuration
+  //would be populated with the following:
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions = [0,1]
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = <partition 
start>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = <partition 
end>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start = <partition 
start>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end = <partition 
end>
+
+  /**
+   * Writes the start and end offsets for the provided topic partitions to the 
{@code bundle}.
+   *
+   * @param offsets The starting and ending offsets for the topics and 
partitions.
+   * @param bundle  the bundle into which the information should be persisted.
+   */
+  public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, 
Long>> offsets, FormatBundle bundle) {
+    for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) 
{
+      bundle.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Writes the start and end offsets for the provided topic partitions to the 
{@code config}.
+   *
+   * @param offsets The starting and ending offsets for the topics and 
partitions.
+   * @param config  the config into which the information should be persisted.
+   */
+  public static void writeOffsetsToConfiguration(Map<TopicPartition, 
Pair<Long, Long>> offsets, Configuration config) {
+    for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) 
{
+      config.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Reads the {@code configuration} to determine which topics, partitions, 
and offsets should be used for reading data.
+   *
+   * @param configuration the configuration to derive the data to read.
+   * @return a map of {@link TopicPartition} to a pair of start and end 
offsets.
+   * @throws IllegalStateException if the {@code configuration} does not have 
the start and end offsets set properly
+   *                               for a partition.
+   */
+  public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration 
configuration) {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    //find configuration for all of the topics with defined partitions
+    Map<String, String> topicPartitionKeys = 
configuration.getValByRegex(TOPIC_KEY_REGEX);
+
+    //for each topic start to process its partitions
+    for (String key : topicPartitionKeys.keySet()) {
+      String topic = getTopicFromKey(key);
+      int[] partitions = configuration.getInts(key);
+      //for each partition find and add the start/end offset
+      for (int partitionId : partitions) {
+        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+        long start = configuration.getLong(generatePartitionStartKey(topic, 
partitionId), Long.MIN_VALUE);
+        long end = configuration.getLong(generatePartitionEndKey(topic, 
partitionId), Long.MIN_VALUE);
+
+        if (start == Long.MIN_VALUE || end == Long.MIN_VALUE) {
+          throw new IllegalStateException(
+              "The " + topicPartition + " has an invalid start:" + start + " 
or end:" + end + " offset configured.");
+        }
+
+        offsets.put(topicPartition, Pair.of(start, end));
+      }
+    }
+
+    return offsets;
+  }
+
+  private static Map<String, String> generateValues(Map<TopicPartition, 
Pair<Long, Long>> offsets) {
+    Map<String, String> offsetConfigValues = new HashMap<>();
+    Map<String, Set<Integer>> topicsPartitions = new HashMap<>();
+
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : 
offsets.entrySet()) {
+      TopicPartition topicPartition = entry.getKey();
+      String topic = topicPartition.topic();
+      int partition = topicPartition.partition();
+      String startKey = generatePartitionStartKey(topic, partition);
+      String endKey = generatePartitionEndKey(topic, partition);
+      //Add the start and end offsets for a specific partition
+      offsetConfigValues.put(startKey, 
Long.toString(entry.getValue().first()));
+      offsetConfigValues.put(endKey, Long.toString(entry.getValue().second()));
+
+      Set<Integer> partitions = topicsPartitions.get(topic);
+      if (partitions == null) {
+        partitions = new HashSet<>();
+        topicsPartitions.put(topic, partitions);
+      }
+      partitions.add(partition);
+    }
+
+    //generate the partitions values for each topic
+    for (Map.Entry<String, Set<Integer>> entry : topicsPartitions.entrySet()) {
+      String key = KAFKA_INPUT_OFFSETS_BASE + "." + entry.getKey() + "." + 
PARTITIONS;
+      Set<Integer> partitions = entry.getValue();
+      String partitionsString = StringUtils.join(partitions, ",");
+      offsetConfigValues.put(key, partitionsString);
+    }
+
+    return offsetConfigValues;
+  }
+
+  static String generatePartitionStartKey(String topic, int partition) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + 
partition + "." + START;
+  }
+
+  static String generatePartitionEndKey(String topic, int partition) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + 
partition + "." + END;
+  }
+
+  static String generateTopicPartitionsKey(String topic) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS;
+  }
+
+  static String getTopicFromKey(String key) {
+    //strip off the base key + a trailing "."
+    String value = key.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1);
+    //strip off the end part + a preceding "."
+    value = value.substring(0, (value.length() - (PARTITIONS.length() + 1)));
+
+    return value;
+  }
+
+  // The following methods are used for writing prefixed Kafka connection 
properties into Crunch's FormatBundle and
+  // {@link #filterConnectionProperties(Properties props) filtering} out the 
prefixed properties. This allows for
+  // suppression of {@code ConsumerConfig} warnings that are generated by 
unused properties carried over from the Hadoop
+  // properties.
+
+  /**
+   * Writes the Kafka connection properties to the {@code bundle}. The 
connection properties are prefixed with
+   * "org.apache.crunch.kafka.connection.properties" to allow for suppression 
of unused {@code ConsumerConfig} warnings
+   * generated by unused Hadoop properties.
+   *
+   * @param connectionProperties The Kafka connection properties to be 
prefixed for later
+   *                             {@link #filterConnectionProperties(Properties 
props) filtering}.
+   * @param bundle               The bundle into which the information should 
be persisted.
+   */
+  public static void writeConnectionPropertiesToBundle(Properties 
connectionProperties, FormatBundle bundle) {
+    for (final String name : connectionProperties.stringPropertyNames()) {
+      bundle.set(generateConnectionPropertyKey(name), 
connectionProperties.getProperty(name));
+    }
+  }
+
+  /**
+   * Filters out the properties written by
+   * {@link #writeConnectionPropertiesToBundle(Properties, FormatBundle) 
writeConnectionPropertiesToBundle}.
+   *
+   * @param props The properties to be filtered.
+   * @return The properties containing Kafka connection information that were 
written by
+   * {@link #writeConnectionPropertiesToBundle(Properties, FormatBundle) 
writeConnectionPropertiesToBundle}.
+   */
+  public static Properties filterConnectionProperties(Properties props) {
+    Properties filteredProperties = new Properties();
+
+    for (final String name : props.stringPropertyNames()) {
+      if (CONNECTION_PROPERTY_REGEX.matcher(name).matches()) {
+        filteredProperties.put(getConnectionPropertyFromKey(name), 
props.getProperty(name));
+      }
+    }
+
+    return filteredProperties;
+  }
+
+  /**
+   * Prefixes a given property with 
"org.apache.crunch.kafka.connection.properties" to allow for filtering with
+   * {@link #filterConnectionProperties(Properties) 
filterConnectionProperties}.
+   *
+   * @param property The Kafka connection property that will be prefixed for 
retrieval at a later time.
+   * @return The property prefixed with 
"org.apache.crunch.kafka.connection.properties"
+   */
+  static String generateConnectionPropertyKey(String property) {
+    return KAFKA_CONNECTION_PROPERTY_BASE + "." + property;
+  }
+
+  /**
+   * Retrieves the original property that was prefixed using
+   * {@link #generateConnectionPropertyKey(String) 
generateConnectionPropertyKey}.
+   *
+   * @param key The key that was prefixed using {@link 
#generateConnectionPropertyKey(String) generateConnectionPropertyKey}.
+   * @return The original property prior to prefixing.
+   */
+  static String getConnectionPropertyFromKey(String key) {
+    // Strip off the base key + a trailing "."
+    return key.substring(KAFKA_CONNECTION_PROPERTY_BASE.length() + 1);
+  }
+}
\ No newline at end of file

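A brief sketch of the offset round trip described in the class javadoc above
(topic name and offset values are illustrative assumptions):

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.crunch.Pair;
  import org.apache.crunch.kafka.record.KafkaInputFormat;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.kafka.common.TopicPartition;

  Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
  offsets.put(new TopicPartition("abc", 0), Pair.of(0L, 100L));
  offsets.put(new TopicPartition("abc", 1), Pair.of(50L, 150L));

  Configuration conf = new Configuration();
  KafkaInputFormat.writeOffsetsToConfiguration(offsets, conf);
  // populates keys such as:
  //   org.apache.crunch.kafka.offsets.topic.abc.partitions = 0,1
  //   org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = 0
  //   org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = 100

  // getSplits and createRecordReader rely on these same keys; reading them back:
  Map<TopicPartition, Pair<Long, Long>> restored = KafkaInputFormat.getOffsets(conf);
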
http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java
 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java
new file mode 100644
index 0000000..b2b0bb9
--- /dev/null
+++ 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * InputSplit that represents retrieving data from a single {@link TopicPartition} between the specified start
+ * and end offsets.
+ */
+public class KafkaInputSplit extends InputSplit implements Writable {
+
+  private long startingOffset;
+  private long endingOffset;
+  private TopicPartition topicPartition;
+
+  /**
+   * Nullary constructor for creating the instance inside the Mapper.
+   */
+  public KafkaInputSplit() {
+
+  }
+
+  /**
+   * Constructs an input split for the provided {@code topic} and {@code 
partition} restricting data to be between
+   * the {@code startingOffset} and {@code endingOffset}
+   *
+   * @param topic          the topic for the split
+   * @param partition      the partition for the topic
+   * @param startingOffset the start of the split
+   * @param endingOffset   the end of the split
+   */
+  public KafkaInputSplit(String topic, int partition, long startingOffset, 
long endingOffset) {
+    this.startingOffset = startingOffset;
+    this.endingOffset = endingOffset;
+    topicPartition = new TopicPartition(topic, partition);
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    // This is just used as a hint for the size in bytes, so it is already inaccurate.
+    return startingOffset > 0 ? endingOffset - startingOffset : endingOffset;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    //Leave empty since data locality is not really an issue.
+    return new String[0];
+  }
+
+  /**
+   * Returns the topic and partition for the split
+   *
+   * @return the topic and partition for the split
+   */
+  public TopicPartition getTopicPartition() {
+    return topicPartition;
+  }
+
+  /**
+   * Returns the starting offset for the split
+   *
+   * @return the starting offset for the split
+   */
+  public long getStartingOffset() {
+    return startingOffset;
+  }
+
+  /**
+   * Returns the ending offset for the split
+   *
+   * @return the ending offset for the split
+   */
+  public long getEndingOffset() {
+    return endingOffset;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeUTF(topicPartition.topic());
+    dataOutput.writeInt(topicPartition.partition());
+    dataOutput.writeLong(startingOffset);
+    dataOutput.writeLong(endingOffset);
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    String topic = dataInput.readUTF();
+    int partition = dataInput.readInt();
+    startingOffset = dataInput.readLong();
+    endingOffset = dataInput.readLong();
+
+    topicPartition = new TopicPartition(topic, partition);
+  }
+
+  @Override
+  public String toString() {
+    return getTopicPartition() + " Start: " + startingOffset + " End: " + 
endingOffset;
+  }
+}

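A quick Writable round-trip sketch for the split above (topic and offsets are
illustrative; write and readFields throw IOException):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import org.apache.crunch.kafka.record.KafkaInputSplit;

  KafkaInputSplit split = new KafkaInputSplit("abc", 0, 0L, 100L);

  ByteArrayOutputStream out = new ByteArrayOutputStream();
  split.write(new DataOutputStream(out));

  KafkaInputSplit copy = new KafkaInputSplit();
  copy.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
  // copy.getTopicPartition(), getStartingOffset(), and getEndingOffset() match the original
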
http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
new file mode 100644
index 0000000..ec138b3
--- /dev/null
+++ 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.kafka.KafkaUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
+import static 
org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
+import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
+import static 
org.apache.crunch.kafka.record.KafkaInputFormat.filterConnectionProperties;
+import static 
org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
+import static 
org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
+
+/**
+ * A {@link RecordReader} for pulling data from Kafka.
+ *
+ * @param <K> the key of the records from Kafka
+ * @param <V> the value of the records from Kafka
+ */
+public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, 
V>, Void> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+
+  private Consumer<K, V> consumer;
+  private ConsumerRecord<K, V> record;
+  private long endingOffset;
+  private Iterator<ConsumerRecord<K, V>> recordIterator;
+  private long consumerPollTimeout;
+  private long maxNumberOfRecords;
+  private long startingOffset;
+  private long currentOffset;
+  private int maxNumberAttempts;
+  private Properties kafkaConnectionProperties;
+  private int maxConcurrentEmptyResponses;
+  private int concurrentEmptyResponses;
+  private TopicPartition topicPartition;
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+      throws IOException, InterruptedException {
+    if (!(inputSplit instanceof KafkaInputSplit)) {
+      throw new CrunchRuntimeException("InputSplit for RecordReader is not a valid split type.");
+    }
+
+    kafkaConnectionProperties = 
filterConnectionProperties(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
+
+    consumer = new KafkaConsumer<>(kafkaConnectionProperties);
+
+    KafkaInputSplit split = (KafkaInputSplit) inputSplit;
+    topicPartition = split.getTopicPartition();
+
+    consumer.assign(Collections.singletonList(topicPartition));
+
+    //suggested hack to gather info without gathering data
+    consumer.poll(0);
+
+    //now seek to the desired start location
+    startingOffset = split.getStartingOffset();
+    consumer.seek(topicPartition, startingOffset);
+
+    currentOffset = startingOffset - 1;
+    endingOffset = split.getEndingOffset();
+
+    maxNumberOfRecords = endingOffset - startingOffset;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Reading data from {} between {} and {}", new Object[] { 
topicPartition, startingOffset, endingOffset });
+    }
+
+    Configuration config = taskAttemptContext.getConfiguration();
+    consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, 
CONSUMER_POLL_TIMEOUT_DEFAULT);
+    maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, 
KAFKA_RETRY_ATTEMPTS_DEFAULT);
+    maxConcurrentEmptyResponses = 
config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, 
KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
+    concurrentEmptyResponses = 0;
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (hasPendingData()) {
+      recordIterator = getRecords();
+      record = recordIterator.hasNext() ? recordIterator.next() : null;
+      if (record != null) {
+        LOG.debug("nextKeyValue: Retrieved record with offset {}", 
record.offset());
+        long oldOffset = currentOffset;
+        currentOffset = record.offset();
+        LOG.debug("Current offset will be updated to be [{}]", currentOffset);
+        if (LOG.isWarnEnabled() && (currentOffset - oldOffset > 1)) {
+          LOG.warn("Offset increment was larger than expected value of one, 
old {} new {}", oldOffset, currentOffset);
+        }
+        return true;
+      } else {
+        LOG.warn("nextKeyValue: Retrieved null record last offset was {} and 
ending offset is {}", currentOffset, endingOffset);
+      }
+    }
+    record = null;
+    return false;
+  }
+
+  @Override
+  public ConsumerRecord<K, V> getCurrentKey() throws IOException, 
InterruptedException {
+    return record;
+  }
+
+  @Override
+  public Void getCurrentValue() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    //not most accurate but gives reasonable estimate
+    return ((float) (currentOffset - startingOffset + 1)) / maxNumberOfRecords;
+  }
+
+  private boolean hasPendingData() {
+    //offset range is exclusive at the end which means the ending offset is 
one higher
+    // than the actual physical last offset
+
+    boolean hasPending = currentOffset < endingOffset - 1;
+
+    if (concurrentEmptyResponses > maxConcurrentEmptyResponses) {
+      long earliest = getEarliestOffset();
+      if (earliest == endingOffset) {
+        LOG.warn("Possible data loss for {} as earliest {} is equal to {} and 
greater than expected current {}.",
+            new Object[] { topicPartition, earliest, endingOffset, 
currentOffset });
+        return false;
+      }
+    }
+
+    return hasPending;
+  }
+
+  private Iterator<ConsumerRecord<K, V>> getRecords() {
+    if ((recordIterator == null) || !recordIterator.hasNext()) {
+      ConsumerRecords<K, V> records = null;
+      int numTries = 0;
+      boolean success = false;
+      while (!success && (numTries < maxNumberAttempts)) {
+        try {
+          records = getConsumer().poll(consumerPollTimeout);
+        } catch (RetriableException re) {
+          numTries++;
+          if (numTries < maxNumberAttempts) {
+            LOG.warn("Error pulling messages from Kafka. Retrying with attempt 
{}", numTries + 1, re);
+          } else {
+            LOG.error("Error pulling messages from Kafka. Exceeded maximum 
number of attempts {}", maxNumberAttempts, re);
+            throw re;
+          }
+        }
+        if (((records == null) || records.isEmpty()) && hasPendingData()) {
+          concurrentEmptyResponses++;
+          LOG.warn("No records retrieved but pending offsets to consume 
therefore polling again. Attempt {}/{}",
+              concurrentEmptyResponses, maxConcurrentEmptyResponses);
+        } else {
+          success = true;
+        }
+      }
+      concurrentEmptyResponses = 0;
+
+      if ((records == null) || records.isEmpty()) {
+        LOG.info("No records retrieved from Kafka therefore nothing to iterate 
over.");
+      } else {
+        LOG.info("Retrieved records from Kafka to iterate over.");
+      }
+      return records != null ? records.iterator() : ConsumerRecords.<K, 
V>empty().iterator();
+    }
+    return recordIterator;
+  }
+
+  /**
+   * @return the consumer
+   */
+  protected Consumer<K, V> getConsumer() {
+    return consumer;
+  }
+
+  /**
+   * @return the earliest offset of the topic partition
+   */
+  protected long getEarliestOffset() {
+    Map<TopicPartition, Long> brokerOffsets = KafkaUtils
+        .getBrokerOffsets(kafkaConnectionProperties, 
OffsetRequest.EarliestTime(), topicPartition.topic());
+    Long offset = brokerOffsets.get(topicPartition);
+    if (offset == null) {
+      LOG.debug("Unable to determine earliest offset for {} so returning 0", 
topicPartition);
+      return 0L;
+    }
+    LOG.debug("Earliest offset for {} is {}", topicPartition, offset);
+    return offset;
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOG.debug("Closing the record reader.");
+    if (consumer != null) {
+      consumer.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordsIterable.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordsIterable.java
 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordsIterable.java
new file mode 100644
index 0000000..3e946ef
--- /dev/null
+++ 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordsIterable.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.kafka.KafkaUtils;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.Set;
+
+class KafkaRecordsIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
+
+  /**
+   * Logger
+   */
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaRecordsIterable.class);
+
+  /**
+   * The Kafka consumer responsible for retrieving messages.
+   */
+  private final Consumer<K, V> consumer;
+
+  /**
+   * The start and end offsets for the topic partitions to consume.
+   */
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /**
+   * Tracks if the iterable is empty.
+   */
+  private final boolean isEmpty;
+
+  /**
+   * The poll time between each request to Kafka
+   */
+  private final long scanPollTime;
+
+  private final int maxRetryAttempts;
+
+  /**
+   * Creates the iterable that will pull values for a collection of topics using the provided {@code consumer} between
+   * the start and end offsets in {@code offsets}.
+   *
+   * @param consumer   The consumer for pulling the data from Kafka.  The 
consumer will be closed automatically once all
+   *                   of the records have been consumed.
+   * @param offsets    offsets for pulling data
+   * @param properties properties for tweaking the behavior of the iterable.
+   * @throws IllegalArgumentException if any of the arguments are {@code null} 
or empty.
+   */
+  KafkaRecordsIterable(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, 
Long>> offsets, Properties properties) {
+    if (consumer == null) {
+      throw new IllegalArgumentException("The 'consumer' cannot be 'null'.");
+    }
+    this.consumer = consumer;
+
+    if (properties == null) {
+      throw new IllegalArgumentException("The 'properties' cannot be 'null'.");
+    }
+
+    String retryString = properties
+        .getProperty(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY, 
KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING);
+    maxRetryAttempts = Integer.parseInt(retryString);
+
+    if (offsets == null || offsets.isEmpty()) {
+      throw new IllegalArgumentException("The 'offsets' cannot 'null' or 
empty.");
+    }
+
+    //filter out any topics and partitions that do not have offset ranges that 
will produce data.
+    Map<TopicPartition, Pair<Long, Long>> filteredOffsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : 
offsets.entrySet()) {
+      Pair<Long, Long> value = entry.getValue();
+      //if start is less than end then there is data to be had
+      if (value.first() < value.second()) {
+        filteredOffsets.put(entry.getKey(), value);
+      } else {
+        LOG.debug("Removing offsets for {} because start is not less than the 
end offset.", entry.getKey());
+      }
+    }
+
+    //check to make sure that, based on the offsets, there is data to retrieve; otherwise the iterable is empty.
+    //there will be data if the start offsets are less than the stop offsets
+    isEmpty = filteredOffsets.isEmpty();
+    if (isEmpty) {
+      LOG.warn("Iterable for Kafka for is empty because offsets are empty.");
+    }
+
+    //assign this
+    this.offsets = filteredOffsets;
+
+    scanPollTime = Long.parseLong(
+        properties.getProperty(KafkaSource.CONSUMER_POLL_TIMEOUT_KEY, 
Long.toString(KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT)));
+  }
+
+  @Override
+  public Iterator<ConsumerRecord<K, V>> iterator() {
+    if (isEmpty) {
+      LOG.debug("Returning empty iterator since offsets align.");
+      return Collections.emptyIterator();
+    }
+    //Assign consumer to all of the partitions
+    LOG.debug("Assigning topics and partitions and seeking to start offsets.");
+
+    consumer.assign(new LinkedList<>(offsets.keySet()));
+    //hack to gather assignment info without gathering data; maybe look at removing this
+    consumer.poll(0);
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : 
offsets.entrySet()) {
+      consumer.seek(entry.getKey(), entry.getValue().first());
+    }
+
+    return new RecordsIterator<K, V>(consumer, offsets, scanPollTime, 
maxRetryAttempts);
+  }
+
+  private static class RecordsIterator<K, V> implements 
Iterator<ConsumerRecord<K, V>> {
+
+    private final Consumer<K, V> consumer;
+    private final Map<TopicPartition, Pair<Long, Long>> offsets;
+    private final long pollTime;
+    private final int maxNumAttempts;
+    private ConsumerRecords<K, V> records;
+    private Iterator<ConsumerRecord<K, V>> currentIterator;
+    private final Set<TopicPartition> remainingPartitions;
+
+    private ConsumerRecord<K, V> next;
+
+    RecordsIterator(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, 
Long>> offsets, long pollTime, int maxNumRetries) {
+      this.consumer = consumer;
+      remainingPartitions = new HashSet<>(offsets.keySet());
+      this.offsets = offsets;
+      this.pollTime = pollTime;
+      this.maxNumAttempts = maxNumRetries;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (next != null)
+        return true;
+
+      //if partitions to consume then pull next value
+      if (remainingPartitions.size() > 0) {
+        next = getNext();
+      }
+
+      return next != null;
+    }
+
+    @Override
+    public ConsumerRecord<K, V> next() {
+      if (next == null) {
+        next = getNext();
+      }
+
+      if (next != null) {
+        ConsumerRecord<K, V> returnedNext = next;
+        //prime for next call
+        next = getNext();
+        //return the current next
+        return returnedNext;
+      } else {
+        throw new NoSuchElementException("No more elements.");
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove is not supported.");
+    }
+
+    /**
+     * Gets the current iterator.
+     *
+     * @return the current iterator or {@code null} if there are no more 
values to consume.
+     */
+    private Iterator<ConsumerRecord<K, V>> getIterator() {
+      if (!remainingPartitions.isEmpty()) {
+        if (currentIterator != null && currentIterator.hasNext()) {
+          return currentIterator;
+        }
+        LOG.debug("Retrieving next set of records.");
+        int numTries = 0;
+        boolean success = false;
+        while (!success && numTries < maxNumAttempts) {
+          try {
+            records = consumer.poll(pollTime);
+            success = true;
+          } catch (RetriableException re) {
+            numTries++;
+            if (numTries < maxNumAttempts) {
+              LOG.warn("Error pulling messages from Kafka. Retrying with 
attempt {}", numTries, re);
+            } else {
+              LOG.error("Error pulling messages from Kafka. Exceeded maximum 
number of attempts {}", maxNumAttempts, re);
+              throw re;
+            }
+          }
+        }
+        if (records == null || records.isEmpty()) {
+          LOG.debug("Retrieved empty records.");
+          currentIterator = null;
+          return null;
+        }
+        currentIterator = records.iterator();
+        return currentIterator;
+      }
+
+      LOG.debug("No more partitions to consume therefore not retrieving any 
more records.");
+      return null;
+    }
+
+    /**
+     * Internal method for retrieving the next record.
+     *
+     * @return the next record, or {@code null} if there are no more values to retrieve.
+     */
+    private ConsumerRecord<K, V> getNext() {
+      while (!remainingPartitions.isEmpty()) {
+        Iterator<ConsumerRecord<K, V>> iterator = getIterator();
+
+        while (iterator != null && iterator.hasNext()) {
+          ConsumerRecord<K, V> record = iterator.next();
+          TopicPartition topicPartition = new TopicPartition(record.topic(), 
record.partition());
+          long offset = record.offset();
+
+          if (withinRange(topicPartition, offset)) {
+            LOG.debug("Retrieving value for {} with offset {}.", 
topicPartition, offset);
+            return record;
+          }
+          LOG.debug("Value for {} with offset {} is outside of range 
skipping.", topicPartition, offset);
+        }
+      }
+
+      LOG.debug("Closing the consumer because there are no more remaining 
partitions.");
+      consumer.close();
+
+      LOG.debug("Consumed data from all partitions.");
+      return null;
+
+    }
+
+    /**
+     * Checks whether the value for {@code topicPartition} with an {@code 
offset} is within scan range.  If
+     * the value is not then {@code false} is returned otherwise {@code true}.
+     *
+     * @param topicPartion The partition for the offset
+     * @param offset       the offset in the partition
+     * @return {@code true} if the value is within the expected consumption 
range, otherwise {@code false}.
+     */
+    private boolean withinRange(TopicPartition topicPartion, long offset) {
+      long endOffset = offsets.get(topicPartion).second();
+      //end offsets are one higher than the last written value.
+      boolean emit = offset < endOffset;
+      if (offset >= endOffset - 1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Completed consuming partition {} with offset {} and 
ending offset {}.",
+              new Object[] { topicPartion, offset, endOffset });
+        }
+        remainingPartitions.remove(topicPartion);
+        consumer.pause(Arrays.asList(topicPartion));
+      }
+      LOG.debug("Value for partition {} and offset {} is within range.", 
topicPartion, offset);
+      return emit;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java 
b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
new file mode 100644
index 0000000..d7fb763
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Crunch Source that will retrieve events from Kafka given start and end offsets.  The source is not designed to
+ * process unbounded data but instead to retrieve data within a specified range.
+ * <p>
+ * The values retrieved from Kafka are returned as {@link ConsumerRecord} with key and value as raw bytes. If callers
+ * need specific parsing logic based on the topic then consumers are encouraged to use multiple Kafka Sources, one for
+ * each topic, and a special {@link org.apache.crunch.DoFn} to parse the payload.
+ */
+public class KafkaSource
+    implements Source<ConsumerRecord<BytesWritable, BytesWritable>>, 
ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+  private final FormatBundle inputBundle;
+  private final Properties props;
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /**
+   * Constant to indicate how long the reader waits before timing out when retrieving data from Kafka.
+   */
+  public static final String CONSUMER_POLL_TIMEOUT_KEY = "org.apache.crunch.kafka.consumer.poll.timeout";
+
+  /**
+   * Default timeout value for {@link #CONSUMER_POLL_TIMEOUT_KEY} of 1 second.
+   */
+  public static final long CONSUMER_POLL_TIMEOUT_DEFAULT = 1000L;
+
+  /**
+   * Constructs a Kafka source that will read data from the Kafka cluster identified by the {@code kafkaConnectionProperties}
+   * and from the specific topics and partitions identified in the {@code offsets}.
+   *
+   * @param kafkaConnectionProperties The connection properties for reading from Kafka.  These properties will be honored
+   *                                  with the exception of the {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+   *                                  {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}.
+   * @param offsets                   A map of {@link TopicPartition} to a pair of start and end offsets respectively.  The start
+   *                                  and end offsets are evaluated as [start, end) where the ending offset is excluded.  Each
+   *                                  TopicPartition must have a non-null pair describing its offsets.  The start offset should be
+   *                                  less than the end offset.  If the values are equal or start is greater than the end then
+   *                                  that partition will be skipped.
+   */
+  public KafkaSource(Properties kafkaConnectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) {
+    this.props = copyAndSetProperties(kafkaConnectionProperties);
+
+    inputBundle = createFormatBundle(props, offsets);
+
+    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
+  }
+
+  @Override
+  public Source<ConsumerRecord<BytesWritable, BytesWritable>> inputConf(String key, String value) {
+    inputBundle.set(key, value);
+    return this;
+  }
+
+  @Override
+  public PType<ConsumerRecord<BytesWritable, BytesWritable>> getType() {
+    return ConsumerRecordHelper.CONSUMER_RECORD_P_TYPE;
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return new KafkaSourceConverter();
+  }
+
+  @Override
+  public long getSize(Configuration configuration) {
+    // TODO something smarter here.
+    return 1000L * 1000L * 1000L;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaSource(" + 
props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + ")";
+  }
+
+  @Override
+  public long getLastModifiedAt(Configuration configuration) {
+    LOG.warn("Cannot determine last modified time for source: {}", toString());
+    return -1;
+  }
+
+  private static <K, V> FormatBundle createFormatBundle(Properties kafkaConnectionProperties,
+      Map<TopicPartition, Pair<Long, Long>> offsets) {
+
+    FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class);
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+    KafkaInputFormat.writeConnectionPropertiesToBundle(kafkaConnectionProperties, bundle);
+
+    return bundle;
+  }
+
+  private static Properties copyAndSetProperties(Properties kafkaConnectionProperties) {
+    Properties props = new Properties();
+
+    //set the default to be earliest for auto reset but allow it to be overridden if appropriate.
+    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+    props.putAll(kafkaConnectionProperties);
+
+    //Setting the key/value deserializer to ensure proper translation from Kafka to PType format.
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+
+    return props;
+  }
+
+  @Override
+  public Iterable<ConsumerRecord<BytesWritable, BytesWritable>> read(Configuration conf) throws IOException {
+    // consumer will get closed when the iterable is fully consumed.
+    // skip using the inputformat/splits since this will be read in a single JVM and doesn't need the complexity
+    // of parallelism when reading.
+    Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props);
+    return new KafkaRecordsIterable<>(consumer, offsets, props);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void configureSource(Job job, int inputId) throws IOException {
+    Configuration conf = job.getConfiguration();
+    //an id of -1 indicates that this is the only input so just use it directly
+    if (inputId == -1) {
+      job.setMapperClass(CrunchMapper.class);
+      job.setInputFormatClass(inputBundle.getFormatClass());
+      inputBundle.configure(conf);
+    } else {
+      //there are multiple inputs for this mapper so add it via CrunchInputs with a fake path just to
+      //make it play well with other file based inputs.
+      Path dummy = new Path("/kafka/" + inputId);
+      CrunchInputs.addInputPath(job, dummy, inputBundle, inputId);
+    }
+  }
+
+  //exposed for testing purposes
+  FormatBundle getInputBundle() {
+    return inputBundle;
+  }
+
+  @Override
+  public ReadableData<ConsumerRecord<BytesWritable, BytesWritable>> asReadable() {
+    // skip using the inputformat/splits since this will be read in a single JVM and doesn't need the complexity
+    // of parallelism when reading.
+    return new KafkaData<>(props, offsets);
+  }
+
+  /**
+   * Basic {@link Deserializer} which simply wraps the payload as a {@link BytesWritable}.
+   */
+  public static class BytesDeserializer implements Deserializer<BytesWritable> {
+
+    @Override
+    public void configure(Map<String, ?> configProperties, boolean isKey) {
+      //no-op
+    }
+
+    @Override
+    public BytesWritable deserialize(String topic, byte[] valueBytes) {
+      return new BytesWritable(valueBytes);
+    }
+
+    @Override
+    public void close() {
+      //no-op
+    }
+  }
+}
\ No newline at end of file
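
For readers who want to see how the new record-based source is intended to be wired up, here is a minimal usage sketch. The broker address, topic, offset values, output path, example class name, and the inline parsing DoFn are all hypothetical; only KafkaSource, inputConf, and CONSUMER_POLL_TIMEOUT_KEY come from the class above, and in practice the offsets map would come from whatever offset-management strategy the job uses (the tests later in this patch derive theirs from KafkaUtils.getBrokerOffsets).

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.kafka.record.KafkaSource;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceUsageExample {

  public static void main(String[] args) {
    // Hypothetical connection properties; only the bootstrap servers matter here because
    // the source forces the key/value deserializers to its own BytesDeserializer.
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

    // [start, end) offsets per partition; a partition whose start >= end is skipped.
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    offsets.put(new TopicPartition("example-topic", 0), Pair.of(0L, 100L));
    offsets.put(new TopicPartition("example-topic", 1), Pair.of(50L, 150L));

    Pipeline pipeline = new MRPipeline(KafkaSourceUsageExample.class);
    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> records = pipeline.read(
        new KafkaSource(props, offsets)
            // Optional: raise the per-poll timeout from the 1 second default.
            .inputConf(KafkaSource.CONSUMER_POLL_TIMEOUT_KEY, "5000"));

    // Topic-specific parsing happens in a DoFn, as the class javadoc suggests.
    PCollection<String> values = records.parallelDo(
        new DoFn<ConsumerRecord<BytesWritable, BytesWritable>, String>() {
          @Override
          public void process(ConsumerRecord<BytesWritable, BytesWritable> record, Emitter<String> emitter) {
            BytesWritable value = record.value();
            emitter.emit(new String(value.getBytes(), 0, value.getLength(), StandardCharsets.UTF_8));
          }
        }, Writables.strings());

    pipeline.writeTextFile(values, "/tmp/kafka-example-output");
    pipeline.done();
  }
}

Because the source pins both consumer deserializers to BytesDeserializer, any topic-specific decoding happens in a DoFn such as the one above rather than in the consumer configuration.
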

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSourceConverter.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSourceConverter.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSourceConverter.java
new file mode 100644
index 0000000..b3d56c0
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSourceConverter.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.types.Converter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * {@link Converter} for {@link KafkaSource}
+ */
+public class KafkaSourceConverter implements
+    Converter<ConsumerRecord<BytesWritable, BytesWritable>, Void, ConsumerRecord<BytesWritable, BytesWritable>, Iterable<ConsumerRecord<BytesWritable, BytesWritable>>> {
+
+  private static final long serialVersionUID = 5270341393169043945L;
+
+  @Override
+  public ConsumerRecord<BytesWritable, BytesWritable> convertInput(ConsumerRecord<BytesWritable, BytesWritable> record,
+      Void aVoid) {
+    return record;
+  }
+
+  @Override
+  public Iterable<ConsumerRecord<BytesWritable, BytesWritable>> convertIterableInput(
+      ConsumerRecord<BytesWritable, BytesWritable> record, Iterable<Void> iterable) {
+    throw new UnsupportedOperationException("Should not be possible");
+  }
+
+  @Override
+  public ConsumerRecord<BytesWritable, BytesWritable> outputKey(ConsumerRecord<BytesWritable, BytesWritable> record) {
+    return record;
+  }
+
+  @Override
+  public Void outputValue(ConsumerRecord<BytesWritable, BytesWritable> record) {
+    // No value, we just use the record as the key.
+    return null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<ConsumerRecord<BytesWritable, BytesWritable>> getKeyClass() {
+    return (Class<ConsumerRecord<BytesWritable, BytesWritable>>) (Object) ConsumerRecord.class;
+  }
+
+  @Override
+  public Class<Void> getValueClass() {
+    return Void.class;
+  }
+
+  @Override
+  public boolean applyPTypeTransforms() {
+    return false;
+  }
+}
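
Complementing the map-side path that the converter above serves, the source also implements ReadableSource, so a bounded offset range can be consumed directly in a single JVM through read(Configuration) or asReadable(), bypassing the InputFormat and splits entirely. A minimal sketch, again with a hypothetical broker, topic, example class name, and offset range:

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.Pair;
import org.apache.crunch.kafka.record.KafkaSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceLocalReadExample {

  public static void main(String[] args) throws IOException {
    // Hypothetical broker, topic, and [start, end) offset range.
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

    Map<TopicPartition, Pair<Long, Long>> offsets =
        Collections.singletonMap(new TopicPartition("example-topic", 0), Pair.of(0L, 10L));

    // read(Configuration) consumes the range in this JVM; per the comment in the source,
    // the underlying consumer is closed once the iterable has been fully consumed.
    KafkaSource source = new KafkaSource(props, offsets);
    for (ConsumerRecord<BytesWritable, BytesWritable> record : source.read(new Configuration())) {
      System.out.println(record.topic() + "/" + record.partition() + "@" + record.offset());
    }
  }
}

The same offsets map drives both paths; asReadable() wraps it in a KafkaData instance over the identical range.
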

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
index 38ded40..0694551 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
@@ -51,6 +51,9 @@ import java.util.Properties;
     KafkaSourceIT.class, KafkaRecordsIterableIT.class, KafkaDataIT.class,
     //org.apache.crunch.kafka.inputformat
     KafkaRecordReaderIT.class, KafkaInputFormatIT.class, KafkaUtilsIT.class,
+    // org.apache.crunch.kafka.record
+    org.apache.crunch.kafka.record.KafkaSourceIT.class, org.apache.crunch.kafka.record.KafkaRecordsIterableIT.class,
+    org.apache.crunch.kafka.record.KafkaDataIT.class
 })
 public class ClusterTest {
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
index 36f293f..fd940f3 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.crunch.kafka;
 
-
 import kafka.api.OffsetRequest;
 import org.apache.crunch.Pair;
 import org.apache.kafka.clients.consumer.Consumer;

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/ConsumerRecordHelperTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/ConsumerRecordHelperTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/ConsumerRecordHelperTest.java
new file mode 100644
index 0000000..e07d3bc
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/ConsumerRecordHelperTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class ConsumerRecordHelperTest {
+
+  @Test (expected = IllegalArgumentException.class)
+  public void serialize_nullRecord() throws IOException {
+    ConsumerRecordHelper.serialize(null);
+  }
+
+  @Test (expected = IllegalArgumentException.class)
+  public void deserialize_nullRecord() throws IOException {
+    ConsumerRecordHelper.deserialize(null);
+  }
+
+  @Test
+  public void serializeDeserialize() throws IOException {
+    ConsumerRecord<BytesWritable, BytesWritable> record = new ConsumerRecord<>("topic",
+        1, 2, 3L, TimestampType.CREATE_TIME, 4L, 5,
+        6, new BytesWritable("key".getBytes()), new BytesWritable("value".getBytes()));
+
+    ConsumerRecord<BytesWritable, BytesWritable> newRecord = ConsumerRecordHelper.deserialize(
+        ConsumerRecordHelper.serialize(record));
+
+    assertRecordsAreEqual(record, newRecord);
+  }
+
+  @Test
+  public void serializeDeserialize_nullKeyValue() throws IOException {
+    ConsumerRecord<BytesWritable, BytesWritable> record = new ConsumerRecord<>("topic",
+        1, 2, 3L, TimestampType.CREATE_TIME, 4L, 5,
+        6, null, null);
+
+    ConsumerRecord<BytesWritable, BytesWritable> newRecord = ConsumerRecordHelper.deserialize(
+        ConsumerRecordHelper.serialize(record));
+
+    assertRecordsAreEqual(record, newRecord);
+  }
+
+  @Test
+  public void mapFns() throws IOException {
+    ConsumerRecord<BytesWritable, BytesWritable> record = new ConsumerRecord<>("topic",
+        1, 2, 3L, TimestampType.CREATE_TIME, 4L, 5,
+        6, new BytesWritable("key".getBytes()), new BytesWritable("value".getBytes()));
+
+    PCollection<BytesWritable> bytes = MemPipeline.collectionOf(new BytesWritable(ConsumerRecordHelper.serialize(record)));
+    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> records = bytes.parallelDo(
+        new ConsumerRecordHelper.BytesToConsumerRecord(), ConsumerRecordHelper.CONSUMER_RECORD_P_TYPE);
+    PCollection<BytesWritable> newBytes = records.parallelDo(
+        new ConsumerRecordHelper.ConsumerRecordToBytes(), Writables.writables(BytesWritable.class));
+
+    ConsumerRecord<BytesWritable, BytesWritable> newRecord = ConsumerRecordHelper.deserialize(
+        newBytes.materialize().iterator().next().getBytes());
+
+    assertRecordsAreEqual(record, newRecord);
+  }
+
+  private void assertRecordsAreEqual(ConsumerRecord<BytesWritable, BytesWritable> record1,
+      ConsumerRecord<BytesWritable, BytesWritable> record2) {
+    // ConsumerRecord doesn't implement equals so have to verify each field
+    assertThat(record1.topic(), is(record2.topic()));
+    assertThat(record1.partition(), is(record2.partition()));
+    assertThat(record1.offset(), is(record2.offset()));
+    assertThat(record1.timestamp(), is(record2.timestamp()));
+    assertThat(record1.timestampType(), is(record2.timestampType()));
+    assertThat(record1.checksum(), is(record2.checksum()));
+    assertThat(record1.serializedKeySize(), is(record2.serializedKeySize()));
+    assertThat(record1.serializedValueSize(), is(record2.serializedValueSize()));
+    assertThat(record1.key(), is(record2.key()));
+    assertThat(record1.value(), is(record2.value()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java
new file mode 100644
index 0000000..475f20f
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.crunch.kafka.*;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.crunch.kafka.ClusterTest.writeData;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class KafkaDataIT {
+  @Rule
+  public TestName testName = new TestName();
+
+  private String topic;
+  private Map<TopicPartition, Long> startOffsets;
+  private Map<TopicPartition, Long> stopOffsets;
+  private Map<TopicPartition, Pair<Long, Long>> offsets;
+  private Properties props;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  @Before
+  public void setup() {
+    topic = UUID.randomUUID().toString();
+
+    props = ClusterTest.getConsumerProperties();
+
+    startOffsets = new HashMap<>();
+    stopOffsets = new HashMap<>();
+    offsets = new HashMap<>();
+    for (int i = 0; i < 4; i++) {
+      TopicPartition tp = new TopicPartition(topic, i);
+      startOffsets.put(tp, 0L);
+      stopOffsets.put(tp, 100L);
+
+      offsets.put(tp, Pair.of(0L, 100L));
+    }
+  }
+
+  @Test
+  public void getDataIterable() throws IOException {
+    int loops = 10;
+    int numPerLoop = 100;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    Iterable<ConsumerRecord<String, String>> data = new KafkaData<String, String>(props, offsets).read(null);
+
+    int count = 0;
+    for (ConsumerRecord<String, String> record : data) {
+      assertThat(keys, hasItem(record.key()));
+      assertTrue(keys.remove(record.key()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
+  }
+
+  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
+  }
+}
