http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index d405c4d..67ff62a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -18,21 +18,11 @@ package org.apache.storm.kafka.spout; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichSpout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -46,18 +36,28 @@ import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; - -import org.apache.kafka.common.errors.InterruptException; +import org.apache.storm.kafka.spout.internal.Timer; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaSpout<K, V> extends BaseRichSpout { + private static final long serialVersionUID = 4151921085047987154L; private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator(); @@ -79,13 +79,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() - private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples - private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord - transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed + private transient TopologyContext context; + private transient Timer refreshSubscriptionTimer; // Used to say when a subscription should be refreshed public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { @@ -95,13 +94,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout { //This constructor is here for testing KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) { this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration - this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams(); this.kafkaConsumerFactory = kafkaConsumerFactory; } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { initialized = false; + this.context = context; // Spout internals this.collector = collector; @@ -115,12 +114,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // Retries management retryService = kafkaSpoutConfig.getRetryService(); - // Tuples builder delegate - tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder(); - if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); } + refreshSubscriptionTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS); acked = new HashMap<>(); emitted = new HashSet<>(); @@ -158,7 +155,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { retryService.retainAll(partitions); //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted. - Set<TopicPartition> partitionsSet = new HashSet(partitions); + Set<TopicPartition> partitionsSet = new HashSet<>(partitions); emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition())); for (TopicPartition tp : partitions) { @@ -177,10 +174,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { long fetchOffset; if (committedOffset != null) { // offset was committed for this TopicPartition if (firstPollOffsetStrategy.equals(EARLIEST)) { - kafkaConsumer.seekToBeginning(toArrayList(tp)); + kafkaConsumer.seekToBeginning(Collections.singleton(tp)); fetchOffset = kafkaConsumer.position(tp); } else if (firstPollOffsetStrategy.equals(LATEST)) { - kafkaConsumer.seekToEnd(toArrayList(tp)); + kafkaConsumer.seekToEnd(Collections.singleton(tp)); fetchOffset = kafkaConsumer.position(tp); } else { // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. @@ -189,9 +186,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } } else { // no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { - kafkaConsumer.seekToBeginning(toArrayList(tp)); + kafkaConsumer.seekToBeginning(Collections.singleton(tp)); } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { - kafkaConsumer.seekToEnd(toArrayList(tp)); + kafkaConsumer.seekToEnd(Collections.singleton(tp)); } fetchOffset = kafkaConsumer.position(tp); } @@ -199,10 +196,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } } - private Collection<TopicPartition> toArrayList(final TopicPartition tp) { - return new ArrayList<TopicPartition>(1){{add(tp);}}; - } - private void setAcked(TopicPartition tp, long fetchOffset) { // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off if (!consumerAutoCommitMode && !acked.containsKey(tp)) { @@ -221,7 +214,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } if (poll()) { - setWaitingToEmit(pollKafkaBroker()); + try { + setWaitingToEmit(pollKafkaBroker()); + } catch (RetriableException e) { + LOG.error("Failed to poll from kafka.", e); + } } if (waitingToEmit()) { @@ -276,7 +273,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout { // ======== poll ========= private ConsumerRecords<K, V> pollKafkaBroker() { doSeekRetriableTopicPartitions(); - + if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { + kafkaSpoutConfig.getSubscription().refreshAssignment(); + } final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); final int numPolledRecords = consumerRecords.count(); LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets); @@ -303,7 +302,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } } - //Emits one tuple per record //@return true if tuple was emitted private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) { @@ -317,15 +315,19 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } else { boolean isScheduled = retryService.isScheduled(msgId); if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried - final List<Object> tuple = tuplesBuilder.buildTuple(record); - kafkaSpoutStreams.emit(collector, tuple, msgId); + final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record); + if (tuple instanceof KafkaTuple) { + collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId); + } else { + collector.emit(tuple, msgId); + } emitted.add(msgId); numUncommittedOffsets++; if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule. retryService.remove(msgId); } LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); - return true; + return true; } } return false; @@ -405,18 +407,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private void subscribeKafkaConsumer() { kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); - if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { - final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics(); - kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener()); - LOG.info("Kafka consumer subscribed topics {}", topics); - } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { - final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern(); - kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener()); - LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); - } - // Initial poll to get the consumer registration process going. - // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration - kafkaConsumer.poll(0); + kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context); } @Override @@ -450,7 +441,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - kafkaSpoutStreams.declareOutputFields(declarer); + RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator(); + for (String stream: translator.streams()) { + declarer.declareStream(stream, translator.getFieldsFor(stream)); + } } @Override @@ -469,11 +463,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } String configKeyPrefix = "config."; - if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { - configuration.put(configKeyPrefix + "topics", getNamedTopics()); - } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { - configuration.put(configKeyPrefix + "topics", getWildCardTopics()); - } + configuration.put(configKeyPrefix + "topics", getTopicsString()); configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId()); configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers")); @@ -481,16 +471,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { return configuration; } - private String getNamedTopics() { - StringBuilder topics = new StringBuilder(); - for (String topic: kafkaSpoutConfig.getSubscribedTopics()) { - topics.append(topic).append(","); - } - return topics.toString(); - } - - private String getWildCardTopics() { - return kafkaSpoutConfig.getTopicWildcardPattern().toString(); + private String getTopicsString() { + return kafkaSpoutConfig.getSubscription().getTopicsString(); } // ======= Offsets Commit Management ========== @@ -594,7 +576,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { return ackedMsgs.isEmpty(); } - public boolean contains(ConsumerRecord record) { + public boolean contains(ConsumerRecord<K, V> record) { return contains(new KafkaSpoutMessageId(record)); } @@ -612,60 +594,4 @@ public class KafkaSpout<K, V> extends BaseRichSpout { '}'; } } - - // =========== Timer =========== - - private class Timer { - private final long delay; - private final long period; - private final TimeUnit timeUnit; - private final long periodNanos; - private long start; - - /** - * Creates a class that mimics a single threaded timer that expires periodically. If a call to {@link - * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns - * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay. - * - * @param delay the initial delay before the timer starts - * @param period the period between calls {@link #isExpiredResetOnTrue()} - * @param timeUnit the time unit of delay and period - */ - public Timer(long delay, long period, TimeUnit timeUnit) { - this.delay = delay; - this.period = period; - this.timeUnit = timeUnit; - - periodNanos = timeUnit.toNanos(period); - start = System.nanoTime() + timeUnit.toNanos(delay); - } - - public long period() { - return period; - } - - public long delay() { - return delay; - } - - public TimeUnit getTimeUnit() { - return timeUnit; - } - - /** - * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the - * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset - * (re-initiated) and a new cycle will start. - * - * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false - * otherwise. - */ - public boolean isExpiredResetOnTrue() { - final boolean expired = System.nanoTime() - start > periodNanos; - if (expired) { - start = System.nanoTime(); - } - return expired; - } - } }
http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 8aa525b..db07fda 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -18,34 +18,40 @@ package org.apache.storm.kafka.spout; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; - import java.io.Serializable; -import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.tuple.Fields; + /** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics */ public class KafkaSpoutConfig<K, V> implements Serializable { + private static final long serialVersionUID = 141902646130682494L; public static final long DEFAULT_POLL_TIMEOUT_MS = 200; // 200ms public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // 30s public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 10,000,000 records => 80MBs of memory footprint in the worst case - - // Kafka property names - public interface Consumer { - String GROUP_ID = "group.id"; - String BOOTSTRAP_SERVERS = "bootstrap.servers"; - String ENABLE_AUTO_COMMIT = "enable.auto.commit"; - String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms"; - String KEY_DESERIALIZER = "key.deserializer"; - String VALUE_DESERIALIZER = "value.deserializer"; - } + public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s + public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), + DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); + /** + * Retry in a tight loop (keep unit tests fasts) do not use in production. + */ + public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0), + DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0)); /** * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will @@ -60,123 +66,254 @@ public class KafkaSpoutConfig<K, V> implements Serializable { * If no offset has been committed, it behaves as LATEST.</li> * </ul> * */ - public enum FirstPollOffsetStrategy { + public static enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST } - - // Kafka consumer configuration - private final Map<String, Object> kafkaProps; - private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valueDeserializer; - private final long pollTimeoutMs; - - // Kafka spout configuration - private final long offsetCommitPeriodMs; - private final int maxRetries; - private final int maxUncommittedOffsets; - private final FirstPollOffsetStrategy firstPollOffsetStrategy; - private final KafkaSpoutStreams kafkaSpoutStreams; - private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; - private final KafkaSpoutRetryService retryService; - - private KafkaSpoutConfig(Builder<K,V> builder) { - this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); - this.keyDeserializer = builder.keyDeserializer; - this.valueDeserializer = builder.valueDeserializer; - this.pollTimeoutMs = builder.pollTimeoutMs; - this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; - this.maxRetries = builder.maxRetries; - this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; - this.kafkaSpoutStreams = builder.kafkaSpoutStreams; - this.maxUncommittedOffsets = builder.maxUncommittedOffsets; - this.tuplesBuilder = builder.tuplesBuilder; - this.retryService = builder.retryService; + + public static Builder<String, String> builder(String bootstrapServers, String ... topics) { + return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); } - - private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) { + + public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) { + return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); + } + + public static Builder<String, String> builder(String bootstrapServers, Pattern topics) { + return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); + } + + private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) { // set defaults for properties not specified - if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) { - kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false"); + if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } return kafkaProps; } - + public static class Builder<K,V> { private final Map<String, Object> kafkaProps; - private SerializableDeserializer<K> keyDeserializer; - private SerializableDeserializer<V> valueDeserializer; + private Subscription subscription; + private final SerializableDeserializer<K> keyDes; + private final Class<? extends Deserializer<K>> keyDesClazz; + private final SerializableDeserializer<V> valueDes; + private final Class<? extends Deserializer<V>> valueDesClazz; + private RecordTranslator<K, V> translator; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; private int maxRetries = DEFAULT_MAX_RETRIES; private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; - private final KafkaSpoutStreams kafkaSpoutStreams; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; - private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; - private final KafkaSpoutRetryService retryService; + private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE; + private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; + + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) { + this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + } + + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) { + this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + } + + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) { + this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + } + + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) { + this(bootstrapServers, keyDes, null, valDes, null, subscription); + } + + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) { + this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + } + + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) { + this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + } + + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) { + this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + } + + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) { + this(bootstrapServers, null, keyDes, null, valDes, subscription); + } + + private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) { + kafkaProps = new HashMap<>(); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + throw new IllegalArgumentException("bootstrap servers cannot be null"); + } + kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + this.keyDes = keyDes; + this.keyDesClazz = keyDesClazz; + this.valueDes = valDes; + this.valueDesClazz = valDesClazz; + this.subscription = subscription; + this.translator = new DefaultRecordTranslator<K,V>(); + } + + private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) { + this.kafkaProps = new HashMap<>(builder.kafkaProps); + this.subscription = builder.subscription; + this.pollTimeoutMs = builder.pollTimeoutMs; + this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; + this.maxRetries = builder.maxRetries; + this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; + this.maxUncommittedOffsets = builder.maxUncommittedOffsets; + //this could result in a lot of class case exceptions at runtime, + // but because some translators will work no matter what the generics + // are I thought it best not to force someone to reset the translator + // when they change the key/value types. + this.translator = (RecordTranslator<K, V>) builder.translator; + this.retryService = builder.retryService; + this.keyDes = keyDes; + this.keyDesClazz = keyDesClazz; + this.valueDes = valueDes; + this.valueDesClazz = valueDesClazz; + } /** - * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/> - * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/> - * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), - * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))} + * Specifying this key deserializer overrides the property key.deserializer. If you have + * set a custom RecordTranslator before calling this it may result in class cast + * exceptions at runtime. */ - public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams, - KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) { - this(kafkaProps, kafkaSpoutStreams, tuplesBuilder, - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), - DEFAULT_MAX_RETRIES, TimeInterval.seconds(10))); + public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> keyDeserializer) { + return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz); } - - /*** - * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics - * The optional configuration can be specified using the set methods of this builder - * @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a> - * @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream. - * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s. - * @param retryService logic that manages the retrial of failed tuples + + /** + * Specify a class that can be instantiated to create a key.deserializer + * This is the same as setting key.deserializer, but overrides it. If you have + * set a custom RecordTranslator before calling this it may result in class cast + * exceptions at runtime. */ - public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams, - KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) { - if (kafkaProps == null || kafkaProps.isEmpty()) { - throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps); - } - - if (kafkaSpoutStreams == null) { - throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream"); - } - - if (tuplesBuilder == null) { - throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); - } - - if (retryService == null) { - throw new IllegalArgumentException("Must specify at implementation of retry service"); - } - - this.kafkaProps = kafkaProps; - this.kafkaSpoutStreams = kafkaSpoutStreams; - this.tuplesBuilder = tuplesBuilder; - this.retryService = retryService; + public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) { + return new Builder<>(this, null, clazz, valueDes, valueDesClazz); } /** - * Specifying this key deserializer overrides the property key.deserializer + * Specifying this value deserializer overrides the property value.deserializer. If you have + * set a custom RecordTranslator before calling this it may result in class cast + * exceptions at runtime. */ - public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) { - this.keyDeserializer = keyDeserializer; + public <NV> Builder<K,NV> setValue(SerializableDeserializer<NV> valueDeserializer) { + return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null); + } + + /** + * Specify a class that can be instantiated to create a value.deserializer + * This is the same as setting value.deserializer, but overrides it. If you have + * set a custom RecordTranslator before calling this it may result in class cast + * exceptions at runtime. + */ + public <NV> Builder<K,NV> setValue(Class<? extends Deserializer<NV>> clazz) { + return new Builder<>(this, keyDes, keyDesClazz, null, clazz); + } + + /** + * Set a Kafka property config + */ + public Builder<K,V> setProp(String key, Object value) { + kafkaProps.put(key, value); return this; } - + /** - * Specifying this value deserializer overrides the property value.deserializer + * Set multiple Kafka property configs */ - public Builder<K,V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) { - this.valueDeserializer = valueDeserializer; + public Builder<K,V> setProp(Map<String, Object> props) { + kafkaProps.putAll(props); return this; } + + /** + * Set multiple Kafka property configs + */ + public Builder<K,V> setProp(Properties props) { + for (String name: props.stringPropertyNames()) { + kafkaProps.put(name, props.get(name)); + } + return this; + } + + /** + * Set the group.id for the consumers + */ + public Builder<K,V> setGroupId(String id) { + return setProp("group.id", id); + } + + /** + * reset the bootstrap servers for the Consumer + */ + public Builder<K,V> setBootstrapServers(String servers) { + return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + } + + /** + * The minimum amount of data the broker should return for a fetch request. + */ + public Builder<K,V> setFetchMinBytes(int bytes) { + return setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, bytes); + } + + /** + * The maximum amount of data per-partition the broker will return. + */ + public Builder<K,V> setMaxPartitionFectchBytes(int bytes) { + return setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, bytes); + } + + /** + * The maximum number of records a poll will return. + * Will only work with Kafka 0.10.0 and above. + */ + public Builder<K,V> setMaxPollRecords(int records) { + //to avoid issues with 0.9 versions that technically still work + // with this we do not use ConsumerConfig.MAX_POLL_RECORDS_CONFIG + return setProp("max.poll.records", records); + } + + //Security Related Configs + + /** + * Configure the SSL Keystore for mutual authentication + */ + public Builder<K,V> setSSLKeystore(String location, String password) { + return setProp("ssl.keystore.location", location) + .setProp("ssl.keystore.password", password); + } + + /** + * Configure the SSL Keystore for mutual authentication + */ + public Builder<K,V> setSSLKeystore(String location, String password, String keyPassword) { + return setProp("ssl.key.password", keyPassword) + .setSSLKeystore(location, password); + } + + /** + * Configure the SSL Truststore to authenticate with the brokers + */ + public Builder<K,V> setSSLTruststore(String location, String password) { + return setSecurityProtocol("SSL") + .setProp("ssl.truststore.location", location) + .setProp("ssl.truststore.password", password); + } + + /** + * Protocol used to communicate with brokers. + * Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. + */ + public Builder<K, V> setSecurityProtocol(String protocol) { + return setProp("security.protocol", protocol); + } + //Spout Settings /** * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s * @param pollTimeoutMs time in ms @@ -228,22 +365,131 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.firstPollOffsetStrategy = firstPollOffsetStrategy; return this; } + + /** + * Sets the retry service for the spout to use. + * @param retryService the new retry service + * @return the builder (this). + */ + public Builder<K, V> setRetry(KafkaSpoutRetryService retryService) { + if (retryService == null) { + throw new NullPointerException("retryService cannot be null"); + } + this.retryService = retryService; + return this; + } + public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) { + this.translator = translator; + return this; + } + + /** + * Configure a translator with tuples to be emitted on the default stream. + * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted + * @param fields the names of the fields extracted + * @return this to be able to chain configuration + */ + public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) { + return setRecordTranslator(new SimpleRecordTranslator<>(func, fields)); + } + + /** + * Configure a translator with tuples to be emitted to a given stream. + * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted + * @param fields the names of the fields extracted + * @param stream the stream to emit the tuples on + * @return this to be able to chain configuration + */ + public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) { + return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream)); + } + + /** + * Sets partition refresh period in milliseconds. This is how often kafka will be polled + * to check for new topics and/or new partitions. + * This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and + * PatternSubscription rely on kafka to handle this instead. + * @param partitionRefreshPeriodMs time in milliseconds + * @return the builder (this) + */ + public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) { + this.partitionRefreshPeriodMs = partitionRefreshPeriodMs; + return this; + } + public KafkaSpoutConfig<K,V> build() { return new KafkaSpoutConfig<>(this); } } + // Kafka consumer configuration + private final Map<String, Object> kafkaProps; + private final Subscription subscription; + private final SerializableDeserializer<K> keyDes; + private final Class<? extends Deserializer<K>> keyDesClazz; + private final SerializableDeserializer<V> valueDes; + private final Class<? extends Deserializer<V>> valueDesClazz; + private final long pollTimeoutMs; + + // Kafka spout configuration + private final RecordTranslator<K, V> translator; + private final long offsetCommitPeriodMs; + private final int maxRetries; + private final int maxUncommittedOffsets; + private final FirstPollOffsetStrategy firstPollOffsetStrategy; + private final KafkaSpoutRetryService retryService; + private final long partitionRefreshPeriodMs; + + private KafkaSpoutConfig(Builder<K,V> builder) { + this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); + this.subscription = builder.subscription; + this.translator = builder.translator; + this.pollTimeoutMs = builder.pollTimeoutMs; + this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; + this.maxRetries = builder.maxRetries; + this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; + this.maxUncommittedOffsets = builder.maxUncommittedOffsets; + this.retryService = builder.retryService; + this.keyDes = builder.keyDes; + this.keyDesClazz = builder.keyDesClazz; + this.valueDes = builder.valueDes; + this.valueDesClazz = builder.valueDesClazz; + this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; + } + public Map<String, Object> getKafkaProps() { return kafkaProps; } public Deserializer<K> getKeyDeserializer() { - return keyDeserializer; + if (keyDesClazz != null) { + try { + return keyDesClazz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Could not instantiate key deserializer " + keyDesClazz); + } + } + return keyDes; } public Deserializer<V> getValueDeserializer() { - return valueDeserializer; + if (valueDesClazz != null) { + try { + return valueDesClazz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Could not instantiate value deserializer " + valueDesClazz); + } + } + return valueDes; + } + + public Subscription getSubscription() { + return subscription; + } + + public RecordTranslator<K,V> getTranslator() { + return translator; } public long getPollTimeoutMs() { @@ -255,32 +501,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable { } public boolean isConsumerAutoCommitMode() { - return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null // default is true - || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT)); + return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is true + || Boolean.valueOf((String)kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); } public String getConsumerGroupId() { - return (String) kafkaProps.get(Consumer.GROUP_ID); - } - - /** - * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}, - * or null if this stream is associated with a wildcard pattern topic - */ - public List<String> getSubscribedTopics() { - return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ? - new ArrayList<>(((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics()) : - null; - } - - /** - * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null - * if this stream is associated with a specific named topic - */ - public Pattern getTopicWildcardPattern() { - return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ? - ((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() : - null; + return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG); } public int getMaxTupleRetries() { @@ -291,38 +517,32 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return firstPollOffsetStrategy; } - public KafkaSpoutStreams getKafkaSpoutStreams() { - return kafkaSpoutStreams; - } - public int getMaxUncommittedOffsets() { return maxUncommittedOffsets; } - public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() { - return tuplesBuilder; - } - public KafkaSpoutRetryService getRetryService() { return retryService; } + + public long getPartitionRefreshPeriodMs() { + return partitionRefreshPeriodMs; + } @Override public String toString() { return "KafkaSpoutConfig{" + "kafkaProps=" + kafkaProps + - ", keyDeserializer=" + keyDeserializer + - ", valueDeserializer=" + valueDeserializer + + ", key=" + getKeyDeserializer() + + ", value=" + getValueDeserializer() + ", pollTimeoutMs=" + pollTimeoutMs + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + ", maxRetries=" + maxRetries + ", maxUncommittedOffsets=" + maxUncommittedOffsets + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy + - ", kafkaSpoutStreams=" + kafkaSpoutStreams + - ", tuplesBuilder=" + tuplesBuilder + + ", subscription=" + subscription + + ", translator=" + translator + ", retryService=" + retryService + - ", topics=" + getSubscribedTopics() + - ", topicWildcardPattern=" + getTopicWildcardPattern() + '}'; } } http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index 71f8327..3cfad9d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -26,7 +26,7 @@ public class KafkaSpoutMessageId { private transient long offset; private transient int numFails = 0; - public KafkaSpoutMessageId(ConsumerRecord consumerRecord) { + public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) { this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset()); } http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java deleted file mode 100644 index 0f444b4..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.List; -import java.util.regex.Pattern; - -/** - * Represents the stream and output fields used by a topic - */ -public class KafkaSpoutStream implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStream.class); - - private final Fields outputFields; - private final String streamId; - private final String topic; - private Pattern topicWildcardPattern; - - /** Represents the specified outputFields and topic with the default stream */ - public KafkaSpoutStream(Fields outputFields, String topic) { - this(outputFields, Utils.DEFAULT_STREAM_ID, topic); - } - - /** Represents the specified outputFields and topic with the specified stream */ - public KafkaSpoutStream(Fields outputFields, String streamId, String topic) { - if (outputFields == null || streamId == null || topic == null) { - throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " + - "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic)); - } - this.outputFields = outputFields; - this.streamId = streamId; - this.topic = topic; - this.topicWildcardPattern = null; - } - - /** Represents the specified outputFields and topic wild card with the default stream */ - public KafkaSpoutStream(Fields outputFields, Pattern topicWildcardPattern) { - this(outputFields, Utils.DEFAULT_STREAM_ID, topicWildcardPattern); - } - - /** Represents the specified outputFields and topic wild card with the specified stream */ - public KafkaSpoutStream(Fields outputFields, String streamId, Pattern topicWildcardPattern) { - - if (outputFields == null || streamId == null || topicWildcardPattern == null) { - throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " + - "[outputFields=%s, streamId=%s, topicWildcardPattern=%s]", outputFields, streamId, topicWildcardPattern)); - } - this.outputFields = outputFields; - this.streamId = streamId; - this.topic = null; - this.topicWildcardPattern = topicWildcardPattern; - } - - public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { - collector.emit(streamId, tuple, messageId); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - LOG.info("Declared [streamId = {}], [outputFields = {}] for [topic = {}]", streamId, outputFields, topic); - declarer.declareStream(streamId, outputFields); - } - - - public Fields getOutputFields() { - return outputFields; - } - - public String getStreamId() { - return streamId; - } - - /** - * @return the topic associated with this {@link KafkaSpoutStream}, or null - * if this stream is associated with a wildcard pattern topic - */ - public String getTopic() { - return topic; - } - - /** - * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null - * if this stream is associated with a specific named topic - */ - public Pattern getTopicWildcardPattern() { - return topicWildcardPattern; - } - - @Override - public String toString() { - return "KafkaSpoutStream{" + - "outputFields=" + outputFields + - ", streamId='" + streamId + '\'' + - ", topic='" + topic + '\'' + - ", topicWildcardPattern=" + topicWildcardPattern + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java deleted file mode 100644 index d4178a9..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; - -import java.io.Serializable; -import java.util.List; - -/** - * Represents the {@link KafkaSpoutStream} associated with each topic or topic pattern (wildcard), and provides - * a public API to declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. - */ -public interface KafkaSpoutStreams extends Serializable { - void declareOutputFields(OutputFieldsDeclarer declarer); - - void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId); - - Fields getOutputFields(); -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java deleted file mode 100644 index bc2426a..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.OutputFieldsGetter; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to - * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. - */ -public class KafkaSpoutStreamsNamedTopics implements KafkaSpoutStreams { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreamsNamedTopics.class); - - private final Map<String, KafkaSpoutStream> topicToStream; - - private KafkaSpoutStreamsNamedTopics(Builder builder) { - this.topicToStream = builder.topicToStream; - LOG.debug("Built {}", this); - } - - /** - * @param topic the topic for which to get output fields - * @return the declared output fields - */ - public Fields getOutputFields(String topic) { - if (topicToStream.containsKey(topic)) { - final Fields outputFields = topicToStream.get(topic).getOutputFields(); - LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields); - return outputFields; - } - throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); - } - - @Override - public Fields getOutputFields() { - final Set<String> allFields = new LinkedHashSet<>(); - for (KafkaSpoutStream kafkaSpoutStream : topicToStream.values()) { - allFields.addAll(kafkaSpoutStream.getOutputFields().toList()); - } - return new Fields(new ArrayList<>(allFields)); - } - - /** - * @param topic the topic to for which to get the stream id - * @return the id of the stream to where the tuples are emitted - */ - public KafkaSpoutStream getStream(String topic) { - if (topicToStream.containsKey(topic)) { - return topicToStream.get(topic); - } - throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); - } - - /** - * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream} - */ - public List<String> getTopics() { - return new ArrayList<>(topicToStream.keySet()); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (KafkaSpoutStream stream : topicToStream.values()) { - if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) { - stream.declareOutputFields(declarer); - } - } - } - - public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { - getStream(messageId.topic()).emit(collector, tuple, messageId); - } - - @Override - public String toString() { - return "KafkaSpoutStreamsNamedTopics{" + - "topicToStream=" + topicToStream + - '}'; - } - - public static class Builder { - private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();; - - /** - * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified. - * All topics will have the default stream id and the same output fields. - */ - public Builder(Fields outputFields, String... topics) { - addStream(outputFields, topics); - } - - /** - * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified. - * All the topics will have the specified stream id and the same output fields. - */ - public Builder (Fields outputFields, String streamId, String... topics) { - addStream(outputFields, streamId, topics); - } - - /** - * Adds this stream to the state representing the streams associated with each topic - */ - public Builder(KafkaSpoutStream stream) { - addStream(stream); - } - - /** - * Adds this stream to the state representing the streams associated with each topic - */ - public Builder addStream(KafkaSpoutStream stream) { - topicToStream.put(stream.getTopic(), stream); - return this; - } - - /** - * Please refer to javadoc in {@link #Builder(Fields, String...)} - */ - public Builder addStream(Fields outputFields, String... topics) { - addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics); - return this; - } - - /** - * Please refer to javadoc in {@link #Builder(Fields, String, String...)} - */ - public Builder addStream(Fields outputFields, String streamId, String... topics) { - for (String topic : topics) { - topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic)); - } - return this; - } - - public KafkaSpoutStreamsNamedTopics build() { - return new KafkaSpoutStreamsNamedTopics(this); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java deleted file mode 100644 index 64132b3..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; - -import java.util.List; -import java.util.regex.Pattern; - -public class KafkaSpoutStreamsWildcardTopics implements KafkaSpoutStreams { - private KafkaSpoutStream kafkaSpoutStream; - - public KafkaSpoutStreamsWildcardTopics(KafkaSpoutStream kafkaSpoutStream) { - this.kafkaSpoutStream = kafkaSpoutStream; - if (kafkaSpoutStream.getTopicWildcardPattern() == null) { - throw new IllegalStateException("KafkaSpoutStream must be configured for wildcard topic"); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - kafkaSpoutStream.declareOutputFields(declarer); - } - - @Override - public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { - kafkaSpoutStream.emit(collector, tuple, messageId); - } - - @Override - public Fields getOutputFields() { - return kafkaSpoutStream.getOutputFields(); - } - - public KafkaSpoutStream getStream() { - return kafkaSpoutStream; - } - - public Pattern getTopicWildcardPattern() { - return kafkaSpoutStream.getTopicWildcardPattern(); - } - - @Override - public String toString() { - return "KafkaSpoutStreamsWildcardTopics{" + - "kafkaSpoutStream=" + kafkaSpoutStream + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java deleted file mode 100644 index 3bb71a8..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** - * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s. - * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder} - */ -public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable { - private List<String> topics; - - /** - * @param topics list of topics that use this implementation to build tuples - */ - public KafkaSpoutTupleBuilder(String... topics) { - if (topics == null || topics.length == 0) { - throw new IllegalArgumentException("Must specify at least one topic. It cannot be null or empty"); - } - this.topics = Arrays.asList(topics); - } - - /** - * @return list of topics that use this implementation to build tuples - */ - public List<String> getTopics() { - return Collections.unmodifiableList(topics); - } - - /** - * Builds a list of tuples using the ConsumerRecord specified as parameter - * @param consumerRecord whose contents are used to build tuples - * @return list of tuples - */ - public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord); -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java deleted file mode 100644 index 2ba0a79..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.io.Serializable; -import java.util.List; - -/** - * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s. - * The logic is provided by the user by implementing the appropriate number of {@link KafkaSpoutTupleBuilder} instances - */ -public interface KafkaSpoutTuplesBuilder<K,V> extends Serializable { - List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord); -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java deleted file mode 100644 index 80fe543..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class KafkaSpoutTuplesBuilderNamedTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilderNamedTopics.class); - - private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; - - private KafkaSpoutTuplesBuilderNamedTopics(Builder<K,V> builder) { - this.topicToTupleBuilders = builder.topicToTupleBuilders; - LOG.debug("Instantiated {}", this); - } - - public static class Builder<K,V> { - private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders; - private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; - - @SafeVarargs - public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) { - if (tupleBuilders == null || tupleBuilders.length == 0) { - throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); - } - - this.tupleBuilders = Arrays.asList(tupleBuilders); - topicToTupleBuilders = new HashMap<>(); - } - - public KafkaSpoutTuplesBuilderNamedTopics<K,V> build() { - for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) { - for (String topic : tupleBuilder.getTopics()) { - if (!topicToTupleBuilders.containsKey(topic)) { - topicToTupleBuilders.put(topic, tupleBuilder); - } - } - } - return new KafkaSpoutTuplesBuilderNamedTopics<>(this); - } - } - - public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) { - final String topic = consumerRecord.topic(); - return topicToTupleBuilders.get(topic).buildTuple(consumerRecord); - } - - @Override - public String toString() { - return "KafkaSpoutTuplesBuilderNamedTopics {" + - "topicToTupleBuilders=" + topicToTupleBuilders + - '}'; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java deleted file mode 100644 index 85d4809..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.List; - -public class KafkaSpoutTuplesBuilderWildcardTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> { - private KafkaSpoutTupleBuilder<K, V> tupleBuilder; - - public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) { - this.tupleBuilder = tupleBuilder; - } - - @Override - public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) { - return tupleBuilder.buildTuple(consumerRecord); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java new file mode 100644 index 0000000..f5953ad --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import org.apache.storm.tuple.Values; + +/** + * A list of Values in a tuple that can be routed + * to a given stream. {@see org.apache.storm.kafka.spout.RecordTranslator#apply} + */ +public class KafkaTuple extends Values { + private static final long serialVersionUID = 4803794470450587992L; + private String stream = null; + + public KafkaTuple() { + super(); + } + + public KafkaTuple(Object... vals) { + super(vals); + } + + public KafkaTuple routedTo(String stream) { + assert(this.stream == null); + this.stream = stream; + return this; + } + + public String getStream() { + return stream; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java new file mode 100644 index 0000000..df3e800 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +public class ManualPartitionNamedSubscription extends NamedSubscription { + private static final long serialVersionUID = 5633018073527583826L; + private final ManualPartitioner partitioner; + private Set<TopicPartition> currentAssignment = null; + private KafkaConsumer<?, ?> consumer = null; + private ConsumerRebalanceListener listener = null; + private TopologyContext context = null; + + public ManualPartitionNamedSubscription(ManualPartitioner parter, Collection<String> topics) { + super(topics); + this.partitioner = parter; + } + + public ManualPartitionNamedSubscription(ManualPartitioner parter, String ... topics) { + this(parter, Arrays.asList(topics)); + } + + @Override + public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) { + this.consumer = consumer; + this.listener = listener; + this.context = context; + refreshAssignment(); + } + + @Override + public void refreshAssignment() { + List<TopicPartition> allPartitions = new ArrayList<>(); + for (String topic : topics) { + for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + } + Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); + Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); + if (!newAssignment.equals(currentAssignment)) { + if (currentAssignment != null) { + listener.onPartitionsRevoked(currentAssignment); + listener.onPartitionsAssigned(newAssignment); + } + currentAssignment = newAssignment; + consumer.assign(currentAssignment); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/c9f9348e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java new file mode 100644 index 0000000..cf4dfcb --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +public class ManualPartitionPatternSubscription extends PatternSubscription { + private static final long serialVersionUID = 5633018073527583826L; + private final ManualPartitioner parter; + private Set<TopicPartition> currentAssignment = null; + private KafkaConsumer<?, ?> consumer = null; + private ConsumerRebalanceListener listener = null; + private TopologyContext context = null; + + public ManualPartitionPatternSubscription(ManualPartitioner parter, Pattern pattern) { + super(pattern); + this.parter = parter; + } + + @Override + public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) { + this.consumer = consumer; + this.listener = listener; + this.context = context; + refreshAssignment(); + } + + @Override + public void refreshAssignment() { + List<TopicPartition> allPartitions = new ArrayList<>(); + for(Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) { + if (pattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo partitionInfo: entry.getValue()) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + } + } + Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); + Set<TopicPartition> newAssignment = new HashSet<>(parter.partition(allPartitions, context)); + if (!newAssignment.equals(currentAssignment)) { + if (currentAssignment != null) { + listener.onPartitionsRevoked(currentAssignment); + listener.onPartitionsAssigned(newAssignment); + } + currentAssignment = newAssignment; + consumer.assign(currentAssignment); + } + } +} \ No newline at end of file