STORM-822: Kafka Spout New Consumer API
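For orientation, a minimal wiring sketch of the spout API this commit introduces. It mirrors the test topology shipped under src/test (KafkaSpoutTopologyMain) in the diff below; the broker address, group id, topic name, stream id, and output field names are illustrative placeholders, not values required by the spout.

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutStreams;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class KafkaSpoutUsageSketch {
    public static void main(String[] args) throws Exception {
        // Consumer connection properties (placeholder values); enable.auto.commit defaults to "false" in KafkaSpoutConfig
        Map<String, Object> props = new HashMap<>();
        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");

        // One output stream for the topic "test"; the five fields match what KafkaRecordTupleBuilder emits
        KafkaSpoutStreams streams = new KafkaSpoutStreams.Builder(
                new Fields("topic", "partition", "offset", "key", "value"), "test_stream", "test").build();

        KafkaSpoutConfig<String, String> spoutConfig = new KafkaSpoutConfig.Builder<String, String>(props, streams)
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .setPollStrategy(KafkaSpoutConfig.PollStrategy.STREAM)
                .setOffsetCommitPeriodMs(10_000)
                .setMaxUncommittedOffsets(250)
                .build();

        TopologyBuilder tp = new TopologyBuilder();
        tp.setSpout("kafka_spout", new KafkaSpout<>(spoutConfig, new KafkaRecordTupleBuilder<String, String>()), 1);
        // Downstream bolts subscribe to the declared stream id, e.g. shuffleGrouping("kafka_spout", "test_stream")

        new LocalCluster().submitTopology("kafka-spout-sketch", new Config(), tp.createTopology());
    }
}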
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d26b984d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d26b984d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d26b984d Branch: refs/heads/master Commit: d26b984d74987c95badbdf8a73c74c4304bd4ec9 Parents: 955b445 Author: Hugo Louro <[email protected]> Authored: Mon Dec 14 10:16:42 2015 -0800 Committer: Robert (Bobby) Evans <[email protected]> Committed: Thu Mar 31 13:59:32 2016 -0500 ---------------------------------------------------------------------- examples/storm-starter/pom.xml | 16 +- external/flux/flux-examples/pom.xml | 13 +- external/flux/pom.xml | 13 +- external/sql/storm-sql-kafka/pom.xml | 16 +- external/storm-kafka-client/README.md | 9 + external/storm-kafka-client/pom.xml | 86 ++++ .../kafka/spout/KafkaRecordTupleBuilder.java | 44 ++ .../apache/storm/kafka/spout/KafkaSpout.java | 503 +++++++++++++++++++ .../storm/kafka/spout/KafkaSpoutConfig.java | 298 +++++++++++ .../storm/kafka/spout/KafkaSpoutMessageId.java | 111 ++++ .../storm/kafka/spout/KafkaSpoutStream.java | 66 +++ .../storm/kafka/spout/KafkaSpoutStreams.java | 162 ++++++ .../kafka/spout/KafkaSpoutTupleBuilder.java | 28 ++ .../spout/test/KafkaSpoutTopologyMain.java | 120 +++++ .../storm/kafka/spout/test/KafkaTestBolt.java | 52 ++ external/storm-kafka/pom.xml | 16 +- external/storm-solr/pom.xml | 8 +- pom.xml | 57 ++- storm-dist/binary/src/main/assembly/binary.xml | 14 + 19 files changed, 1544 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/examples/storm-starter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index 0b5fd97..112f1e8 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -158,26 +158,12 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.8.2.1</version> - <!-- use provided scope, so users can pull in whichever scala version they choose --> + <artifactId>${kafka.artifact.id}</artifactId> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.8.2.1</version> - <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/flux/flux-examples/pom.xml ---------------------------------------------------------------------- diff --git a/external/flux/flux-examples/pom.xml b/external/flux/flux-examples/pom.xml index 28d7239..48cc151 100644 --- a/external/flux/flux-examples/pom.xml +++ b/external/flux/flux-examples/pom.xml @@ -95,18 +95,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.8.1.1</version> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> + <artifactId>${kafka.artifact.id}</artifactId> </dependency> </dependencies> 
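Note on the dependency refactor visible in the pom diffs above and below: the hard-coded kafka_2.10 / 0.8.x coordinates are replaced by the ${kafka.artifact.id} property and version-less kafka-clients entries, which presumes that the root pom.xml (its 57-line change appears in the diffstat but not in this excerpt) defines the property and manages the versions centrally. A plausible parent-pom sketch follows; the property values, the storm.kafka.version property name, and the exclusions are assumptions for illustration, not the actual contents of the root pom change in this commit.

<!-- Hypothetical excerpt of the root pom.xml, shown only to illustrate how the modules resolve ${kafka.artifact.id} and the kafka-clients version -->
<properties>
    <kafka.artifact.id>kafka_2.10</kafka.artifact.id>
    <storm.kafka.version>0.9.0.1</storm.kafka.version> <!-- assumed value; the new consumer API requires a 0.9+ client -->
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>${kafka.artifact.id}</artifactId>
            <version>${storm.kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${storm.kafka.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>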
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/flux/pom.xml ---------------------------------------------------------------------- diff --git a/external/flux/pom.xml b/external/flux/pom.xml index 1fd1683..56d9bab 100644 --- a/external/flux/pom.xml +++ b/external/flux/pom.xml @@ -78,19 +78,8 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.8.1.1</version> + <artifactId>${kafka.artifact.id}</artifactId> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/sql/storm-sql-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml index 450611e..0642d17 100644 --- a/external/sql/storm-sql-kafka/pom.xml +++ b/external/sql/storm-sql-kafka/pom.xml @@ -63,26 +63,12 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.8.2.1</version> - <!-- use provided scope, so users can pull in whichever scala version they choose --> + <artifactId>${kafka.artifact.id}</artifactId> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.8.2.1</version> - <scope>provided</scope> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/README.md ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md new file mode 100644 index 0000000..8ac15f5 --- /dev/null +++ b/external/storm-kafka-client/README.md @@ -0,0 +1,9 @@ +#Storm Kafka Spout New Consumer API + +This patch is still under development and it comes with no warranties at this moment. + +It has not been thoroughly tested, and therefore there may be some bugs and it is not ready for production. + +The documentation will be uploaded soon. + +To see how to use the new Kafka Spout, please refer to the example under tests. Thank you! \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml new file mode 100644 index 0000000..6c82b6a --- /dev/null +++ b/external/storm-kafka-client/pom.xml @@ -0,0 +1,86 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!--/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>storm-kafka-client</artifactId> + <name>storm-kafka-client</name> + + <packaging>jar</packaging> + + <developers> + <developer> + <id>hmcl</id> + <name>Hugo Louro</name> + <email>[email protected]</email> + </developer> + </developers> + + <dependencies> + <!--parent module dependency--> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <!--kafka libraries--> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </dependency> + <!--test dependencies --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java new file mode 100644 index 0000000..4d67632 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.List; + +public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> { + @Override + public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) { + final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic()); + if (outputFields != null) { + if (outputFields.size() == 3) { + return new Values(consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset()); + } else if (outputFields.size() == 5) { + return new Values(consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset(), + consumerRecord.key(), + consumerRecord.value()); + } + } + throw new RuntimeException("Failed to build tuple. " + consumerRecord + " " + kafkaSpoutStreams); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java new file mode 100644 index 0000000..9a49ee8 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; + +public class KafkaSpout<K, V> extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); + private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator(); + + // Storm + protected SpoutOutputCollector collector; + + // Kafka + private final KafkaSpoutConfig<K, V> kafkaSpoutConfig; + private transient KafkaConsumer<K, V> kafkaConsumer; + private transient boolean consumerAutoCommitMode; + + + // Bookkeeping + private KafkaSpoutStreams kafkaSpoutStreams; + private KafkaSpoutTupleBuilder<K, V> tupleBuilder; + private transient Timer commitTimer; // timer == null for auto commit mode + private transient Timer logTimer; + private transient Map<TopicPartition, OffsetEntry> acked; // emitted tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate + private transient int maxRetries; // Max number of times a tuple is retried + private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. 
+ // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() + private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed + private transient FirstPollOffsetStrategy firstPollOffsetStrategy; + private transient PollStrategy pollStrategy; + + + public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) { + this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration + this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams(); + this.tupleBuilder = tupleBuilder; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + initialized = false; + + // Spout internals + this.collector = collector; + maxRetries = kafkaSpoutConfig.getMaxTupleRetries(); + numUncommittedOffsets = 0; + logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS); + + // Offset management + firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); + pollStrategy = kafkaSpoutConfig.getPollStrategy(); + consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); + + if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually + commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); + acked = new HashMap<>(); + } + + LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); + } + + // =========== Consumer Rebalance Listener - On the same thread as the caller =========== + + private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + if (!consumerAutoCommitMode && initialized) { + initialized = false; + commitOffsetsForAckedTuples(); + } + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + + initialize(partitions); + } + + private void initialize(Collection<TopicPartition> partitions) { + if (!consumerAutoCommitMode) { + acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout + } + + for (TopicPartition tp : partitions) { + final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); + final long fetchOffset = doSeek(tp, committedOffset); + setAcked(tp, fetchOffset); + } + initialized = true; + LOG.debug("Initialization complete"); + } + + /** + * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset + */ + private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { + long fetchOffset; + if (committedOffset != null) { // offset was committed for this TopicPartition + if (firstPollOffsetStrategy.equals(EARLIEST)) { + kafkaConsumer.seekToBeginning(tp); + fetchOffset = kafkaConsumer.position(tp); + } else if (firstPollOffsetStrategy.equals(LATEST)) { + kafkaConsumer.seekToEnd(tp); + fetchOffset = kafkaConsumer.position(tp); + } else { + // By default polling starts at the last committed offset. 
+1 to point fetch to the first uncommitted offset. + fetchOffset = committedOffset.offset() + 1; + kafkaConsumer.seek(tp, fetchOffset); + } + } else { // no commits have ever been done, so start at the beginning or end depending on the strategy + if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { + kafkaConsumer.seekToBeginning(tp); + } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { + kafkaConsumer.seekToEnd(tp); + } + fetchOffset = kafkaConsumer.position(tp); + } + return fetchOffset; + } + } + + private void setAcked(TopicPartition tp, long fetchOffset) { + // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off + if (!consumerAutoCommitMode && !acked.containsKey(tp)) { + acked.put(tp, new OffsetEntry(tp, fetchOffset)); + } + } + + // ======== Next Tuple ======= + + @Override + public void nextTuple() { + if (initialized) { + if (commit()) { + commitOffsetsForAckedTuples(); + } else if (poll()) { + emitTuples(pollKafkaBroker()); + } else if (logTimer.isExpiredResetOnTrue()) { // to limit the number of messages that get printed. + log(); + } + } else { + LOG.debug("Spout not initialized. Not sending tuples until initialization completes"); + } + } + + private void log() { + switch(pollStrategy) { + case STREAM: + LOG.trace("Reached the maximum number number of uncommitted records [{}]. " + + "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ", + numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets()); + break; + case BATCH: + LOG.trace("No more polls will occur until the last batch completes. [{}] emitted tuples pending", numUncommittedOffsets); + break; + default: + throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy); + } + + } + + // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory + private boolean poll() { + switch(pollStrategy) { + case STREAM: + return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets(); + case BATCH: + return consumerAutoCommitMode || numUncommittedOffsets <= 0; + default: + throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy); + } + } + + private boolean commit() { + return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode + } + + private ConsumerRecords<K, V> pollKafkaBroker() { + final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + final int numPolledRecords = consumerRecords.count(); + numUncommittedOffsets+= numPolledRecords; + LOG.debug("Polled [{}] records from Kafka. 
NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets); + return consumerRecords; + } + + private void emitTuples(ConsumerRecords<K, V> consumerRecords) { + for (TopicPartition tp : consumerRecords.partitions()) { + final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic()); + + for (final ConsumerRecord<K, V> record : records) { + final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams); + final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple); + + kafkaSpoutStreams.emit(collector, messageId); // emits one tuple per record + LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); + } + } + } + + private void commitOffsetsForAckedTuples() { + // Find offsets that are ready to be committed for every topic partition + final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>(); + for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) { + final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(); + if (nextCommitOffset != null) { + nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset); + } + } + + // Commit offsets that are ready to be committed for every topic partition + if (!nextCommitOffsets.isEmpty()) { + kafkaConsumer.commitSync(nextCommitOffsets); + LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets); + // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition + // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop + for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) { + final OffsetEntry offsetEntry = tpOffset.getValue(); + offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey())); + } + } else { + LOG.trace("No offsets to commit. {}", this); + } + } + + // ======== Ack ======= + + @Override + public void ack(Object messageId) { + if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + acked.get(msgId.getTopicPartition()).add(msgId); + LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked); + } + } + + // ======== Fail ======= + + @Override + public void fail(Object messageId) { + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + if (msgId.numFails() < maxRetries) { + msgId.incrementNumFails(); + kafkaSpoutStreams.emit(collector, msgId); + LOG.trace("Retried tuple with message id [{}]", msgId); + } else { // limit to max number of retries + LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); + ack(msgId); + } + } + + // ======== Activate / Deactivate / Close / Declare Outputs ======= + + @Override + public void activate() { + subscribeKafkaConsumer(); + } + + private void subscribeKafkaConsumer() { + kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), + kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); + kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener()); + // Initial poll to get the consumer registration process going. 
+ // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration + kafkaConsumer.poll(0); + } + + @Override + public void deactivate() { + shutdown(); + } + + @Override + public void close() { + shutdown(); + } + + private void shutdown() { + try { + kafkaConsumer.wakeup(); + if (!consumerAutoCommitMode) { + commitOffsetsForAckedTuples(); + } + } finally { + //remove resources + kafkaConsumer.close(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + kafkaSpoutStreams.declareOutputFields(declarer); + } + + @Override + public String toString() { + return "{acked=" + acked + "} "; + } + + // ======= Offsets Commit Management ========== + + private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> { + public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) { + return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1; + } + } + + /** + * This class is not thread safe + */ + private class OffsetEntry { + private final TopicPartition tp; + private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. + * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ + private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1 + private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); // acked messages sorted by ascending order of offset + + public OffsetEntry(TopicPartition tp, long initialFetchOffset) { + this.tp = tp; + this.initialFetchOffset = initialFetchOffset; + this.committedOffset = initialFetchOffset - 1; + LOG.debug("Created OffsetEntry. {}", this); + } + + public void add(KafkaSpoutMessageId msgId) { // O(Log N) + ackedMsgs.add(msgId); + } + + /** + * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit. + */ + public OffsetAndMetadata findNextCommitOffset() { + boolean found = false; + long currOffset; + long nextCommitOffset = committedOffset; + KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata + + for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap + if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) { // found the next offset to commit + found = true; + nextCommitMsg = currAckedMsg; + nextCommitOffset = currOffset; + LOG.trace("Found offset to commit [{}]. {}", currOffset, this); + } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search + LOG.debug("Non continuous offset found [{}]. It will be processed in a subsequent batch. {}", currOffset, this); + break; + } else { + LOG.debug("Unexpected offset found [{}]. {}", currOffset, this); + break; + } + } + + OffsetAndMetadata nextCommitOffsetAndMetadata = null; + if (found) { + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); + LOG.debug("Offset to be committed next: [{}] {}", nextCommitOffsetAndMetadata.offset(), this); + } else { + LOG.debug("No offsets ready to commit. {}", this); + } + return nextCommitOffsetAndMetadata; + } + + /** + * Marks an offset has committed. 
This method has side effects - it sets the internal state in such a way that future + * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any. + * + * @param committedOffset offset to be marked as committed + */ + public void commit(OffsetAndMetadata committedOffset) { + if (committedOffset != null) { + final long numCommittedOffsets = committedOffset.offset() - this.committedOffset; + this.committedOffset = committedOffset.offset(); + for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) { + if (iterator.next().offset() <= committedOffset.offset()) { + iterator.remove(); + } else { + break; + } + } + numUncommittedOffsets-= numCommittedOffsets; + } + LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", this, numUncommittedOffsets); + } + + public boolean isEmpty() { + return ackedMsgs.isEmpty(); + } + + @Override + public String toString() { + return "OffsetEntry{" + + "topic-partition=" + tp + + ", fetchOffset=" + initialFetchOffset + + ", committedOffset=" + committedOffset + + ", ackedMsgs=" + ackedMsgs + + '}'; + } + } + + // =========== Timer =========== + + private class Timer { + private final long delay; + private final long period; + private final TimeUnit timeUnit; + private final long periodNanos; + private long start; + + /** + * Creates a class that mimics a single threaded timer that expires periodically. If a call to {@link + * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns + * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay. + * + * @param delay the initial delay before the timer starts + * @param period the period between calls {@link #isExpiredResetOnTrue()} + * @param timeUnit the time unit of delay and period + */ + public Timer(long delay, long period, TimeUnit timeUnit) { + this.delay = delay; + this.period = period; + this.timeUnit = timeUnit; + + periodNanos = timeUnit.toNanos(period); + start = System.nanoTime() + timeUnit.toNanos(delay); + } + + public long period() { + return period; + } + + public long delay() { + return delay; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + /** + * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the + * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset + * (re-initiated) and a new cycle will start. + * + * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false + * otherwise. 
+ */ + public boolean isExpiredResetOnTrue() { + final boolean expired = System.nanoTime() - start > periodNanos; + if (expired) { + start = System.nanoTime(); + } + return expired; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java new file mode 100644 index 0000000..d969f1f --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.common.serialization.Deserializer; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics + */ +public class KafkaSpoutConfig<K, V> implements Serializable { + public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000; // 2s + public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000; // 15s + public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever + public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000; // 10,000 records + + // Kafka property names + public interface Consumer { + String GROUP_ID = "group.id"; + String BOOTSTRAP_SERVERS = "bootstrap.servers"; + String ENABLE_AUTO_COMMIT = "enable.auto.commit"; + String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms"; + String KEY_DESERIALIZER = "key.deserializer"; + String VALUE_DESERIALIZER = "value.deserializer"; + } + + /** + * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will + * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/> + * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/> + * <ul> + * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li> + * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li> + * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. 
+ * If no offset has been committed, it behaves as EARLIEST.</li> + * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. + * If no offset has been committed, it behaves as LATEST.</li> + * </ul> + * */ + public enum FirstPollOffsetStrategy { + EARLIEST, + LATEST, + UNCOMMITTED_EARLIEST, + UNCOMMITTED_LATEST } + + /** + * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory + * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory + * (it stores in memory the entire KafkaRecord, including data). BATCH will likely have less throughput but also use less memory. + * The BATCH behavior is similar to the behavior of the previous Kafka Spout. The default value is STREAM. + * <ul> + * <li>STREAM: Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets + * threshold has not yet been reached. When the threshold is reached, no more records are polled until enough offsets have been + * committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)} + * </li> + * <li>BATCH: Only polls a new batch of records from Kafka once all the records that came in the previous poll have been acked.</li> + * </ul> + */ + public enum PollStrategy { + STREAM, + BATCH + } + + // Kafka consumer configuration + private final Map<String, Object> kafkaProps; + private final Deserializer<K> keyDeserializer; + private final Deserializer<V> valueDeserializer; + private final long pollTimeoutMs; + + // Kafka spout configuration + private final long offsetCommitPeriodMs; + private final int maxRetries; + private final int maxUncommittedOffsets; + private final FirstPollOffsetStrategy firstPollOffsetStrategy; + private final PollStrategy pollStrategy; + private final KafkaSpoutStreams kafkaSpoutStreams; + + private KafkaSpoutConfig(Builder<K,V> builder) { + this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); + this.keyDeserializer = builder.keyDeserializer; + this.valueDeserializer = builder.valueDeserializer; + this.pollTimeoutMs = builder.pollTimeoutMs; + this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; + this.maxRetries = builder.maxRetries; + this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; + this.pollStrategy = builder.pollStrategy; + this.kafkaSpoutStreams = builder.kafkaSpoutStreams; + this.maxUncommittedOffsets = builder.maxUncommittedOffsets; + } + + private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) { + // set defaults for properties not specified + if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) { + kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false"); + } + return kafkaProps; + } + + public static class Builder<K,V> { + private Map<String, Object> kafkaProps; + private Deserializer<K> keyDeserializer; + private Deserializer<V> valueDeserializer; + private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; + private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; + private int maxRetries = DEFAULT_MAX_RETRIES; + private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; + private KafkaSpoutStreams kafkaSpoutStreams; + private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; + private PollStrategy pollStrategy = PollStrategy.STREAM; + + /*** + *
KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics + * The optional configuration can be specified using the set methods of this builder + * @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a> + * @param kafkaSpoutStreams streams to which the tuples are emitted for each topic. Multiple topics can emit in the same stream. + */ + public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) { + if (kafkaProps == null || kafkaProps.isEmpty()) { + throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps); + } + + if (kafkaSpoutStreams == null) { + throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream."); + } + this.kafkaProps = kafkaProps; + this.kafkaSpoutStreams = kafkaSpoutStreams; + } + + /** + * Specifying this key deserializer overrides the property key.deserializer + */ + public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) { + this.keyDeserializer = keyDeserializer; + return this; + } + + /** + * Specifying this value deserializer overrides the property value.deserializer + */ + public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) { + this.valueDeserializer = valueDeserializer; + return this; + } + + /** + * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s. + * @param pollTimeoutMs time in ms + */ + public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) { + this.pollTimeoutMs = pollTimeoutMs; + return this; + } + + /** + * Specifies the period, in milliseconds, at which the offset commit task is called. Default is 15s. + * @param offsetCommitPeriodMs time in ms + */ + public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) { + this.offsetCommitPeriodMs = offsetCommitPeriodMs; + return this; + } + + /** + * Defines the max number of retries in case of tuple failure. The default is to retry forever, which means that + * no new records are committed until the previously polled records have been acked. This guarantees at least once delivery of + * all the previously polled records. + * By specifying a finite value for maxRetries, the user decides to sacrifice the guarantee of delivery for the previously + * polled records in favor of processing more records. + * @param maxRetries max number of retries + */ + public Builder<K,V> setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. + * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number + * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. + * @param maxUncommittedOffsets max number of records that can be pending commit + */ + public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) { + this.maxUncommittedOffsets = maxUncommittedOffsets; + return this; + } + + /** + * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
+ * Please refer to to the documentation in {@link FirstPollOffsetStrategy} + * @param firstPollOffsetStrategy Offset used by Kafka spout first poll + * */ + public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) { + this.firstPollOffsetStrategy = firstPollOffsetStrategy; + return this; + } + + /** + * Sets the strategy used by the the Kafka spout to decide when to poll the next batch of records from Kafka. + * Please refer to to the documentation in {@link PollStrategy} + * @param pollStrategy strategy used to decide when to poll + * */ + public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) { + this.pollStrategy = pollStrategy; + return this; + } + + public KafkaSpoutConfig<K,V> build() { + return new KafkaSpoutConfig<>(this); + } + } + + public Map<String, Object> getKafkaProps() { + return kafkaProps; + } + + public Deserializer<K> getKeyDeserializer() { + return keyDeserializer; + } + + public Deserializer<V> getValueDeserializer() { + return valueDeserializer; + } + + public long getPollTimeoutMs() { + return pollTimeoutMs; + } + + public long getOffsetsCommitPeriodMs() { + return offsetCommitPeriodMs; + } + + public boolean isConsumerAutoCommitMode() { + return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null // default is true + || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT)); + } + + public String getConsumerGroupId() { + return (String) kafkaProps.get(Consumer.GROUP_ID); + } + + public List<String> getSubscribedTopics() { + return new ArrayList<>(kafkaSpoutStreams.getTopics()); + } + + public int getMaxTupleRetries() { + return maxRetries; + } + + public FirstPollOffsetStrategy getFirstPollOffsetStrategy() { + return firstPollOffsetStrategy; + } + + public KafkaSpoutStreams getKafkaSpoutStreams() { + return kafkaSpoutStreams; + } + + public int getMaxUncommittedOffsets() { + return maxUncommittedOffsets; + } + + public PollStrategy getPollStrategy() { + return pollStrategy; + } + + @Override + public String toString() { + return "KafkaSpoutConfig{" + + "kafkaProps=" + kafkaProps + + ", keyDeserializer=" + keyDeserializer + + ", valueDeserializer=" + valueDeserializer + + ", topics=" + getSubscribedTopics() + + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy + + ", pollTimeoutMs=" + pollTimeoutMs + + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + + ", maxRetries=" + maxRetries + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java new file mode 100644 index 0000000..0a6b126 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collections; +import java.util.List; + +public class KafkaSpoutMessageId { + private transient TopicPartition topicPart; + private transient long offset; + private transient List<Object> tuple; + private transient int numFails = 0; + + public KafkaSpoutMessageId(ConsumerRecord consumerRecord, List<Object> tuple) { + this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple); + } + + public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) { + this.topicPart = topicPart; + this.offset = offset; + this.tuple = tuple; + } + + public int partition() { + return topicPart.partition(); + } + + public String topic() { + return topicPart.topic(); + } + + public long offset() { + return offset; + } + + public int numFails() { + return numFails; + } + + public void incrementNumFails() { + ++numFails; + } + + public TopicPartition getTopicPartition() { + return topicPart; + } + + public List<Object> getTuple() { + return Collections.unmodifiableList(tuple); + } + + public String getMetadata(Thread currThread) { + return "{" + + "topic-partition=" + topicPart + + ", offset=" + offset + + ", numFails=" + numFails + + ", thread='" + currThread.getName() + "'" + + '}'; + } + + @Override + public String toString() { + return "{" + + "topic-partition=" + topicPart + + ", offset=" + offset + + ", numFails=" + numFails + + ", tuple=" + tuple + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KafkaSpoutMessageId messageId = (KafkaSpoutMessageId) o; + if (offset != messageId.offset) { + return false; + } + return topicPart.equals(messageId.topicPart); + } + + @Override + public int hashCode() { + int result = topicPart.hashCode(); + result = 31 * result + (int) (offset ^ (offset >>> 32)); + return result; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java new file mode 100644 index 0000000..43464a9 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; + +import java.io.Serializable; + +/** + * Represents the stream and output fields used by a topic + */ +public class KafkaSpoutStream implements Serializable { + private final Fields outputFields; + private final String streamId; + private final String topic; + + /** Declare specified outputFields with default stream for the specified topic */ + KafkaSpoutStream(Fields outputFields, String topic) { + this(outputFields, Utils.DEFAULT_STREAM_ID, topic); + } + + /** Declare specified outputFields with specified stream for the specified topic */ + KafkaSpoutStream(Fields outputFields, String streamId, String topic) { + this.outputFields = outputFields; + this.streamId = streamId; + this.topic = topic; + } + + public Fields getOutputFields() { + return outputFields; + } + + public String getStreamId() { + return streamId; + } + + public String getTopic() { + return topic; + } + + @Override + public String toString() { + return "KafkaSpoutStream{" + + "outputFields=" + outputFields + + ", streamId='" + streamId + '\'' + + ", topic='" + topic + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java new file mode 100644 index 0000000..30215d1 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.OutputFieldsGetter; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Represents the output streams associated with each topic, and provides a public API to + * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. + */ +public class KafkaSpoutStreams implements Serializable { + protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreams.class); + + private final Map<String, KafkaSpoutStream> topicToStream; + + private KafkaSpoutStreams(Builder builder) { + this.topicToStream = builder.topicToStream; + LOG.debug("Built {}", this); + } + + /** + * @param topic the topic for which to get output fields + * @return the output fields declared + */ + public Fields getOutputFields(String topic) { + if (topicToStream.containsKey(topic)) { + final Fields outputFields = topicToStream.get(topic).getOutputFields(); + LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields); + return outputFields; + } + throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); + } + + /** + * @param topic the topic to for which to get the stream id + * @return the id of the stream to where the tuples are emitted + */ + public String getStreamId(String topic) { + if (topicToStream.containsKey(topic)) { + final String streamId = topicToStream.get(topic).getStreamId(); + LOG.trace("Topic [{}] emitting in stream [{}]", topic, streamId); + return streamId; + } + throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); + } + + /** + * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream} + */ + public List<String> getTopics() { + return new ArrayList<>(topicToStream.keySet()); + } + + void declareOutputFields(OutputFieldsDeclarer declarer) { + for (KafkaSpoutStream stream : topicToStream.values()) { + if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) { + declarer.declareStream(stream.getStreamId(), stream.getOutputFields()); + LOG.debug("Declared " + stream); + } + } + } + + void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) { + collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId); + } + + @Override + public String toString() { + return "KafkaSpoutStreams{" + + "topicToStream=" + topicToStream + + '}'; + } + + public static class Builder { + private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();; + + /** + * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified. + * All the topics will have the same stream id and output fields. + */ + public Builder(Fields outputFields, String... topics) { + this(outputFields, Utils.DEFAULT_STREAM_ID, topics); + } + + /** + * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified. + * All the topics will have the same stream id and output fields. + */ + public Builder (Fields outputFields, String streamId, String... 
topics) { + for (String topic : topics) { + topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic)); + } + } + + /** + * Adds this stream to the state representing the streams associated with each topic + */ + public Builder(KafkaSpoutStream stream) { + topicToStream.put(stream.getTopic(), stream); + } + + /** + * Adds this stream to the state representing the streams associated with each topic + */ + public Builder addStream(KafkaSpoutStream stream) { + topicToStream.put(stream.getTopic(), stream); + return this; + } + + /** + * Please refer to javadoc in {@link #Builder(Fields, String...)} + */ + public Builder addStream(Fields outputFields, String... topics) { + for (String topic : topics) { + topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic)); + } + return this; + } + + /** + * Please refer to javadoc in {@link #Builder(Fields, String, String...)} + */ + public Builder addStream(Fields outputFields, String streamId, String... topics) { + for (String topic : topics) { + topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic)); + } + return this; + } + + public KafkaSpoutStreams build() { + return new KafkaSpoutStreams(this); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java new file mode 100644 index 0000000..45aab48 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.io.Serializable; +import java.util.List; + +public interface KafkaSpoutTupleBuilder<K,V> extends Serializable { + List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java new file mode 100644 index 0000000..4fcc3ef --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.test; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.KafkaSpoutStreams; +import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM; + +public class KafkaSpoutTopologyMain { + private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"}; + private static final String[] TOPICS = new String[]{"test","test1","test2"}; + + + public static void main(String[] args) throws Exception { + if (args.length == 0) { + submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig()); + } else { + submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig()); + } + } + + protected static void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", config, topology); + stopWaitingForInput(); + } + + protected static void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception { + StormSubmitter.submitTopology(arg, config, topology); + } + + private static void stopWaitingForInput() { + try { + System.out.println("PRESS ENTER TO STOP"); + new BufferedReader(new InputStreamReader(System.in)).readLine(); + System.exit(0); + } catch (IOException e) { + e.printStackTrace(); + } + } + + protected static Config getConfig() { + Config config = new Config(); + config.setDebug(true); + return config; + } + + public static StormTopology getTopologyKafkaSpout() { + final TopologyBuilder tp = new TopologyBuilder(); + tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1); + tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]); + tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]); + return tp.createTopology(); + } + + public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) { + return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setPollStrategy(STREAM) + .setMaxUncommittedOffsets(250) + .build(); + } + + public static Map<String,Object> getKafkaConsumerProps() { + Map<String, Object> props = new HashMap<>(); +// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); + props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092"); + props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup"); + props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); + 
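+ // The constants above (defined in KafkaSpoutConfig.Consumer) correspond to the standard Kafka new-consumer
+ // configuration keys ("bootstrap.servers", "group.id", "key.deserializer", "value.deserializer").
+ // Any other consumer property could be passed the same way; the following line is illustrative only and not part of this commit:
+ // props.put("session.timeout.ms", "30000");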
return props; + } + + public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() { + return new KafkaRecordTupleBuilder<>(); + } + + public static KafkaSpoutStreams getKafkaSpoutStreams() { + final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value"); + final Fields outputFields1 = new Fields("topic", "partition", "offset"); + return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test and test1 sent to test_stream + .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream + .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream + .build(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java new file mode 100644 index 0000000..c9ff9d5 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.storm.kafka.spout.test; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class KafkaTestBolt extends BaseRichBolt { + protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class); + + + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + LOG.debug("input = [" + input + "]"); + collector.ack(input); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml index 43b7796..763c15f 100644 --- a/external/storm-kafka/pom.xml +++ b/external/storm-kafka/pom.xml @@ -118,26 +118,12 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>0.8.2.1</version> - <!-- use provided scope, so users can pull in whichever scala version they choose --> + <artifactId>${kafka.artifact.id}</artifactId> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> - <version>0.8.2.1</version> - <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-solr/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml index d093ae8..ba79ddc 100644 --- a/external/storm-solr/pom.xml +++ b/external/storm-solr/pom.xml @@ -31,10 +31,10 @@ <developers> <developer> - <id>Hugo-Louro</id> - <name>Hugo Louro</name> - <email>[email protected]</email> - </developer> + <id>hmcl</id> + <name>Hugo Louro</name> + <email>[email protected]</email> + </developer> </developers> <dependencies> http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 12e5a9f..1a899b3 100644 --- a/pom.xml +++ b/pom.xml @@ -82,7 +82,7 @@ <roles> <role>Committer</role> </roles> - <timezone /> + <timezone/> </developer> <developer> <id>afeng</id> @@ -109,7 +109,7 @@ <roles> <role>Committer</role> </roles> - <timezone /> + <timezone/> </developer> <developer> <id>jjackson</id> @@ -249,6 +249,8 @@ <calcite.version>1.4.0-incubating</calcite.version> <jackson.version>2.6.3</jackson.version> <maven-surefire.version>2.18.1</maven-surefire.version> + <kafka.version>0.9.0.1</kafka.version> + <kafka.artifact.id>kafka_2.11</kafka.artifact.id> <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile --> <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude> @@ -282,6 
+284,7 @@ <module>external/storm-mongodb</module> <module>examples/storm-starter</module> <module>storm-clojure</module> + <module>external/storm-kafka-client</module> </modules> <dependencies> @@ -673,14 +676,14 @@ <version>${ring-json.version}</version> </dependency> <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlet</artifactId> - <version>${jetty.version}</version> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>${jetty.version}</version> </dependency> <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlets</artifactId> - <version>${jetty.version}</version> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlets</artifactId> + <version>${jetty.version}</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> @@ -831,7 +834,7 @@ <version>${thrift.version}</version> <scope>compile</scope> </dependency> - <!-- used by examples/storm-starter --> + <!-- used by examples/storm-starter --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> @@ -839,14 +842,38 @@ <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.calcite</groupId> - <artifactId>calcite-core</artifactId> - <version>${calcite.version}</version> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <version>${calcite.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>${kafka.artifact.id}</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> </dependency> <dependency> <groupId>uk.org.lidalia</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/storm-dist/binary/src/main/assembly/binary.xml ---------------------------------------------------------------------- diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml index 7f0da6f..648640e 100644 --- a/storm-dist/binary/src/main/assembly/binary.xml +++ b/storm-dist/binary/src/main/assembly/binary.xml @@ -317,6 +317,20 @@ <include>README.*</include> </includes> </fileSet> + <fileSet> + <directory>${project.basedir}/../../external/storm-kafka-client/target</directory> + <outputDirectory>external/storm-kafka-client</outputDirectory> + <includes> + <include>storm*jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${project.basedir}/../../external/storm-kafka-client</directory> + <outputDirectory>external/storm-kafka-client</outputDirectory> + <includes> + <include>README.*</include> + </includes> + </fileSet> <!-- $STORM_HOME/extlib --> <fileSet>
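
Usage note: the KafkaSpoutTupleBuilder interface introduced above leaves tuple construction to client code. The sketch below is not part of this commit; the class name SimpleKafkaTupleBuilder and its field layout are assumptions made for illustration only, matching the output fields declared in KafkaSpoutTopologyMain ("topic", "partition", "offset", "key", "value") and using only the buildTuple signature shown in this patch.

    package org.apache.storm.kafka.spout.test;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.KafkaSpoutStreams;
    import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
    import org.apache.storm.tuple.Values;

    import java.util.List;

    // Hypothetical example class; not part of the commit above.
    public class SimpleKafkaTupleBuilder implements KafkaSpoutTupleBuilder<String, String> {
        @Override
        public List<Object> buildTuple(ConsumerRecord<String, String> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
            // The order of the values must match the Fields declared for the record's topic,
            // here ("topic", "partition", "offset", "key", "value").
            return new Values(consumerRecord.topic(),
                    consumerRecord.partition(),
                    consumerRecord.offset(),
                    consumerRecord.key(),
                    consumerRecord.value());
        }
    }

Such a builder could be returned from getTupleBuilder() in KafkaSpoutTopologyMain in place of KafkaRecordTupleBuilder.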
