CRUNCH-653: Created KafkaSource that provides ConsumerRecord messages

Signed-off-by: Josh Wills <[email protected]>
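For context, a minimal usage sketch of the record-based source this patch adds (the broker address, topic name, and offset range below are hypothetical; the KafkaSource constructor and the ConsumerRecord element type come from the patch itself):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.kafka.record.KafkaSource;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class ReadKafkaRecords {
      public static void main(String[] args) {
        // Kafka connection settings; the source configures the key/value deserializers itself.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");

        // Consume offsets [0, 100) from partition 0 of a hypothetical topic.
        Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 0), Pair.of(0L, 100L));

        Pipeline pipeline = new MRPipeline(ReadKafkaRecords.class, new Configuration());

        // Each element is a ConsumerRecord whose key and value are raw bytes.
        PCollection<ConsumerRecord<BytesWritable, BytesWritable>> records =
            pipeline.read(new KafkaSource(props, offsets));

        // ... parse and write 'records' with DoFns and targets as needed ...
        pipeline.done();
      }
    }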
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f4734781 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f4734781 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f4734781 Branch: refs/heads/master Commit: f473478141144dea5ce422309aec37e8212a9f1e Parents: 28ab199 Author: Bryan Baugher <[email protected]> Authored: Wed Aug 16 16:19:42 2017 -0500 Committer: Josh Wills <[email protected]> Committed: Thu Oct 26 21:01:19 2017 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/kafka/KafkaSource.java | 3 + .../kafka/record/ConsumerRecordHelper.java | 166 ++++++++ .../apache/crunch/kafka/record/KafkaData.java | 62 +++ .../crunch/kafka/record/KafkaInputFormat.java | 318 ++++++++++++++ .../crunch/kafka/record/KafkaInputSplit.java | 121 ++++++ .../crunch/kafka/record/KafkaRecordReader.java | 235 +++++++++++ .../kafka/record/KafkaRecordsIterable.java | 292 +++++++++++++ .../apache/crunch/kafka/record/KafkaSource.java | 214 ++++++++++ .../kafka/record/KafkaSourceConverter.java | 70 ++++ .../org/apache/crunch/kafka/ClusterTest.java | 3 + .../crunch/kafka/KafkaRecordsIterableIT.java | 1 - .../kafka/record/ConsumerRecordHelperTest.java | 101 +++++ .../apache/crunch/kafka/record/KafkaDataIT.java | 118 ++++++ .../kafka/record/KafkaRecordsIterableIT.java | 416 +++++++++++++++++++ .../crunch/kafka/record/KafkaSourceIT.java | 207 +++++++++ 15 files changed, 2326 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java index bdcb7a9..1614053 100644 --- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java @@ -58,7 +58,10 @@ import java.util.Properties; * The values retrieved from Kafka are returned as raw bytes inside of a {@link BytesWritable}. If callers * need specific parsing logic based on the topic then consumers are encouraged to use multiple Kafka Sources * for each topic and use special {@link DoFn} to parse the payload. + * + * @deprecated Use {@link org.apache.crunch.kafka.record.KafkaSource} instead */ +@Deprecated public class KafkaSource implements TableSource<BytesWritable, BytesWritable>, ReadableSource<Pair<BytesWritable, BytesWritable>> { http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/ConsumerRecordHelper.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/ConsumerRecordHelper.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/ConsumerRecordHelper.java new file mode 100644 index 0000000..0520ed8 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/ConsumerRecordHelper.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.record; + +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.MapFn; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.io.BytesWritable; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + * Serializer/De-Serializer for Kafka's {@link ConsumerRecord} + */ +public class ConsumerRecordHelper { + + /** + * PType for {@link ConsumerRecord} + */ + @SuppressWarnings("unchecked") + public static final PType<ConsumerRecord<BytesWritable, BytesWritable>> CONSUMER_RECORD_P_TYPE = Writables + .derived((Class<ConsumerRecord<BytesWritable, BytesWritable>>) (Object) ConsumerRecord.class, + new ConsumerRecordHelper.BytesToConsumerRecord(), new ConsumerRecordHelper.ConsumerRecordToBytes(), + Writables.writables(BytesWritable.class)); + + /** + * Serializes the record into {@code byte[]}s + * + * @param record the record to serialize + * @return the record in {@code byte[]}s + * @throws IllegalArgumentException if record is {@code null} + * @throws IOException if there is an issue during serialization + */ + public static byte[] serialize(ConsumerRecord<BytesWritable, BytesWritable> record) throws IOException { + if (record == null) + throw new IllegalArgumentException("record cannot be null"); + + ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); + + try (DataOutputStream dataOut = new DataOutputStream(byteOut)) { + dataOut.writeUTF(record.topic()); + dataOut.writeInt(record.partition()); + dataOut.writeLong(record.offset()); + dataOut.writeLong(record.timestamp()); + dataOut.writeUTF(record.timestampType().name); + dataOut.writeLong(record.checksum()); + dataOut.writeInt(record.serializedKeySize()); + dataOut.writeInt(record.serializedValueSize()); + + if (record.key() == null) { + dataOut.writeInt(-1); + } else { + byte[] keyBytes = record.key().getBytes(); + dataOut.writeInt(keyBytes.length); + dataOut.write(keyBytes); + } + + if (record.value() == null) { + dataOut.writeInt(-1); + } else { + byte[] valueBytes = record.value().getBytes(); + dataOut.writeInt(valueBytes.length); + dataOut.write(valueBytes); + } + + return byteOut.toByteArray(); + } + } + + /** + * De-serializes the bytes into a {@link ConsumerRecord} + * + * @param bytes the bytes of a {@link ConsumerRecord} + * @return a {@link ConsumerRecord} from the bytes + * @throws IllegalArgumentException if bytes is {@code null} + * @throws IOException if there is an issue de-serializing the bytes + */ + public static ConsumerRecord<BytesWritable, BytesWritable> deserialize(byte[] bytes) throws IOException { + if (bytes == null) + throw new IllegalArgumentException("bytes cannot be 
null"); + + try (DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytes))) { + String topic = dataIn.readUTF(); + int partition = dataIn.readInt(); + long offset = dataIn.readLong(); + long timestamp = dataIn.readLong(); + String timestampTypeName = dataIn.readUTF(); + long checksum = dataIn.readLong(); + int serializedKeySize = dataIn.readInt(); + int serializedValueSize = dataIn.readInt(); + + BytesWritable key = null; + int keySize = dataIn.readInt(); + if (keySize != -1) { + byte[] keyBytes = new byte[keySize]; + dataIn.readFully(keyBytes); + key = new BytesWritable(keyBytes); + } + + BytesWritable value = null; + int valueSize = dataIn.readInt(); + if (valueSize != -1) { + byte[] valueBytes = new byte[valueSize]; + dataIn.readFully(valueBytes); + value = new BytesWritable(valueBytes); + } + + return new ConsumerRecord<>(topic, partition, offset, timestamp, TimestampType.forName(timestampTypeName), checksum, + serializedKeySize, serializedValueSize, key, value); + } + } + + /** + * {@link MapFn} to convert {@link ConsumerRecord} to {@link BytesWritable} + */ + public static class ConsumerRecordToBytes extends MapFn<ConsumerRecord<BytesWritable, BytesWritable>, BytesWritable> { + private static final long serialVersionUID = -6821080008375335537L; + + @Override + public BytesWritable map(ConsumerRecord<BytesWritable, BytesWritable> record) { + try { + return new BytesWritable(ConsumerRecordHelper.serialize(record)); + } catch (IOException e) { + throw new CrunchRuntimeException("Error serializing consumer record " + record, e); + } + } + } + + /** + * {@link MapFn} to convert {@link BytesWritable} to {@link ConsumerRecord} + */ + public static class BytesToConsumerRecord extends MapFn<BytesWritable, ConsumerRecord<BytesWritable, BytesWritable>> { + private static final long serialVersionUID = -6545017910063252322L; + + @Override + public ConsumerRecord<BytesWritable, BytesWritable> map(BytesWritable bytesWritable) { + try { + return ConsumerRecordHelper.deserialize(bytesWritable.getBytes()); + } catch (IOException e) { + throw new CrunchRuntimeException("Error deserializing consumer record", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaData.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaData.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaData.java new file mode 100644 index 0000000..7cd9ee1 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaData.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka.record; + +import org.apache.crunch.Pair; +import org.apache.crunch.ReadableData; +import org.apache.crunch.SourceTarget; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +class KafkaData<K, V> implements ReadableData<ConsumerRecord<K, V>> { + + private static final long serialVersionUID = -6582212311361579556L; + + private final Map<TopicPartition, Pair<Long, Long>> offsets; + private final Properties props; + + KafkaData(Properties connectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) { + this.props = connectionProperties; + this.offsets = offsets; + } + + @Override + public Set<SourceTarget<?>> getSourceTargets() { + return null; + } + + @Override + public void configure(Configuration conf) { + //no-op + } + + @Override + public Iterable<ConsumerRecord<K, V>> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException { + Consumer<K, V> consumer = new KafkaConsumer<>(props); + return new KafkaRecordsIterable<>(consumer, offsets, props); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java new file mode 100644 index 0000000..4e378ce --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java @@ -0,0 +1,318 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka.record; + +import org.apache.commons.lang.StringUtils; +import org.apache.crunch.Pair; +import org.apache.crunch.io.FormatBundle; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * Basic input format for reading data from Kafka. Data is read and provided as {@link ConsumerRecord} with the data left in its + * raw byte form. + * <p> + * Populating the configuration of the input format is handled with the convenience method of + * {@link #writeOffsetsToConfiguration(Map, Configuration)}. This should be done to ensure + * the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits} + * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}. + * <p> + * To allow for suppression of warnings referring to unknown configs in the Kafka {@code ConsumerConfig}, properties containing + * Kafka connection information will be prefixed using {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey} + * and will be written to the {@code FormatBundle} using + * {@link #writeConnectionPropertiesToBundle(Properties, FormatBundle) writeConnectionPropertiesToBundle}. These properties can then + * be retrieved using {@link #filterConnectionProperties(Properties) filterConnectionProperties}. + */ +public class KafkaInputFormat extends InputFormat<ConsumerRecord<BytesWritable, BytesWritable>, Void> implements Configurable { + + /** + * Constant for constructing configuration keys for the input format. + */ + private static final String KAFKA_INPUT_OFFSETS_BASE = "org.apache.crunch.kafka.offsets.topic"; + + /** + * Constant used for building configuration keys and specifying partitions. + */ + private static final String PARTITIONS = "partitions"; + + /** + * Constant used for building configuration keys and specifying the start of a partition. + */ + private static final String START = "start"; + + /** + * Constant used for building configuration keys and specifying the end of a partition. + */ + private static final String END = "end"; + + /** + * Regex to discover all of the defined partitions which should be consumed by the input format. + */ + private static final String TOPIC_KEY_REGEX = Pattern.quote(KAFKA_INPUT_OFFSETS_BASE) + "\\..*\\." + PARTITIONS + "$"; + + /** + * Constant for constructing configuration keys for the Kafka connection properties. + */ + private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties"; + + /** + * Regex to discover all of the defined Kafka connection properties which should be passed to the ConsumerConfig. 
+ */ + private static final Pattern CONNECTION_PROPERTY_REGEX = Pattern + .compile(Pattern.quote(KAFKA_CONNECTION_PROPERTY_BASE) + "\\..*$"); + + private Configuration configuration; + + @Override + public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException { + Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf()); + List<InputSplit> splits = new LinkedList<>(); + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + + long start = entry.getValue().first(); + long end = entry.getValue().second(); + if (start != end) { + splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), entry.getValue().first(), + entry.getValue().second())); + } + } + + return splits; + } + + @Override + public RecordReader<ConsumerRecord<BytesWritable, BytesWritable>, Void> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return new KafkaRecordReader<>(); + } + + @Override + public void setConf(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public Configuration getConf() { + return configuration; + } + + //The following methods are used for reading and writing Kafka Partition offset information into Hadoop's Configuration + //objects and into Crunch's FormatBundle. For a specific Kafka Topic it might have one or many partitions and for + //each partition it will need a start and end offset. Assuming you have a topic of "abc" and it has 2 partitions the + //configuration would be populated with the following: + // org.apache.crunch.kafka.offsets.topic.abc.partitions = [0,1] + // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = <partition start> + // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = <partition end> + // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start = <partition start> + // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end = <partition end> + + /** + * Writes the start and end offsets for the provided topic partitions to the {@code bundle}. + * + * @param offsets The starting and ending offsets for the topics and partitions. + * @param bundle the bundle into which the information should be persisted. + */ + public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, Long>> offsets, FormatBundle bundle) { + for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) { + bundle.set(entry.getKey(), entry.getValue()); + } + } + + /** + * Writes the start and end offsets for the provided topic partitions to the {@code config}. + * + * @param offsets The starting and ending offsets for the topics and partitions. + * @param config the config into which the information should be persisted. + */ + public static void writeOffsetsToConfiguration(Map<TopicPartition, Pair<Long, Long>> offsets, Configuration config) { + for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) { + config.set(entry.getKey(), entry.getValue()); + } + } + + /** + * Reads the {@code configuration} to determine which topics, partitions, and offsets should be used for reading data. + * + * @param configuration the configuration to derive the data to read. + * @return a map of {@link TopicPartition} to a pair of start and end offsets. + * @throws IllegalStateException if the {@code configuration} does not have the start and end offsets set properly + * for a partition. 
+ */ + public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration configuration) { + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + //find configuration for all of the topics with defined partitions + Map<String, String> topicPartitionKeys = configuration.getValByRegex(TOPIC_KEY_REGEX); + + //for each topic start to process it's partitions + for (String key : topicPartitionKeys.keySet()) { + String topic = getTopicFromKey(key); + int[] partitions = configuration.getInts(key); + //for each partition find and add the start/end offset + for (int partitionId : partitions) { + TopicPartition topicPartition = new TopicPartition(topic, partitionId); + long start = configuration.getLong(generatePartitionStartKey(topic, partitionId), Long.MIN_VALUE); + long end = configuration.getLong(generatePartitionEndKey(topic, partitionId), Long.MIN_VALUE); + + if (start == Long.MIN_VALUE || end == Long.MIN_VALUE) { + throw new IllegalStateException( + "The " + topicPartition + " has an invalid start:" + start + " or end:" + end + " offset configured."); + } + + offsets.put(topicPartition, Pair.of(start, end)); + } + } + + return offsets; + } + + private static Map<String, String> generateValues(Map<TopicPartition, Pair<Long, Long>> offsets) { + Map<String, String> offsetConfigValues = new HashMap<>(); + Map<String, Set<Integer>> topicsPartitions = new HashMap<>(); + + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + String topic = topicPartition.topic(); + int partition = topicPartition.partition(); + String startKey = generatePartitionStartKey(topic, partition); + String endKey = generatePartitionEndKey(topic, partition); + //Add the start and end offsets for a specific partition + offsetConfigValues.put(startKey, Long.toString(entry.getValue().first())); + offsetConfigValues.put(endKey, Long.toString(entry.getValue().second())); + + Set<Integer> partitions = topicsPartitions.get(topic); + if (partitions == null) { + partitions = new HashSet<>(); + topicsPartitions.put(topic, partitions); + } + partitions.add(partition); + } + + //generate the partitions values for each topic + for (Map.Entry<String, Set<Integer>> entry : topicsPartitions.entrySet()) { + String key = KAFKA_INPUT_OFFSETS_BASE + "." + entry.getKey() + "." + PARTITIONS; + Set<Integer> partitions = entry.getValue(); + String partitionsString = StringUtils.join(partitions, ","); + offsetConfigValues.put(key, partitionsString); + } + + return offsetConfigValues; + } + + static String generatePartitionStartKey(String topic, int partition) { + return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + START; + } + + static String generatePartitionEndKey(String topic, int partition) { + return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + END; + } + + static String generateTopicPartitionsKey(String topic) { + return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS; + } + + static String getTopicFromKey(String key) { + //strip off the base key + a trailing "." + String value = key.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1); + //strip off the end part + a preceding "." 
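+    // e.g. "org.apache.crunch.kafka.offsets.topic.abc.partitions" yields the topic "abc"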
+ value = value.substring(0, (value.length() - (PARTITIONS.length() + 1))); + + return value; + } + + // The following methods are used for writing prefixed Kafka connection properties into Crunch's FormatBundle and + // {@link #filterConnectionProperties(Properties props) filtering} out the prefixed properties. This allows for + // suppression of {@code ConsumerConfig} warnings that are generated by unused properties carried over from the Hadoop + // properties. + + /** + * Writes the Kafka connection properties to the {@code bundle}. The connection properties are prefixed with + * "org.apache.crunch.kafka.connection.properties" to allow for suppression of unused {@code ConsumerConfig} warnings + * generated by unused Hadoop properties. + * + * @param connectionProperties The Kafka connection properties to be prefixed for later + * {@link #filterConnectionProperties(Properties props) filtering}. + * @param bundle The bundle into which the information should be persisted. + */ + public static void writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle) { + for (final String name : connectionProperties.stringPropertyNames()) { + bundle.set(generateConnectionPropertyKey(name), connectionProperties.getProperty(name)); + } + } + + /** + * Filters out properties properties written by + * {@link #writeConnectionPropertiesToBundle(Properties, FormatBundle) writeConnectionPropertiesToBundle}. + * + * @param props The properties to be filtered. + * @return The properties containing Kafka connection information that were written by + * {@link #writeConnectionPropertiesToBundle(Properties, FormatBundle) writeConnectionPropertiesToBundle}. + */ + public static Properties filterConnectionProperties(Properties props) { + Properties filteredProperties = new Properties(); + + for (final String name : props.stringPropertyNames()) { + if (CONNECTION_PROPERTY_REGEX.matcher(name).matches()) { + filteredProperties.put(getConnectionPropertyFromKey(name), props.getProperty(name)); + } + } + + return filteredProperties; + } + + /** + * Prefixes a given property with "org.apache.crunch.kafka.connection.properties" to allow for filtering with + * {@link #filterConnectionProperties(Properties) filterConnectionProperties}. + * + * @param property The Kafka connection property that will be prefixed for retrieval at a later time. + * @return The property prefixed with "org.apache.crunch.kafka.connection.properties" + */ + static String generateConnectionPropertyKey(String property) { + return KAFKA_CONNECTION_PROPERTY_BASE + "." + property; + } + + /** + * Retrieves the original property that was prefixed using + * {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey}. + * + * @param key The key that was prefixed using {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey}. + * @return The original property prior to prefixing. + */ + static String getConnectionPropertyFromKey(String key) { + // Strip off the base key + a trailing "." 
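+    // e.g. "org.apache.crunch.kafka.connection.properties.bootstrap.servers" -> "bootstrap.servers"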
+ return key.substring(KAFKA_CONNECTION_PROPERTY_BASE.length() + 1); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java new file mode 100644 index 0000000..b2b0bb9 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputSplit.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.record; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.kafka.common.TopicPartition; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * InputSplit that represent retrieving data from a single {@link TopicPartition} between the specified start + * and end offsets. + */ +public class KafkaInputSplit extends InputSplit implements Writable { + + private long startingOffset; + private long endingOffset; + private TopicPartition topicPartition; + + /** + * Nullary Constructor for creating the instance inside the Mapper instance. + */ + public KafkaInputSplit() { + + } + + /** + * Constructs an input split for the provided {@code topic} and {@code partition} restricting data to be between + * the {@code startingOffset} and {@code endingOffset} + * + * @param topic the topic for the split + * @param partition the partition for the topic + * @param startingOffset the start of the split + * @param endingOffset the end of the split + */ + public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset) { + this.startingOffset = startingOffset; + this.endingOffset = endingOffset; + topicPartition = new TopicPartition(topic, partition); + } + + @Override + public long getLength() throws IOException, InterruptedException { + // This is just used as a hint for size of bytes so it is already inaccurate. + return startingOffset > 0 ? endingOffset - startingOffset : endingOffset; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + //Leave empty since data locality not really an issue. 
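+    // (Kafka partitions live on brokers rather than HDFS datanodes, so no host hints are reported)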
+ return new String[0]; + } + + /** + * Returns the topic and partition for the split + * + * @return the topic and partition for the split + */ + public TopicPartition getTopicPartition() { + return topicPartition; + } + + /** + * Returns the starting offset for the split + * + * @return the starting offset for the split + */ + public long getStartingOffset() { + return startingOffset; + } + + /** + * Returns the ending offset for the split + * + * @return the ending offset for the split + */ + public long getEndingOffset() { + return endingOffset; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeUTF(topicPartition.topic()); + dataOutput.writeInt(topicPartition.partition()); + dataOutput.writeLong(startingOffset); + dataOutput.writeLong(endingOffset); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + String topic = dataInput.readUTF(); + int partition = dataInput.readInt(); + startingOffset = dataInput.readLong(); + endingOffset = dataInput.readLong(); + + topicPartition = new TopicPartition(topic, partition); + } + + @Override + public String toString() { + return getTopicPartition() + " Start: " + startingOffset + " End: " + endingOffset; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java new file mode 100644 index 0000000..ec138b3 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka.record; + +import kafka.api.OffsetRequest; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.kafka.KafkaUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +import static org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY; +import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT; +import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY; +import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT; +import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties; +import static org.apache.crunch.kafka.record.KafkaInputFormat.filterConnectionProperties; +import static org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT; +import static org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY; + +/** + * A {@link RecordReader} for pulling data from Kafka. + * + * @param <K> the key of the records from Kafka + * @param <V> the value of the records from Kafka + */ +public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>, Void> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class); + + private Consumer<K, V> consumer; + private ConsumerRecord<K, V> record; + private long endingOffset; + private Iterator<ConsumerRecord<K, V>> recordIterator; + private long consumerPollTimeout; + private long maxNumberOfRecords; + private long startingOffset; + private long currentOffset; + private int maxNumberAttempts; + private Properties kafkaConnectionProperties; + private int maxConcurrentEmptyResponses; + private int concurrentEmptyResponses; + private TopicPartition topicPartition; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + if (!(inputSplit instanceof KafkaInputSplit)) { + throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type."); + } + + kafkaConnectionProperties = filterConnectionProperties(getKafkaConnectionProperties(taskAttemptContext.getConfiguration())); + + consumer = new KafkaConsumer<>(kafkaConnectionProperties); + + KafkaInputSplit split = (KafkaInputSplit) inputSplit; + topicPartition = split.getTopicPartition(); + + consumer.assign(Collections.singletonList(topicPartition)); + + //suggested hack to gather info without gathering data + consumer.poll(0); + + //now seek to the desired start location + startingOffset = split.getStartingOffset(); + consumer.seek(topicPartition, startingOffset); + + currentOffset = startingOffset - 1; + endingOffset = split.getEndingOffset(); + + maxNumberOfRecords = endingOffset - startingOffset; + if (LOG.isInfoEnabled()) { + LOG.info("Reading data from {} between {} and {}", new Object[] { topicPartition, 
startingOffset, endingOffset }); + } + + Configuration config = taskAttemptContext.getConfiguration(); + consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT); + maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT); + maxConcurrentEmptyResponses = config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT); + concurrentEmptyResponses = 0; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (hasPendingData()) { + recordIterator = getRecords(); + record = recordIterator.hasNext() ? recordIterator.next() : null; + if (record != null) { + LOG.debug("nextKeyValue: Retrieved record with offset {}", record.offset()); + long oldOffset = currentOffset; + currentOffset = record.offset(); + LOG.debug("Current offset will be updated to be [{}]", currentOffset); + if (LOG.isWarnEnabled() && (currentOffset - oldOffset > 1)) { + LOG.warn("Offset increment was larger than expected value of one, old {} new {}", oldOffset, currentOffset); + } + return true; + } else { + LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset, endingOffset); + } + } + record = null; + return false; + } + + @Override + public ConsumerRecord<K, V> getCurrentKey() throws IOException, InterruptedException { + return record == null ? null : record; + } + + @Override + public Void getCurrentValue() throws IOException, InterruptedException { + return null; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + //not most accurate but gives reasonable estimate + return ((float) (currentOffset - startingOffset + 1)) / maxNumberOfRecords; + } + + private boolean hasPendingData() { + //offset range is exclusive at the end which means the ending offset is one higher + // than the actual physical last offset + + boolean hasPending = currentOffset < endingOffset - 1; + + if (concurrentEmptyResponses > maxConcurrentEmptyResponses) { + long earliest = getEarliestOffset(); + if (earliest == endingOffset) { + LOG.warn("Possible data loss for {} as earliest {} is equal to {} and greater than expected current {}.", + new Object[] { topicPartition, earliest, endingOffset, currentOffset }); + return false; + } + } + + return hasPending; + } + + private Iterator<ConsumerRecord<K, V>> getRecords() { + if ((recordIterator == null) || !recordIterator.hasNext()) { + ConsumerRecords<K, V> records = null; + int numTries = 0; + boolean success = false; + while (!success && (numTries < maxNumberAttempts)) { + try { + records = getConsumer().poll(consumerPollTimeout); + } catch (RetriableException re) { + numTries++; + if (numTries < maxNumberAttempts) { + LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries + 1, re); + } else { + LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumberAttempts, re); + throw re; + } + } + if (((records == null) || records.isEmpty()) && hasPendingData()) { + concurrentEmptyResponses++; + LOG.warn("No records retrieved but pending offsets to consume therefore polling again. 
Attempt {}/{}", + concurrentEmptyResponses, maxConcurrentEmptyResponses); + } else { + success = true; + } + } + concurrentEmptyResponses = 0; + + if ((records == null) || records.isEmpty()) { + LOG.info("No records retrieved from Kafka therefore nothing to iterate over."); + } else { + LOG.info("Retrieved records from Kafka to iterate over."); + } + return records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator(); + } + return recordIterator; + } + + /** + * @return the consumer + */ + protected Consumer<K, V> getConsumer() { + return consumer; + } + + /** + * @return the earliest offset of the topic partition + */ + protected long getEarliestOffset() { + Map<TopicPartition, Long> brokerOffsets = KafkaUtils + .getBrokerOffsets(kafkaConnectionProperties, OffsetRequest.EarliestTime(), topicPartition.topic()); + Long offset = brokerOffsets.get(topicPartition); + if (offset == null) { + LOG.debug("Unable to determine earliest offset for {} so returning 0", topicPartition); + return 0L; + } + LOG.debug("Earliest offset for {} is {}", topicPartition, offset); + return offset; + } + + @Override + public void close() throws IOException { + LOG.debug("Closing the record reader."); + if (consumer != null) { + consumer.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordsIterable.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordsIterable.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordsIterable.java new file mode 100644 index 0000000..3e946ef --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordsIterable.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka.record; + +import org.apache.crunch.Pair; +import org.apache.crunch.kafka.KafkaUtils; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Properties; +import java.util.Set; + +class KafkaRecordsIterable<K, V> implements Iterable<ConsumerRecord<K, V>> { + + /** + * Logger + */ + private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsIterable.class); + + /** + * The Kafka consumer responsible for retrieving messages. + */ + private final Consumer<K, V> consumer; + + /** + * The starting positions of the iterable for the topic. + */ + private final Map<TopicPartition, Pair<Long, Long>> offsets; + + /** + * Tracks if the iterable is empty. + */ + private final boolean isEmpty; + + /** + * The poll time between each request to Kafka + */ + private final long scanPollTime; + + private final int maxRetryAttempts; + + /** + * Creates the iterable that will pull values for a collection of topics using the provided {@code consumer} between + * the {@code startOffsets} and {@code stopOffsets}. + * + * @param consumer The consumer for pulling the data from Kafka. The consumer will be closed automatically once all + * of the records have been consumed. + * @param offsets offsets for pulling data + * @param properties properties for tweaking the behavior of the iterable. + * @throws IllegalArgumentException if any of the arguments are {@code null} or empty. + */ + KafkaRecordsIterable(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> offsets, Properties properties) { + if (consumer == null) { + throw new IllegalArgumentException("The 'consumer' cannot be 'null'."); + } + this.consumer = consumer; + + if (properties == null) { + throw new IllegalArgumentException("The 'properties' cannot be 'null'."); + } + + String retryString = properties + .getProperty(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY, KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING); + maxRetryAttempts = Integer.parseInt(retryString); + + if (offsets == null || offsets.isEmpty()) { + throw new IllegalArgumentException("The 'offsets' cannot 'null' or empty."); + } + + //filter out any topics and partitions that do not have offset ranges that will produce data. + Map<TopicPartition, Pair<Long, Long>> filteredOffsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + Pair<Long, Long> value = entry.getValue(); + //if start is less than one less than stop then there is data to be had + if (value.first() < value.second()) { + filteredOffsets.put(entry.getKey(), value); + } else { + LOG.debug("Removing offsets for {} because start is not less than the end offset.", entry.getKey()); + } + } + + //check to make sure that based on the offsets there is data to retrieve, otherwise false. 
+ //there will be data if the start offsets are less than stop offsets + isEmpty = filteredOffsets.isEmpty(); + if (isEmpty) { + LOG.warn("Iterable for Kafka for is empty because offsets are empty."); + } + + //assign this + this.offsets = filteredOffsets; + + scanPollTime = Long.parseLong( + properties.getProperty(KafkaSource.CONSUMER_POLL_TIMEOUT_KEY, Long.toString(KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT))); + } + + @Override + public Iterator<ConsumerRecord<K, V>> iterator() { + if (isEmpty) { + LOG.debug("Returning empty iterator since offsets align."); + return Collections.emptyIterator(); + } + //Assign consumer to all of the partitions + LOG.debug("Assigning topics and partitions and seeking to start offsets."); + + consumer.assign(new LinkedList<>(offsets.keySet())); + //hack so maybe look at removing this + consumer.poll(0); + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + consumer.seek(entry.getKey(), entry.getValue().first()); + } + + return new RecordsIterator<K, V>(consumer, offsets, scanPollTime, maxRetryAttempts); + } + + private static class RecordsIterator<K, V> implements Iterator<ConsumerRecord<K, V>> { + + private final Consumer<K, V> consumer; + private final Map<TopicPartition, Pair<Long, Long>> offsets; + private final long pollTime; + private final int maxNumAttempts; + private ConsumerRecords<K, V> records; + private Iterator<ConsumerRecord<K, V>> currentIterator; + private final Set<TopicPartition> remainingPartitions; + + private ConsumerRecord<K, V> next; + + RecordsIterator(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> offsets, long pollTime, int maxNumRetries) { + this.consumer = consumer; + remainingPartitions = new HashSet<>(offsets.keySet()); + this.offsets = offsets; + this.pollTime = pollTime; + this.maxNumAttempts = maxNumRetries; + } + + @Override + public boolean hasNext() { + if (next != null) + return true; + + //if partitions to consume then pull next value + if (remainingPartitions.size() > 0) { + next = getNext(); + } + + return next != null; + } + + @Override + public ConsumerRecord<K, V> next() { + if (next == null) { + next = getNext(); + } + + if (next != null) { + ConsumerRecord<K, V> returnedNext = next; + //prime for next call + next = getNext(); + //return the current next + return returnedNext; + } else { + throw new NoSuchElementException("No more elements."); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove is not supported."); + } + + /** + * Gets the current iterator. + * + * @return the current iterator or {@code null} if there are no more values to consume. + */ + private Iterator<ConsumerRecord<K, V>> getIterator() { + if (!remainingPartitions.isEmpty()) { + if (currentIterator != null && currentIterator.hasNext()) { + return currentIterator; + } + LOG.debug("Retrieving next set of records."); + int numTries = 0; + boolean notSuccess = false; + while (!notSuccess && numTries < maxNumAttempts) { + try { + records = consumer.poll(pollTime); + notSuccess = true; + } catch (RetriableException re) { + numTries++; + if (numTries < maxNumAttempts) { + LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re); + } else { + LOG.error("Error pulling messages from Kafka. 
Exceeded maximum number of attempts {}", maxNumAttempts, re); + throw re; + } + } + } + if (records == null || records.isEmpty()) { + LOG.debug("Retrieved empty records."); + currentIterator = null; + return null; + } + currentIterator = records.iterator(); + return currentIterator; + } + + LOG.debug("No more partitions to consume therefore not retrieving any more records."); + return null; + } + + /** + * Internal method for retrieving the next value to retrieve. + * + * @return {@code null} if there are no more values to retrieve otherwise the next event. + */ + private ConsumerRecord<K, V> getNext() { + while (!remainingPartitions.isEmpty()) { + Iterator<ConsumerRecord<K, V>> iterator = getIterator(); + + while (iterator != null && iterator.hasNext()) { + ConsumerRecord<K, V> record = iterator.next(); + TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); + long offset = record.offset(); + + if (withinRange(topicPartition, offset)) { + LOG.debug("Retrieving value for {} with offset {}.", topicPartition, offset); + return record; + } + LOG.debug("Value for {} with offset {} is outside of range skipping.", topicPartition, offset); + } + } + + LOG.debug("Closing the consumer because there are no more remaining partitions."); + consumer.close(); + + LOG.debug("Consumed data from all partitions."); + return null; + + } + + /** + * Checks whether the value for {@code topicPartition} with an {@code offset} is within scan range. If + * the value is not then {@code false} is returned otherwise {@code true}. + * + * @param topicPartion The partition for the offset + * @param offset the offset in the partition + * @return {@code true} if the value is within the expected consumption range, otherwise {@code false}. + */ + private boolean withinRange(TopicPartition topicPartion, long offset) { + long endOffset = offsets.get(topicPartion).second(); + //end offsets are one higher than the last written value. + boolean emit = offset < endOffset; + if (offset >= endOffset - 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.", + new Object[] { topicPartion, offset, endOffset }); + } + remainingPartitions.remove(topicPartion); + consumer.pause(Arrays.asList(topicPartion)); + } + LOG.debug("Value for partition {} and offset {} is within range.", topicPartion, offset); + return emit; + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java new file mode 100644 index 0000000..d7fb763 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.record; + +import org.apache.crunch.Pair; +import org.apache.crunch.ReadableData; +import org.apache.crunch.Source; +import org.apache.crunch.impl.mr.run.CrunchMapper; +import org.apache.crunch.io.CrunchInputs; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * A Crunch Source that will retrieve events from Kafka given start and end offsets. The source is not designed to + * process unbounded data but instead to retrieve data between a specified range. + * <p> + * <p> + * The values retrieved from Kafka are returned as {@link ConsumerRecord} with key and value as raw bytes. If callers need specific + * parsing logic based on the topic then consumers are encouraged to use multiple Kafka Sources for each topic and use special + * {@link org.apache.crunch.DoFn} to parse the payload. + */ +public class KafkaSource + implements Source<ConsumerRecord<BytesWritable, BytesWritable>>, ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); + + private final FormatBundle inputBundle; + private final Properties props; + private final Map<TopicPartition, Pair<Long, Long>> offsets; + + /** + * Constant to indicate how long the reader waits before timing out when retrieving data from Kafka. + */ + public static final String CONSUMER_POLL_TIMEOUT_KEY = "org.apache.crunch.kafka.consumer.poll.timeout"; + + /** + * Default timeout value for {@link #CONSUMER_POLL_TIMEOUT_KEY} of 1 second. + */ + public static final long CONSUMER_POLL_TIMEOUT_DEFAULT = 1000L; + + /** + * Constructs a Kafka source that will read data from the Kafka cluster identified by the {@code kafkaConnectionProperties} + * and from the specific topics and partitions identified in the {@code offsets} + * + * @param kafkaConnectionProperties The connection properties for reading from Kafka. These properties will be honored + * with the exception of the {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and + * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + * @param offsets A map of {@link TopicPartition} to a pair of start and end offsets respectively. The start + * and end offsets are evaluated at [start, end) where the ending offset is excluded. 
Each + * TopicPartition must have a non-null pair describing its offsets. The start offset should be + * less than the end offset. If the values are equal or start is greater than the end then + * that partition will be skipped. + */ + public KafkaSource(Properties kafkaConnectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) { + this.props = copyAndSetProperties(kafkaConnectionProperties); + + inputBundle = createFormatBundle(props, offsets); + + this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets)); + } + + @Override + public Source<ConsumerRecord<BytesWritable, BytesWritable>> inputConf(String key, String value) { + inputBundle.set(key, value); + return this; + } + + @Override + public PType<ConsumerRecord<BytesWritable, BytesWritable>> getType() { + return ConsumerRecordHelper.CONSUMER_RECORD_P_TYPE; + } + + @Override + public Converter<?, ?, ?, ?> getConverter() { + return new KafkaSourceConverter(); + } + + @Override + public long getSize(Configuration configuration) { + // TODO something smarter here. + return 1000L * 1000L * 1000L; + } + + @Override + public String toString() { + return "KafkaSource(" + props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) + ")"; + } + + @Override + public long getLastModifiedAt(Configuration configuration) { + LOG.warn("Cannot determine last modified time for source: {}", toString()); + return -1; + } + + private static <K, V> FormatBundle createFormatBundle(Properties kafkaConnectionProperties, + Map<TopicPartition, Pair<Long, Long>> offsets) { + + FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class); + + KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); + KafkaInputFormat.writeConnectionPropertiesToBundle(kafkaConnectionProperties, bundle); + + return bundle; + } + + private static Properties copyAndSetProperties(Properties kafkaConnectionProperties) { + Properties props = new Properties(); + + //set the default to be earliest for auto reset but allow it to be overridden if appropriate. + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + props.putAll(kafkaConnectionProperties); + + //Setting the key/value deserializer to ensure proper translation from Kafka to PType format. + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + + return props; + } + + @Override + public Iterable<ConsumerRecord<BytesWritable, BytesWritable>> read(Configuration conf) throws IOException { + // consumer will get closed when the iterable is fully consumed. + // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity + // of parallelism when reading. + Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props); + return new KafkaRecordsIterable<>(consumer, offsets, props); + } + + @Override + @SuppressWarnings("unchecked") + public void configureSource(Job job, int inputId) throws IOException { + Configuration conf = job.getConfiguration(); + //an id of -1 indicates that this is the only input so just use it directly + if (inputId == -1) { + job.setMapperClass(CrunchMapper.class); + job.setInputFormatClass(inputBundle.getFormatClass()); + inputBundle.configure(conf); + } else { + //there are multiple inputs for this mapper so add it as a CrunchInputs and need a fake path just to + //make it play well with other file based inputs. 
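+      // e.g. inputId 2 maps to the placeholder path "/kafka/2"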
+ Path dummy = new Path("/kafka/" + inputId); + CrunchInputs.addInputPath(job, dummy, inputBundle, inputId); + } + } + + //exposed for testing purposes + FormatBundle getInputBundle() { + return inputBundle; + } + + @Override + public ReadableData<ConsumerRecord<BytesWritable, BytesWritable>> asReadable() { + // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity + // of parallelism when reading. + return new KafkaData<>(props, offsets); + } + + /** + * Basic {@link Deserializer} which simply wraps the payload as a {@link BytesWritable}. + */ + public static class BytesDeserializer implements Deserializer<BytesWritable> { + + @Override + public void configure(Map<String, ?> configProperties, boolean isKey) { + //no-op + } + + @Override + public BytesWritable deserialize(String topic, byte[] valueBytes) { + return new BytesWritable(valueBytes); + } + + @Override + public void close() { + //no-op + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSourceConverter.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSourceConverter.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSourceConverter.java new file mode 100644 index 0000000..b3d56c0 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSourceConverter.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka.record; + +import org.apache.crunch.types.Converter; +import org.apache.hadoop.io.BytesWritable; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * {@link Converter} for {@link KafkaSource} + */ +public class KafkaSourceConverter implements + Converter<ConsumerRecord<BytesWritable, BytesWritable>, Void, ConsumerRecord<BytesWritable, BytesWritable>, Iterable<ConsumerRecord<BytesWritable, BytesWritable>>> { + + private static final long serialVersionUID = 5270341393169043945L; + + @Override + public ConsumerRecord<BytesWritable, BytesWritable> convertInput(ConsumerRecord<BytesWritable, BytesWritable> record, + Void aVoid) { + return record; + } + + @Override + public Iterable<ConsumerRecord<BytesWritable, BytesWritable>> convertIterableInput( + ConsumerRecord<BytesWritable, BytesWritable> bytesWritableBytesWritableConsumerRecord, Iterable<Void> iterable) { + throw new UnsupportedOperationException("Should not be possible"); + } + + @Override + public ConsumerRecord<BytesWritable, BytesWritable> outputKey(ConsumerRecord<BytesWritable, BytesWritable> record) { + return record; + } + + @Override + public Void outputValue(ConsumerRecord<BytesWritable, BytesWritable> record) { + // No value, we just use the record as the key. + return null; + } + + @Override + @SuppressWarnings("unchecked") + public Class<ConsumerRecord<BytesWritable, BytesWritable>> getKeyClass() { + return (Class<ConsumerRecord<BytesWritable, BytesWritable>>) (Object) ConsumerRecord.class; + } + + @Override + public Class<Void> getValueClass() { + return Void.class; + } + + @Override + public boolean applyPTypeTransforms() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java index 38ded40..0694551 100644 --- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java @@ -51,6 +51,9 @@ import java.util.Properties; KafkaSourceIT.class, KafkaRecordsIterableIT.class, KafkaDataIT.class, //org.apache.crunch.kafka.inputformat KafkaRecordReaderIT.class, KafkaInputFormatIT.class, KafkaUtilsIT.class, + // org.apache.crunch.kafka.record + org.apache.crunch.kafka.record.KafkaSourceIT.class, org.apache.crunch.kafka.record.KafkaRecordsIterableIT.class, + org.apache.crunch.kafka.record.KafkaDataIT.class }) public class ClusterTest { http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java index 36f293f..fd940f3 100644 --- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java @@ -17,7 +17,6 @@ */ package org.apache.crunch.kafka; - import kafka.api.OffsetRequest; import org.apache.crunch.Pair; import org.apache.kafka.clients.consumer.Consumer; 
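[Editor's note, not part of the patch] For reference, a minimal sketch of wiring the new record-based source into an MRPipeline and parsing the raw value bytes with a DoFn, as the class javadoc suggests. The broker address, group id, topic name, offset range, and output path below are illustrative assumptions, not values from this commit.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.kafka.record.KafkaSource;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // illustrative broker
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "crunch-kafka-example");  // illustrative group id

    // Read offsets [0, 100) from partition 0 of an illustrative topic.
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    offsets.put(new TopicPartition("events", 0), Pair.of(0L, 100L));

    Pipeline pipeline = new MRPipeline(KafkaSourceExample.class);
    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> records =
        pipeline.read(new KafkaSource(props, offsets));

    // Parse the raw value bytes with a topic-specific DoFn.
    PCollection<String> values = records.parallelDo(
        new DoFn<ConsumerRecord<BytesWritable, BytesWritable>, String>() {
          @Override
          public void process(ConsumerRecord<BytesWritable, BytesWritable> record, Emitter<String> emitter) {
            emitter.emit(new String(record.value().copyBytes(), StandardCharsets.UTF_8));
          }
        }, Writables.strings());

    pipeline.writeTextFile(values, "/tmp/kafka-values"); // illustrative output path
    pipeline.done();
  }
}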
http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/ConsumerRecordHelperTest.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/ConsumerRecordHelperTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/ConsumerRecordHelperTest.java new file mode 100644 index 0000000..e07d3bc --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/ConsumerRecordHelperTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.record; + +import org.apache.crunch.PCollection; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.io.BytesWritable; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.junit.Test; + +import java.io.IOException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class ConsumerRecordHelperTest { + + @Test (expected = IllegalArgumentException.class) + public void serialize_nullRecord() throws IOException { + ConsumerRecordHelper.serialize(null); + } + + @Test (expected = IllegalArgumentException.class) + public void deserialize_nullRecord() throws IOException { + ConsumerRecordHelper.deserialize(null); + } + + @Test + public void serializeDeserialize() throws IOException { + ConsumerRecord<BytesWritable, BytesWritable> record = new ConsumerRecord<>("topic", + 1, 2, 3L, TimestampType.CREATE_TIME, 4L, 5, + 6, new BytesWritable("key".getBytes()), new BytesWritable("value".getBytes())); + + ConsumerRecord<BytesWritable, BytesWritable> newRecord = ConsumerRecordHelper.deserialize( + ConsumerRecordHelper.serialize(record)); + + assertRecordsAreEqual(record, newRecord); + } + + @Test + public void serializeDeserialize_nullKeyValue() throws IOException { + ConsumerRecord<BytesWritable, BytesWritable> record = new ConsumerRecord<>("topic", + 1, 2, 3L, TimestampType.CREATE_TIME, 4L, 5, + 6, null, null); + + ConsumerRecord<BytesWritable, BytesWritable> newRecord = ConsumerRecordHelper.deserialize( + ConsumerRecordHelper.serialize(record)); + + assertRecordsAreEqual(record, newRecord); + } + + @Test + public void mapFns() throws IOException { + ConsumerRecord<BytesWritable, BytesWritable> record = new ConsumerRecord<>("topic", + 1, 2, 3L, TimestampType.CREATE_TIME, 4L, 5, + 6, new BytesWritable("key".getBytes()), new BytesWritable("value".getBytes())); + + PCollection<BytesWritable> bytes = MemPipeline.collectionOf(new BytesWritable(ConsumerRecordHelper.serialize(record))); + 
PCollection<ConsumerRecord<BytesWritable, BytesWritable>> records = bytes.parallelDo( + new ConsumerRecordHelper.BytesToConsumerRecord(), ConsumerRecordHelper.CONSUMER_RECORD_P_TYPE); + PCollection<BytesWritable> newBytes = records.parallelDo( + new ConsumerRecordHelper.ConsumerRecordToBytes(), Writables.writables(BytesWritable.class)); + + ConsumerRecord<BytesWritable, BytesWritable> newRecord = ConsumerRecordHelper.deserialize( + newBytes.materialize().iterator().next().getBytes()); + + assertRecordsAreEqual(record, newRecord); + } + + private void assertRecordsAreEqual(ConsumerRecord<BytesWritable, BytesWritable> record1, + ConsumerRecord<BytesWritable, BytesWritable> record2) { + // ConsumerRecord doesn't implement equals so have to verify each field + assertThat(record1.topic(), is(record2.topic())); + assertThat(record1.partition(), is(record2.partition())); + assertThat(record1.offset(), is(record2.offset())); + assertThat(record1.timestamp(), is(record2.timestamp())); + assertThat(record1.timestampType(), is(record2.timestampType())); + assertThat(record1.checksum(), is(record2.checksum())); + assertThat(record1.serializedKeySize(), is(record2.serializedKeySize())); + assertThat(record1.serializedValueSize(), is(record2.serializedValueSize())); + assertThat(record1.key(), is(record2.key())); + assertThat(record1.value(), is(record2.value())); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/f4734781/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java new file mode 100644 index 0000000..475f20f --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka.record; + +import kafka.api.OffsetRequest; +import org.apache.crunch.Pair; +import org.apache.crunch.kafka.*; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import static org.apache.crunch.kafka.ClusterTest.writeData; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class KafkaDataIT { + @Rule + public TestName testName = new TestName(); + + private String topic; + private Map<TopicPartition, Long> startOffsets; + private Map<TopicPartition, Long> stopOffsets; + private Map<TopicPartition, Pair<Long, Long>> offsets; + private Properties props; + + @BeforeClass + public static void init() throws Exception { + ClusterTest.startTest(); + } + + @AfterClass + public static void cleanup() throws Exception { + ClusterTest.endTest(); + } + + @Before + public void setup() { + topic = UUID.randomUUID().toString(); + + props = ClusterTest.getConsumerProperties(); + + startOffsets = new HashMap<>(); + stopOffsets = new HashMap<>(); + offsets = new HashMap<>(); + for (int i = 0; i < 4; i++) { + TopicPartition tp = new TopicPartition(topic, i); + startOffsets.put(tp, 0L); + stopOffsets.put(tp, 100L); + + offsets.put(tp, Pair.of(0L, 100L)); + } + } + + @Test + public void getDataIterable() throws IOException { + int loops = 10; + int numPerLoop = 100; + int total = loops * numPerLoop; + List<String> keys = writeData(props, topic, "batch", loops, numPerLoop); + + startOffsets = getStartOffsets(props, topic); + stopOffsets = getStopOffsets(props, topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey()))); + } + + Iterable<ConsumerRecord<String, String>> data = new KafkaData<String, String>(props, offsets).read(null); + + int count = 0; + for (ConsumerRecord<String, String> record : data) { + assertThat(keys, hasItem(record.key())); + assertTrue(keys.remove(record.key())); + count++; + } + + assertThat(count, is(total)); + assertThat(keys.size(), is(0)); + } + + private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) { + return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic); + } + + private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) { + return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic); + } +}
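[Editor's note, not part of the patch] The KafkaDataIT test above exercises the single-JVM read path. A hedged sketch of the same path through the new source's read(Configuration), which bypasses the InputFormat/splits and iterates records directly; connection details, topic, and offsets are illustrative assumptions.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.Pair;
import org.apache.crunch.kafka.record.KafkaSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class KafkaReadExample {
  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // illustrative broker
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "crunch-kafka-example");  // illustrative group id

    // Offsets [0, 100) for partition 0 of an illustrative topic.
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    offsets.put(new TopicPartition("events", 0), Pair.of(0L, 100L));

    // read(Configuration) returns an Iterable backed by a KafkaConsumer; the consumer is closed
    // once the iterable is fully consumed.
    KafkaSource source = new KafkaSource(props, offsets);
    for (ConsumerRecord<BytesWritable, BytesWritable> record : source.read(new Configuration())) {
      System.out.printf("%s-%d@%d value=%d bytes%n",
          record.topic(), record.partition(), record.offset(),
          record.value() == null ? 0 : record.value().getLength());
    }
  }
}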
