CRUNCH-606: Kafka Source for Crunch that supports reading data as BytesWritable
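For reviewers, a minimal usage sketch (not part of the committed code) of how the new source might be wired into a Crunch pipeline. The broker address, topic name "events", and driver class KafkaSourceExample are hypothetical; the sketch only assumes the KafkaUtils and KafkaSource APIs added in this patch.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.kafka.KafkaSource;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Hypothetical broker address; KafkaUtils.getBrokerOffsets reads "metadata.broker.list",
    // while the KafkaConsumer created by the source uses "bootstrap.servers".
    props.setProperty("metadata.broker.list", "broker1:9092");
    props.setProperty("bootstrap.servers", "broker1:9092");

    // Bound the read per partition: earliest available offset up to (but excluding) the latest offset.
    Map<TopicPartition, Long> starts =
        KafkaUtils.getBrokerOffsets(props, kafka.api.OffsetRequest.EarliestTime(), "events");
    Map<TopicPartition, Long> ends =
        KafkaUtils.getBrokerOffsets(props, kafka.api.OffsetRequest.LatestTime(), "events");
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> start : starts.entrySet()) {
      offsets.put(start.getKey(), Pair.of(start.getValue(), ends.get(start.getKey())));
    }

    // The source returns the raw Kafka payloads as BytesWritable key/value pairs.
    Pipeline pipeline = new MRPipeline(KafkaSourceExample.class, new Configuration());
    PTable<BytesWritable, BytesWritable> records = pipeline.read(new KafkaSource(props, offsets));
    // ... parse/transform 'records' with DoFns, write to a target, then:
    pipeline.done();
  }
}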
* Some of the code contributed by Bryan Baugher and Andrew Olson Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/321cfef6 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/321cfef6 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/321cfef6 Branch: refs/heads/master Commit: 321cfef6e85325ab7a4d9548686a96972f6f31fd Parents: c09c4ee Author: Micah Whitacre <[email protected]> Authored: Mon Apr 11 09:47:33 2016 -0500 Committer: Micah Whitacre <[email protected]> Committed: Mon May 23 15:13:36 2016 -0500 ---------------------------------------------------------------------- crunch-kafka/pom.xml | 82 ++++ .../java/org/apache/crunch/kafka/KafkaData.java | 63 +++ .../crunch/kafka/KafkaRecordsIterable.java | 294 +++++++++++++ .../org/apache/crunch/kafka/KafkaSource.java | 225 ++++++++++ .../org/apache/crunch/kafka/KafkaUtils.java | 301 ++++++++++++++ .../kafka/inputformat/KafkaInputFormat.java | 235 +++++++++++ .../kafka/inputformat/KafkaInputSplit.java | 117 ++++++ .../kafka/inputformat/KafkaRecordReader.java | 152 +++++++ .../org/apache/crunch/kafka/ClusterTest.java | 217 ++++++++++ .../org/apache/crunch/kafka/KafkaDataIT.java | 118 ++++++ .../crunch/kafka/KafkaRecordsIterableIT.java | 415 +++++++++++++++++++ .../org/apache/crunch/kafka/KafkaSourceIT.java | 169 ++++++++ .../org/apache/crunch/kafka/KafkaUtilsIT.java | 188 +++++++++ .../kafka/inputformat/KafkaInputFormatIT.java | 407 ++++++++++++++++++ .../kafka/inputformat/KafkaInputSplitTest.java | 65 +++ .../kafka/inputformat/KafkaRecordReaderIT.java | 122 ++++++ .../crunch/kafka/utils/EmbeddedZookeeper.java | 102 +++++ .../kafka/utils/KafkaBrokerTestHarness.java | 369 +++++++++++++++++ .../crunch/kafka/utils/KafkaTestUtils.java | 94 +++++ .../crunch/kafka/utils/ZkStringSerializer.java | 43 ++ .../kafka/utils/ZookeeperTestHarness.java | 112 +++++ .../src/test/resources/log4j.properties | 29 ++ pom.xml | 13 + 23 files changed, 3932 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml new file mode 100644 index 0000000..a96a9b0 --- /dev/null +++ b/crunch-kafka/pom.xml @@ -0,0 +1,82 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-parent</artifactId> + <version>0.14.0-SNAPSHOT</version> + </parent> + + <artifactId>crunch-kafka</artifactId> + <name>Apache Crunch for Kafka</name> + + <dependencies> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.crunch</groupId> + <artifactId>crunch-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java new file mode 100644 index 0000000..6543aad --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka; + +import org.apache.crunch.Pair; +import org.apache.crunch.ReadableData; +import org.apache.crunch.SourceTarget; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +class KafkaData<K, V> implements ReadableData<Pair<K, V>> { + + private static final long serialVersionUID = -6582212311361579556L; + + private final Map<TopicPartition, Pair<Long, Long>> offsets; + private final Properties props; + + public KafkaData(Properties connectionProperties, + Map<TopicPartition, Pair<Long, Long>> offsets) { + this.props = connectionProperties; + this.offsets = offsets; + } + + + @Override + public Set<SourceTarget<?>> getSourceTargets() { + return null; + } + + @Override + public void configure(Configuration conf) { + //no-op + } + + @Override + public Iterable<Pair<K, V>> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException { + Consumer<K, V> consumer = new KafkaConsumer<K, V>(props); + return new KafkaRecordsIterable<>(consumer, offsets, props); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java new file mode 100644 index 0000000..8fec7f8 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka; + +import org.apache.crunch.Pair; +import org.apache.crunch.kafka.inputformat.KafkaRecordReader; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Properties; +import java.util.Set; + + +class KafkaRecordsIterable<K, V> implements Iterable<Pair<K, V>> { + + /** + * Logger + */ + private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsIterable.class); + + /** + * The Kafka consumer responsible for retrieving messages. + */ + private final Consumer<K, V> consumer; + + /** + * The starting positions of the iterable for the topic. + */ + private final Map<TopicPartition, Pair<Long, Long>> offsets; + + /** + * Tracks if the iterable is empty. + */ + private final boolean isEmpty; + + /** + * The poll time between each request to Kafka + */ + private final long scanPollTime; + + private final int maxRetryAttempts; + + /** + * Creates the iterable that will pull values for a collection of topics using the provided {@code consumer} between + * the {@code startOffsets} and {@code stopOffsets}. + * @param consumer The consumer for pulling the data from Kafka. The consumer will be closed automatically once all + * of the records have been consumed. + * @param offsets offsets for pulling data + * @param properties properties for tweaking the behavior of the iterable. + * @throws IllegalArgumentException if any of the arguments are {@code null} or empty. + */ + public KafkaRecordsIterable(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> offsets, + Properties properties) { + if (consumer == null) { + throw new IllegalArgumentException("The 'consumer' cannot be 'null'."); + } + this.consumer = consumer; + + if (properties == null) { + throw new IllegalArgumentException("The 'properties' cannot be 'null'."); + } + + String retryString = properties.getProperty(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY, + KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING); + maxRetryAttempts = Integer.parseInt(retryString); + + if (offsets == null || offsets.isEmpty()) { + throw new IllegalArgumentException("The 'offsets' cannot 'null' or empty."); + } + + //filter out any topics and partitions that do not have offset ranges that will produce data. + Map<TopicPartition, Pair<Long, Long>> filteredOffsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + Pair<Long, Long> value = entry.getValue(); + //if start is less than one less than stop then there is data to be had + if(value.first() < value.second()){ + filteredOffsets.put(entry.getKey(), value); + }else{ + LOG.debug("Removing offsets for {} because start is not less than the end offset.", entry.getKey()); + } + } + + //check to make sure that based on the offsets there is data to retrieve, otherwise false. 
+ //there will be data if the start offsets are less than stop offsets + isEmpty = filteredOffsets.isEmpty(); + if (isEmpty) { + LOG.warn("Iterable for Kafka for is empty because offsets are empty."); + } + + //assign this + this.offsets = filteredOffsets; + + scanPollTime = Long.parseLong(properties.getProperty(KafkaSource.CONSUMER_POLL_TIMEOUT_KEY, + Long.toString(KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT))); + } + + @Override + public Iterator<Pair<K, V>> iterator() { + if (isEmpty) { + LOG.debug("Returning empty iterator since offsets align."); + return Collections.emptyIterator(); + } + //Assign consumer to all of the partitions + LOG.debug("Assigning topics and partitions and seeking to start offsets."); + + consumer.assign(new LinkedList<>(offsets.keySet())); + //hack so maybe look at removing this + consumer.poll(0); + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + consumer.seek(entry.getKey(), entry.getValue().first()); + } + + return new RecordsIterator<K, V>(consumer, offsets, scanPollTime, maxRetryAttempts); + } + + private static class RecordsIterator<K, V> implements Iterator<Pair<K, V>> { + + private final Consumer<K, V> consumer; + private final Map<TopicPartition, Pair<Long, Long>> offsets; + private final long pollTime; + private final int maxNumAttempts; + private ConsumerRecords<K, V> records; + private Iterator<ConsumerRecord<K, V>> currentIterator; + private final Set<TopicPartition> remainingPartitions; + + private Pair<K, V> next; + + public RecordsIterator(Consumer<K, V> consumer, + Map<TopicPartition, Pair<Long, Long>> offsets, long pollTime, int maxNumRetries) { + this.consumer = consumer; + remainingPartitions = new HashSet<>(offsets.keySet()); + this.offsets = offsets; + this.pollTime = pollTime; + this.maxNumAttempts = maxNumRetries; + } + + @Override + public boolean hasNext() { + if (next != null) + return true; + + //if partitions to consume then pull next value + if (remainingPartitions.size() > 0) { + next = getNext(); + } + + return next != null; + } + + @Override + public Pair<K, V> next() { + if (next == null) { + next = getNext(); + } + + if (next != null) { + Pair<K, V> returnedNext = next; + //prime for next call + next = getNext(); + //return the current next + return returnedNext; + } else { + throw new NoSuchElementException("No more elements."); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove is not supported."); + } + + /** + * Gets the current iterator. + * + * @return the current iterator or {@code null} if there are no more values to consume. + */ + private Iterator<ConsumerRecord<K, V>> getIterator() { + if (!remainingPartitions.isEmpty()) { + if (currentIterator != null && currentIterator.hasNext()) { + return currentIterator; + } + LOG.debug("Retrieving next set of records."); + int numTries = 0; + boolean notSuccess = false; + while(!notSuccess && numTries < maxNumAttempts) { + try { + records = consumer.poll(pollTime); + notSuccess = true; + }catch(RetriableException re){ + numTries++; + if(numTries < maxNumAttempts) { + LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re); + }else{ + LOG.error("Error pulling messages from Kafka. 
Exceeded maximum number of attempts {}", maxNumAttempts, re); + throw re; + } + } + } + if (records == null || records.isEmpty()) { + LOG.debug("Retrieved empty records."); + currentIterator = null; + return null; + } + currentIterator = records.iterator(); + return currentIterator; + } + + LOG.debug("No more partitions to consume therefore not retrieving any more records."); + return null; + } + + /** + * Internal method for retrieving the next value to retrieve. + * + * @return {@code null} if there are no more values to retrieve otherwise the next event. + */ + private Pair<K, V> getNext() { + while (!remainingPartitions.isEmpty()) { + Iterator<ConsumerRecord<K, V>> iterator = getIterator(); + + while (iterator != null && iterator.hasNext()) { + ConsumerRecord<K, V> record = iterator.next(); + TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); + long offset = record.offset(); + + if (withinRange(topicPartition, offset)) { + LOG.debug("Retrieving value for {} with offset {}.", topicPartition, offset); + return Pair.of(record.key(), record.value()); + } + LOG.debug("Value for {} with offset {} is outside of range skipping.", topicPartition, offset); + } + } + + LOG.debug("Closing the consumer because there are no more remaining partitions."); + consumer.close(); + + LOG.debug("Consumed data from all partitions."); + return null; + + } + + /** + * Checks whether the value for {@code topicPartition} with an {@code offset} is within scan range. If + * the value is not then {@code false} is returned otherwise {@code true}. + * + * @param topicPartion The partition for the offset + * @param offset the offset in the partition + * @return {@code true} if the value is within the expected consumption range, otherwise {@code false}. + */ + private boolean withinRange(TopicPartition topicPartion, long offset) { + long endOffset = offsets.get(topicPartion).second(); + //end offsets are one higher than the last written value. + boolean emit = offset < endOffset; + if (offset >= endOffset - 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.", + new Object[]{topicPartion, offset, endOffset}); + } + remainingPartitions.remove(topicPartion); + consumer.pause(topicPartion); + } + LOG.debug("Value for partition {} and offset {} is within range.", topicPartion, offset); + return emit; + } + } +} + http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java new file mode 100644 index 0000000..485604d --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka; + + +import org.apache.crunch.DoFn; +import org.apache.crunch.Pair; +import org.apache.crunch.ReadableData; +import org.apache.crunch.Source; +import org.apache.crunch.TableSource; +import org.apache.crunch.impl.mr.run.CrunchMapper; +import org.apache.crunch.io.CrunchInputs; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.io.ReadableSource; +import org.apache.crunch.kafka.inputformat.KafkaInputFormat; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * A Crunch Source that will retrieve events from Kafka given start and end offsets. The source is not designed to + * process unbounded data but instead to retrieve data between a specified range. + * <p> + * + * The values retrieved from Kafka are returned as raw bytes inside of a {@link BytesWritable}. If callers + * need specific parsing logic based on the topic then consumers are encouraged to use multiple Kafka Sources + * for each topic and use special {@link DoFn} to parse the payload. + */ +public class KafkaSource + implements TableSource<BytesWritable, BytesWritable>, ReadableSource<Pair<BytesWritable, BytesWritable>> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); + + private final FormatBundle inputBundle; + private final Properties props; + private final Map<TopicPartition, Pair<Long, Long>> offsets; + + /** + * The consistent PType describing all of the data being retrieved from Kafka as a BytesWritable. + */ + private static PTableType<BytesWritable, BytesWritable> KAFKA_SOURCE_TYPE = + Writables.tableOf(Writables.writables(BytesWritable.class), Writables.writables(BytesWritable.class)); + + /** + * Constant to indicate how long the reader waits before timing out when retrieving data from Kafka. + */ + public static final String CONSUMER_POLL_TIMEOUT_KEY = "org.apache.crunch.kafka.consumer.poll.timeout"; + + /** + * Default timeout value for {@link #CONSUMER_POLL_TIMEOUT_KEY} of 1 second. 
+ */ + public static final long CONSUMER_POLL_TIMEOUT_DEFAULT = 1000L; + + + /** + * Constructs a Kafka source that will read data from the Kafka cluster identified by the {@code kafkaConnectionProperties} + * and from the specific topics and partitions identified in the {@code offsets} + * @param kafkaConnectionProperties The connection properties for reading from Kafka. These properties will be honored + * with the exception of the {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and + * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + * @param offsets A map of {@link TopicPartition} to a pair of start and end offsets respectively. The start and end offsets + * are evaluated at [start, end) where the ending offset is excluded. Each TopicPartition must have a + * non-null pair describing its offsets. The start offset should be less than the end offset. If the values + * are equal or start is greater than the end then that partition will be skipped. + */ + public KafkaSource(Properties kafkaConnectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) { + this.props = copyAndSetProperties(kafkaConnectionProperties); + + inputBundle = createFormatBundle(props, offsets); + + this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets)); + } + + @Override + public Source<Pair<BytesWritable, BytesWritable>> inputConf(String key, String value) { + inputBundle.set(key, value); + return this; + } + + @Override + public PType<Pair<BytesWritable, BytesWritable>> getType() { + return KAFKA_SOURCE_TYPE; + } + + @Override + public Converter<?, ?, ?, ?> getConverter() { + return KAFKA_SOURCE_TYPE.getConverter(); + } + + @Override + public PTableType<BytesWritable, BytesWritable> getTableType() { + return KAFKA_SOURCE_TYPE; + } + + @Override + public long getSize(Configuration configuration) { + // TODO something smarter here. + return 1000L * 1000L * 1000L; + } + + @Override + public String toString() { + return "KafkaSource("+props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)+")"; + } + + @Override + public long getLastModifiedAt(Configuration configuration) { + LOG.warn("Cannot determine last modified time for source: {}", toString()); + return -1; + } + + private static <K, V> FormatBundle createFormatBundle(Properties kafkaConnectionProperties, + Map<TopicPartition, Pair<Long, Long>> offsets) { + + FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class); + + KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); + + for (String name : kafkaConnectionProperties.stringPropertyNames()) { + bundle.set(name, kafkaConnectionProperties.getProperty(name)); + } + + return bundle; + } + + private static <K, V> Properties copyAndSetProperties(Properties kakfaConnectionProperties) { + Properties props = new Properties(); + props.putAll(kakfaConnectionProperties); + + //Setting the key/value deserializer to ensure proper translation from Kafka to PType format. + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); + + return props; + } + + + @Override + public Iterable<Pair<BytesWritable, BytesWritable>> read(Configuration conf) throws IOException { + // consumer will get closed when the iterable is fully consumed. + // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity + // of parallelism when reading. 
+ Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props); + return new KafkaRecordsIterable<>(consumer, offsets, props); + } + + + @Override + public void configureSource(Job job, int inputId) throws IOException { + Configuration conf = job.getConfiguration(); + //an id of -1 indicates that this is the only input so just use it directly + if (inputId == -1) { + job.setMapperClass(CrunchMapper.class); + job.setInputFormatClass(inputBundle.getFormatClass()); + inputBundle.configure(conf); + } else { + //there are multiple inputs for this mapper so add it as a CrunchInputs and need a fake path just to + //make it play well with other file based inputs. + Path dummy = new Path("/kafka/" + inputId); + CrunchInputs.addInputPath(job, dummy, inputBundle, inputId); + } + } + + @Override + public ReadableData<Pair<BytesWritable, BytesWritable>> asReadable() { + // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity + // of parallelism when reading. + return new KafkaData<>(props, offsets); + } + + + /** + * Basic {@link Deserializer} which simply wraps the payload as a {@link BytesWritable}. + */ + public static class BytesDeserializer implements Deserializer<BytesWritable> { + + @Override + public void configure(Map<String, ?> configProperties, boolean isKey) { + //no-op + } + + @Override + public BytesWritable deserialize(String topic, byte[] valueBytes) { + return new BytesWritable(valueBytes); + } + + @Override + public void close() { + //no-op + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java new file mode 100644 index 0000000..aeea6fb --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka; + +import kafka.api.PartitionOffsetRequestInfo; +import kafka.cluster.Broker; +import kafka.cluster.BrokerEndPoint; +import kafka.cluster.EndPoint; +import kafka.common.TopicAndPartition; +import kafka.javaapi.OffsetRequest; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.PartitionMetadata; +import kafka.javaapi.TopicMetadata; +import kafka.javaapi.TopicMetadataRequest; +import kafka.javaapi.TopicMetadataResponse; +import kafka.javaapi.consumer.SimpleConsumer; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +/** + * Simple utilities for retrieving offset and Kafka information to assist in setting up and configuring a + * {@link KafkaSource} instance. + */ +public class KafkaUtils { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); + + private static final String CLIENT_ID = "crunch-kafka-client"; + + private static final Random RANDOM = new Random(); + + /** + * Configuration property for the number of retry attempts that will be made to Kafka. + */ + public static final String KAFKA_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.attempts"; + + /** + * Default number of retry attempts. + */ + public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 5; + public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT); + + /** + * Converts the provided {@code config} into a {@link Properties} object to connect with Kafka. + * @param config the config to read properties + * @return a properties instance populated with all of the values inside the provided {@code config}. + */ + public static Properties getKafkaConnectionProperties(Configuration config) { + Properties props = new Properties(); + for (Map.Entry<String, String> value : config) { + props.setProperty(value.getKey(), value.getValue()); + } + + return props; + } + + /** + * Adds the {@code properties} to the provided {@code config} instance. + * @param properties the properties to add to the config. + * @param config the configuration instance to be modified. + * @return the config instance with the populated properties + */ + public static Configuration addKafkaConnectionProperties(Properties properties, Configuration config) { + for (String name : properties.stringPropertyNames()) { + config.set(name, properties.getProperty(name)); + } + return config; + } + + /** + * Returns a {@link TopicMetadataRequest} from the given topics + * + * @param topics an array of topics you want metadata for + * @return a {@link TopicMetadataRequest} from the given topics + * @throws IllegalArgumentException if topics is {@code null} or empty, or if any of the topics is null, empty or blank + */ + private static TopicMetadataRequest getTopicMetadataRequest(String... 
topics) { + if (topics == null) + throw new IllegalArgumentException("topics cannot be null"); + if (topics.length == 0) + throw new IllegalArgumentException("topics cannot be empty"); + + for (String topic : topics) + if (StringUtils.isBlank(topic)) + throw new IllegalArgumentException("No topic can be null, empty or blank"); + + return new TopicMetadataRequest(Arrays.asList(topics)); + } + + /** + * <p> + * Retrieves the offset values for an array of topics at the specified time. + * </p> + * <p> + * If the Kafka cluster does not have the logs for the partition at the specified time or if the topic did not exist + * at that time this will instead return the earliest offset for that partition. + * </p> + * + * @param properties the properties containing the configuration for kafka + * @param time the time at which we want to know what the offset values were + * @param topics the topics we want to know the offset values of + * @return the offset values for an array of topics at the specified time + * @throws IllegalArgumentException if properties is {@code null} or if topics is {@code null} or empty or if any of + * the topics are {@code null}, empty or blank, or if there is an error parsing the + * properties. + * @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information. + */ + public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) { + if (properties == null) + throw new IllegalArgumentException("properties cannot be null"); + + final List<Broker> brokers = getBrokers(properties); + Collections.shuffle(brokers, RANDOM); + + return getBrokerOffsets(brokers, time, topics); + } + + // Visible for testing + static Map<TopicPartition, Long> getBrokerOffsets(List<Broker> brokers, long time, String... topics) { + if (topics == null) + throw new IllegalArgumentException("topics cannot be null"); + if (topics.length == 0) + throw new IllegalArgumentException("topics cannot be empty"); + + for (String topic : topics) + if (StringUtils.isBlank(topic)) + throw new IllegalArgumentException("No topic can be null, empty or blank"); + + TopicMetadataResponse topicMetadataResponse = null; + + final TopicMetadataRequest topicMetadataRequest = getTopicMetadataRequest(topics); + + for (final Broker broker : brokers) { + final SimpleConsumer consumer = getSimpleConsumer(broker); + try { + topicMetadataResponse = consumer.send(topicMetadataRequest); + break; + } catch (Exception err) { + EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get(); + LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed", + Arrays.toString(topics), endpoint.host()), err); + } finally { + consumer.close(); + } + } + + if (topicMetadataResponse == null) { + throw new IllegalStateException(String.format("Fetching topic metadata for topic(s) '%s' from broker(s) '%s' failed", + Arrays.toString(topics), Arrays.toString(brokers.toArray()))); + } + + // From the topic metadata, build a PartitionOffsetRequestInfo for each partition of each topic. It should be noted that + // only the leader Broker has the partition offset information[1] so save the leader Broker so we + // can send the request to it. 
+ // [1] - https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-OffsetAPI + Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequests = + new HashMap<>(); + + for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) { + for (PartitionMetadata partition : metadata.partitionsMetadata()) { + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = + new HashMap<>(); + + BrokerEndPoint brokerEndPoint = partition.leader(); + Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT); + + if (brokerRequests.containsKey(leader)) + requestInfo = brokerRequests.get(leader); + + requestInfo.put(new TopicAndPartition(metadata.topic(), partition.partitionId()), new PartitionOffsetRequestInfo( + time, 1)); + + brokerRequests.put(leader, requestInfo); + } + } + + Map<TopicPartition, Long> topicPartitionToOffset = new HashMap<>(); + + // Send the offset request to the leader broker + for (Map.Entry<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequest : brokerRequests.entrySet()) { + SimpleConsumer simpleConsumer = getSimpleConsumer(brokerRequest.getKey()); + + OffsetResponse offsetResponse = null; + try { + OffsetRequest offsetRequest = new OffsetRequest(brokerRequest.getValue(), kafka.api.OffsetRequest.CurrentVersion(), + CLIENT_ID); + offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest); + } finally { + simpleConsumer.close(); + } + + Map<TopicPartition, Long> earliestOffsets = null; + + // Retrieve/parse the results + for (Map.Entry<TopicAndPartition, PartitionOffsetRequestInfo> entry : brokerRequest.getValue().entrySet()) { + TopicAndPartition topicAndPartition = entry.getKey(); + TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition()); + long[] offsets = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition()); + long offset; + + // The Kafka API will return no value if a time is given which there is no log that contains messages from that time + // (i.e. before a topic existed or in a log that was rolled/cleaned) + if (offsets.length > 0) { + offset = offsets[0]; + } else { + LOG.info("Kafka did not have an offset for topic/partition [{}]. 
Returning earliest known offset instead", + topicAndPartition); + + // This shouldn't happen but if kafka's API did not provide us with a value and we are asking for the earliest + // time we can't be sure what to do so quit + if (time == kafka.api.OffsetRequest.EarliestTime()) + throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic() + + "] but Kafka returned no values"); + + // Load the earliest offsets for the topic if it hasn't been loaded already + if (earliestOffsets == null) + earliestOffsets = getBrokerOffsets(Arrays.asList(brokerRequest.getKey()), + kafka.api.OffsetRequest.EarliestTime(), topicAndPartition.topic()); + + offset = earliestOffsets.get(topicPartition); + } + + topicPartitionToOffset.put(topicPartition, offset); + } + } + + return topicPartitionToOffset; + } + + /** + * Returns a {@link SimpleConsumer} connected to the given {@link Broker} + */ + private static SimpleConsumer getSimpleConsumer(final Broker broker) { + // BrokerHost, BrokerPort, timeout, buffer size, client id + EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get(); + return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID); + } + + /** + * Returns a {@link Broker} list from the given {@link Properties} + * + * @param properties the {@link Properties} with configuration to connect to a Kafka broker + */ + private static List<Broker> getBrokers(final Properties properties) { + if (properties == null) + throw new IllegalArgumentException("props cannot be null"); + + String commaDelimitedBrokerList = properties.getProperty("metadata.broker.list"); + if (commaDelimitedBrokerList == null) + throw new IllegalArgumentException("Unable to find 'metadata.broker.list' in given properties"); + + // Split broker list into host/port pairs + String[] brokerPortList = commaDelimitedBrokerList.split(","); + if (brokerPortList.length < 1) + throw new IllegalArgumentException("Unable to parse broker list : [" + Arrays.toString(brokerPortList) + "]"); + + final List<Broker> brokers = new ArrayList<Broker>(brokerPortList.length); + for (final String brokerHostPortString : brokerPortList) { + // Split host/port + String[] brokerHostPort = brokerHostPortString.split(":"); + if (brokerHostPort.length != 2) + throw new IllegalArgumentException("Unable to parse host/port from broker string : [" + + Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]"); + try { + brokers.add(new Broker(0, brokerHostPort[0], Integer.parseInt(brokerHostPort[1]), SecurityProtocol.PLAINTEXT)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Error parsing broker port : " + brokerHostPort[1], e); + } + } + return brokers; + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java new file mode 100644 index 0000000..eba4a97 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.inputformat; + +import org.apache.commons.lang.StringUtils; +import org.apache.crunch.Pair; +import org.apache.crunch.io.FormatBundle; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Basic input format for reading data from Kafka. Data is read and maintained in its pure byte form and wrapped + * inside of a {@link BytesWritable} instance. + * + * Populating the configuration of the input format is handled with the convenience method of + * {@link #writeOffsetsToConfiguration(Map, Configuration)}. This should be done to ensure + * the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits} + * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}. + */ +public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable { + + /** + * Constant for constructing configuration keys for the input format. + */ + private static final String KAFKA_INPUT_OFFSETS_BASE = "org.apache.crunch.kafka.offsets.topic"; + + /** + * Constant used for building configuration keys and specifying partitions. + */ + private static final String PARTITIONS = "partitions"; + + /** + * Constant used for building configuration keys and specifying the start of a partition. + */ + private static final String START = "start"; + + /** + * Constant used for building configuration keys and specifying the end of a partition. + */ + private static final String END = "end"; + + /** + * Regex to discover all of the defined partitions which should be consumed by the input format. + */ + private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." 
+ PARTITIONS + "$"; + + private Configuration configuration; + + @Override + public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException { + Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf()); + List<InputSplit> splits = new LinkedList<>(); + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + + long start = entry.getValue().first(); + long end = entry.getValue().second(); + if(start != end) { + splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), entry.getValue().first(), + entry.getValue().second())); + } + } + + return splits; + } + + @Override + public RecordReader<BytesWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + return new KafkaRecordReader<>(); + } + + @Override + public void setConf(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public Configuration getConf() { + return configuration; + } + + + //The following methods are used for reading and writing Kafka Partition offset information into Hadoop's Configuration + //objects and into Crunch's FormatBundle. For a specific Kafka Topic it might have one or many partitions and for + //each partition it will need a start and end offset. Assuming you have a topic of "abc" and it has 2 partitions the + //configuration would be populated with the following: + // org.apache.crunch.kafka.offsets.topic.abc.partitions = [0,1] + // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = <partition start> + // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = <partition end> + // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start = <partition start> + // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end = <partition end> + + /** + * Writes the start and end offsets for the provided topic partitions to the {@code bundle}. + * @param offsets The starting and ending offsets for the topics and partitions. + * @param bundle the bundle into which the information should be persisted. + */ + public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, Long>> offsets, FormatBundle bundle) { + for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) { + bundle.set(entry.getKey(), entry.getValue()); + } + } + + /** + * Writes the start and end offsets for the provided topic partitions to the {@code config}. + * @param offsets The starting and ending offsets for the topics and partitions. + * @param config the config into which the information should be persisted. + */ + public static void writeOffsetsToConfiguration(Map<TopicPartition, Pair<Long, Long>> offsets, Configuration config) { + for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) { + config.set(entry.getKey(), entry.getValue()); + } + } + + /** + * Reads the {@code configuration} to determine which topics, partitions, and offsets should be used for reading data. + * + * @param configuration the configuration to derive the data to read. + * @return a map of {@link TopicPartition} to a pair of start and end offsets. + * @throws IllegalStateException if the {@code configuration} does not have the start and end offsets set properly + * for a partition. 
+ */ + public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration configuration) { + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + //find configuration for all of the topics with defined partitions + Map<String, String> topicPartitionKeys = configuration.getValByRegex(TOPIC_KEY_REGEX); + + //for each topic start to process it's partitions + for (String key : topicPartitionKeys.keySet()) { + String topic = getTopicFromKey(key); + int[] partitions = configuration.getInts(key); + //for each partition find and add the start/end offset + for (int partitionId : partitions) { + TopicPartition topicPartition = new TopicPartition(topic, partitionId); + long start = configuration.getLong(generatePartitionStartKey(topic, partitionId),Long.MIN_VALUE); + long end = configuration.getLong(generatePartitionEndKey(topic, partitionId), + Long.MIN_VALUE); + + if(start == Long.MIN_VALUE || end == Long.MIN_VALUE){ + throw new IllegalStateException("The "+topicPartition+ "has an invalid start:"+start+ " or end:"+end + +" offset configured."); + } + + offsets.put(topicPartition, Pair.of(start, end)); + } + } + + return offsets; + } + + private static Map<String, String> generateValues(Map<TopicPartition, Pair<Long, Long>> offsets) { + Map<String, String> offsetConfigValues = new HashMap<>(); + Map<String, Set<Integer>> topicsPartitions = new HashMap<>(); + + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + String topic = topicPartition.topic(); + int partition = topicPartition.partition(); + String startKey = generatePartitionStartKey(topic, partition); + String endKey = generatePartitionEndKey(topic, partition); + //Add the start and end offsets for a specific partition + offsetConfigValues.put(startKey, Long.toString(entry.getValue().first())); + offsetConfigValues.put(endKey, Long.toString(entry.getValue().second())); + + Set<Integer> partitions = topicsPartitions.get(topic); + if (partitions == null) { + partitions = new HashSet<>(); + topicsPartitions.put(topic, partitions); + } + partitions.add(partition); + } + + //generate the partitions values for each topic + for (Map.Entry<String, Set<Integer>> entry : topicsPartitions.entrySet()) { + String key = KAFKA_INPUT_OFFSETS_BASE + "." + entry.getKey() + "." + PARTITIONS; + Set<Integer> partitions = entry.getValue(); + String partitionsString = StringUtils.join(partitions, ","); + offsetConfigValues.put(key, partitionsString); + } + + return offsetConfigValues; + } + + static String generatePartitionStartKey(String topic, int partition) { + return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + START; + } + + static String generatePartitionEndKey(String topic, int partition) { + return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + END; + } + + static String generateTopicPartitionsKey(String topic) { + return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS; + } + + static String getTopicFromKey(String key) { + //strip off the base key + a trailing "." + String value = key.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1); + //strip off the end part + a preceding "." 
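+      // e.g. for the key "org.apache.crunch.kafka.offsets.topic.abc.partitions", value is now
+      // "abc.partitions" and the next line reduces it to the topic name "abc".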
+ value = value.substring(0, (value.length() - (PARTITIONS.length() + 1))); + + return value; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java new file mode 100644 index 0000000..c8ebc6a --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.inputformat; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.kafka.common.TopicPartition; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * InputSplit that represent retrieving data from a single {@link TopicPartition} between the specified start + * and end offsets. + */ +public class KafkaInputSplit extends InputSplit implements Writable { + + private long startingOffset; + private long endingOffset; + private TopicPartition topicPartition; + + /** + * Nullary Constructor for creating the instance inside the Mapper instance. + */ + public KafkaInputSplit() { + + } + + /** + * Constructs an input split for the provided {@code topic} and {@code partition} restricting data to be between + * the {@code startingOffset} and {@code endingOffset} + * @param topic the topic for the split + * @param partition the partition for the topic + * @param startingOffset the start of the split + * @param endingOffset the end of the split + */ + public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset) { + this.startingOffset = startingOffset; + this.endingOffset = endingOffset; + topicPartition = new TopicPartition(topic, partition); + } + + @Override + public long getLength() throws IOException, InterruptedException { + // This is just used as a hint for size of bytes so it is already inaccurate. + return startingOffset > 0 ? endingOffset - startingOffset : endingOffset; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + //Leave empty since data locality not really an issue. 
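+    // Records are pulled remotely from the Kafka brokers rather than from HDFS blocks, so there
+    // are no preferred hosts to report.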
+ return new String[0]; + } + + /** + * Returns the topic and partition for the split + * @return the topic and partition for the split + */ + public TopicPartition getTopicPartition() { + return topicPartition; + } + + /** + * Returns the starting offset for the split + * @return the starting offset for the split + */ + public long getStartingOffset() { + return startingOffset; + } + + /** + * Returns the ending offset for the split + * @return the ending offset for the split + */ + public long getEndingOffset() { + return endingOffset; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeUTF(topicPartition.topic()); + dataOutput.writeInt(topicPartition.partition()); + dataOutput.writeLong(startingOffset); + dataOutput.writeLong(endingOffset); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + String topic = dataInput.readUTF(); + int partition = dataInput.readInt(); + startingOffset = dataInput.readLong(); + endingOffset = dataInput.readLong(); + + topicPartition = new TopicPartition(topic, partition); + } + + @Override + public String toString() { + return getTopicPartition() + " Start: " + startingOffset + " End: " + endingOffset; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java new file mode 100644 index 0000000..1420519 --- /dev/null +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka.inputformat; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; + +import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT; +import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY; +import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties; +import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT; +import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY; + +/** + * A {@link RecordReader} for pulling data from Kafka. + * @param <K> the key of the records from Kafka + * @param <V> the value of the records from Kafka + */ +public class KafkaRecordReader<K, V> extends RecordReader<K, V> { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class); + + private Consumer<K, V> consumer; + private ConsumerRecord<K, V> record; + private long endingOffset; + private Iterator<ConsumerRecord<K, V>> recordIterator; + private long consumerPollTimeout; + private long maxNumberOfRecords; + private long startingOffset; + private int maxNumberAttempts; + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + consumer = new KafkaConsumer<>(getKafkaConnectionProperties(taskAttemptContext.getConfiguration())); + KafkaInputSplit split = (KafkaInputSplit) inputSplit; + TopicPartition topicPartition = split.getTopicPartition(); + consumer.assign(Collections.singletonList(topicPartition)); + //suggested hack to gather info without gathering data + consumer.poll(0); + //now seek to the desired start location + startingOffset = split.getStartingOffset(); + consumer.seek(topicPartition,startingOffset); + + endingOffset = split.getEndingOffset(); + + maxNumberOfRecords = endingOffset - split.getStartingOffset(); + if(LOG.isInfoEnabled()) { + LOG.info("Reading data from {} between {} and {}", new Object[]{topicPartition, startingOffset, endingOffset}); + } + + Configuration config = taskAttemptContext.getConfiguration(); + consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT); + maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + recordIterator = getRecords(); + record = recordIterator.hasNext() ? recordIterator.next() : null; + if(LOG.isDebugEnabled()){ + if(record != null) { + LOG.debug("nextKeyValue: Retrieved record with offset {}", record.offset()); + }else{ + LOG.debug("nextKeyValue: Retrieved null record"); + } + } + return record != null && record.offset() < endingOffset; + } + + @Override + public K getCurrentKey() throws IOException, InterruptedException { + return record == null ? 
+ null : record.key(); + } + + @Override + public V getCurrentValue() throws IOException, InterruptedException { + return record == null ? null : record.value(); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + //not the most accurate but gives a reasonable estimate + return record == null ? 0.0f : ((float) (record.offset() - startingOffset)) / maxNumberOfRecords; + } + + private Iterator<ConsumerRecord<K, V>> getRecords() { + if (recordIterator == null || !recordIterator.hasNext()) { + ConsumerRecords<K, V> records = null; + int numTries = 0; + boolean success = false; + while(!success && numTries < maxNumberAttempts) { + try { + records = consumer.poll(consumerPollTimeout); + success = true; + } catch (RetriableException re) { + numTries++; + if (numTries < maxNumberAttempts) { + LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re); + } else { + LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumberAttempts, re); + throw re; + } + } + } + + if (LOG.isDebugEnabled()) { + if (records == null) { + LOG.debug("No records retrieved from Kafka therefore nothing to iterate over."); + } else { + LOG.debug("Retrieved records from Kafka to iterate over."); + } + } + return records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator(); + } + return recordIterator; + } + + @Override + public void close() throws IOException { + LOG.debug("Closing the record reader."); + if(consumer != null) { + consumer.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java new file mode 100644 index 0000000..836039c --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import kafka.serializer.Decoder; +import kafka.serializer.Encoder; +import kafka.utils.VerifiableProperties; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.kafka.inputformat.KafkaInputFormatIT; +import org.apache.crunch.kafka.inputformat.KafkaRecordReaderIT; +import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + //org.apache.crunch.kafka + KafkaSourceIT.class, KafkaRecordsIterableIT.class, KafkaDataIT.class, + //org.apache.crunch.kafka.inputformat + KafkaRecordReaderIT.class, KafkaInputFormatIT.class, KafkaUtilsIT.class, +}) +public class ClusterTest { + + + private static TemporaryFolder folder = new TemporaryFolder(); + private static KafkaBrokerTestHarness kafka; + private static boolean runAsSuite = false; + private static Configuration conf; + private static FileSystem fs; + + @BeforeClass + public static void startSuite() throws Exception { + runAsSuite = true; + startKafka(); + setupFileSystem(); + } + + @AfterClass + public static void endSuite() throws Exception { + stopKafka(); + } + + public static void startTest() throws Exception { + if (!runAsSuite) { + startKafka(); + setupFileSystem(); + } + } + + public static void endTest() throws Exception { + if (!runAsSuite) { + stopKafka(); + } + } + + private static void stopKafka() throws IOException { + kafka.tearDown(); + } + + private static void startKafka() throws IOException { + Properties props = new Properties(); + props.setProperty("auto.create.topics.enable", Boolean.TRUE.toString()); + + kafka = new KafkaBrokerTestHarness(props); + kafka.setUp(); + } + + private static void setupFileSystem() throws IOException { + folder.create(); + + conf = new Configuration(); + conf.set(RuntimeParameters.TMP_DIR, folder.getRoot().getAbsolutePath()); + // Run Map/Reduce tests in process. + conf.set("mapreduce.jobtracker.address", "local"); + } + + public static Configuration getConf() { + // Clone the configuration so it doesn't get modified for other tests. + return new Configuration(conf); + } + + public static Properties getConsumerProperties() { + Properties props = new Properties(); + props.putAll(kafka.getProps()); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName()); + //set this because still needed by some APIs. + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list")); + props.setProperty("enable.auto.commit", Boolean.toString(false)); + + //when set, this causes some problems with initializing the consumer. 
+ props.remove(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + return props; + } + + public static Properties getProducerProperties() { + Properties props = new Properties(); + props.putAll(kafka.getProps()); + //set this because still needed by some APIs. + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list")); + return props; + } + + public static Configuration getConsumerConfig() { + Configuration kafkaConfig = new Configuration(conf); + KafkaUtils.addKafkaConnectionProperties(getConsumerProperties(), kafkaConfig); + return kafkaConfig; + } + + public static List<String> writeData(Properties props, String topic, String batch, int loops, int numValuesPerLoop) { + Properties producerProps = new Properties(); + producerProps.putAll(props); + producerProps.setProperty("serializer.class", StringEncoderDecoder.class.getName()); + producerProps.setProperty("key.serializer.class", StringEncoderDecoder.class.getName()); + + // Set the default compression used to be snappy + producerProps.setProperty("compression.codec", "snappy"); + producerProps.setProperty("request.required.acks", "1"); + + ProducerConfig producerConfig = new ProducerConfig(producerProps); + + Producer<String, String> producer = new Producer<>(producerConfig); + List<String> keys = new LinkedList<>(); + try { + for (int i = 0; i < loops; i++) { + List<KeyedMessage<String, String>> events = new LinkedList<>(); + for (int j = 0; j < numValuesPerLoop; j++) { + String key = "key" + batch + i + j; + String value = "value" + batch + i + j; + keys.add(key); + events.add(new KeyedMessage<>(topic, key, value)); + } + producer.send(events); + } + } finally { + producer.close(); + } + return keys; + } + + + public static class StringSerDe implements Serializer<String>, Deserializer<String> { + + @Override + public void configure(Map map, boolean b) { + + } + + @Override + public byte[] serialize(String topic, String value) { + return value.getBytes(); + } + + @Override + public String deserialize(String topic, byte[] bytes) { + return new String(bytes); + } + + @Override + public void close() { + + } + } + + public static class StringEncoderDecoder implements Encoder<String>, Decoder<String> { + + public StringEncoderDecoder() { + + } + + public StringEncoderDecoder(VerifiableProperties props) { + + } + + @Override + public String fromBytes(byte[] bytes) { + return new String(bytes); + } + + @Override + public byte[] toBytes(String value) { + return value.getBytes(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java new file mode 100644 index 0000000..595a94b --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka; + + +import kafka.api.OffsetRequest; +import org.apache.crunch.Pair; +import org.apache.kafka.common.TopicPartition; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.crunch.kafka.ClusterTest.writeData; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNot.not; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class KafkaDataIT { + + @Rule + public TestName testName = new TestName(); + + private String topic; + private Map<TopicPartition, Long> startOffsets; + private Map<TopicPartition, Long> stopOffsets; + private Map<TopicPartition, Pair<Long, Long>> offsets; + private Properties props; + + @BeforeClass + public static void init() throws Exception { + ClusterTest.startTest(); + } + + @AfterClass + public static void cleanup() throws Exception { + ClusterTest.endTest(); + } + + @Before + public void setup() { + topic = testName.getMethodName(); + + props = ClusterTest.getConsumerProperties(); + + startOffsets = new HashMap<>(); + stopOffsets = new HashMap<>(); + offsets = new HashMap<>(); + for (int i = 0; i < 4; i++) { + TopicPartition tp = new TopicPartition(topic, i); + startOffsets.put(tp, 0L); + stopOffsets.put(tp, 100L); + + offsets.put(tp, Pair.of(0L, 100L)); + } + } + + @Test + public void getDataIterable() throws IOException { + int loops = 10; + int numPerLoop = 100; + int total = loops * numPerLoop; + List<String> keys = writeData(props, topic, "batch", loops, numPerLoop); + + startOffsets = getStartOffsets(props, topic); + stopOffsets = getStopOffsets(props, topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey()))); + } + + Iterable<Pair<String, String>> data = new KafkaData<String, String>(props, offsets).read(null); + + int count = 0; + for (Pair<String, String> event : data) { + assertThat(keys, hasItem(event.first())); + assertTrue(keys.remove(event.first())); + count++; + } + + assertThat(count, is(total)); + assertThat(keys.size(), is(0)); + } + + private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) { + return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic); + } + + private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) { + return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic); + } +}
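For context, a minimal sketch of how the pieces in this commit might be wired together from a Crunch pipeline. It assumes KafkaSource exposes a (Properties, Map<TopicPartition, Pair<Long, Long>>) constructor matching the offsets shape used by KafkaData above; the broker address and topic name ("broker-1:9092", "my-topic") are placeholders, and the offset-map construction mirrors KafkaDataIT.

// Hedged usage sketch; not part of this commit. See KafkaSource.java for the authoritative API.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.api.OffsetRequest;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.kafka.KafkaSource;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.common.TopicPartition;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("metadata.broker.list", "broker-1:9092"); // used by KafkaUtils.getBrokerOffsets
    props.setProperty("bootstrap.servers", "broker-1:9092");    // used by the new KafkaConsumer

    // Resolve per-partition start/end offsets, as exercised in KafkaDataIT above.
    Map<TopicPartition, Long> starts = KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), "my-topic");
    Map<TopicPartition, Long> ends = KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), "my-topic");
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> entry : starts.entrySet()) {
      offsets.put(entry.getKey(), Pair.of(entry.getValue(), ends.get(entry.getKey())));
    }

    // Each (partition, start, end) triple becomes a KafkaInputSplit; KafkaRecordReader polls the
    // assigned partition from the starting offset and hands back key/value pairs as BytesWritable.
    Pipeline pipeline = new MRPipeline(KafkaReadSketch.class);
    PCollection<Pair<BytesWritable, BytesWritable>> records =
        pipeline.read(new KafkaSource(props, offsets));
    System.out.println("Records read: " + records.length().getValue());
    pipeline.done();
  }
}

Note that the ending offset is exclusive: KafkaRecordReader stops returning records once record.offset() reaches the split's ending offset, so the map built above reads everything available at the time the offsets were fetched.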
