http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 7c97ac9..7aa836c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -18,232 +18,810 @@ package org.apache.storm.kafka.spout; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; - import java.io.Serializable; -import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.storm.Config; +import org.apache.storm.annotation.InterfaceStability; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics */ public class KafkaSpoutConfig<K, V> implements Serializable { - public static final long DEFAULT_POLL_TIMEOUT_MS = 200; // 200ms - public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // 30s - public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever - public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 10,000,000 records => 80MBs of memory footprint in the worst case - - // Kafka property names - public interface Consumer { - String GROUP_ID = "group.id"; - String BOOTSTRAP_SERVERS = "bootstrap.servers"; - String ENABLE_AUTO_COMMIT = "enable.auto.commit"; - String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms"; - String KEY_DESERIALIZER = "key.deserializer"; - String VALUE_DESERIALIZER = "value.deserializer"; - } - /** - * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will - * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/> - * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/> - * <ul> - * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li> - * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li> - * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. 
- * If no offset has been committed, it behaves as EARLIEST.</li> - * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. - * If no offset has been committed, it behaves as LATEST.</li> - * </ul> - * */ - public enum FirstPollOffsetStrategy { - EARLIEST, - LATEST, - UNCOMMITTED_EARLIEST, - UNCOMMITTED_LATEST } + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class); + private static final long serialVersionUID = 141902646130682494L; + // 200ms + public static final long DEFAULT_POLL_TIMEOUT_MS = 200; + // 30s + public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; + // Retry forever + public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; + // 10,000,000 records => 80MBs of memory footprint in the worst case + public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; + // 2s + public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; + + public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; + + public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), + DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)); + + public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE; + + public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener(); + + public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60; + // Kafka consumer configuration private final Map<String, Object> kafkaProps; - private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valueDeserializer; + private final Subscription subscription; private final long pollTimeoutMs; // Kafka spout configuration + private final RecordTranslator<K, V> translator; private final long offsetCommitPeriodMs; - private final int maxRetries; private final int maxUncommittedOffsets; private final FirstPollOffsetStrategy firstPollOffsetStrategy; - private final KafkaSpoutStreams kafkaSpoutStreams; - private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; private final KafkaSpoutRetryService retryService; + private final KafkaTupleListener tupleListener; + private final long partitionRefreshPeriodMs; + private final boolean emitNullTuples; + private final SerializableDeserializer<K> keyDes; + private final Class<? extends Deserializer<K>> keyDesClazz; + private final SerializableDeserializer<V> valueDes; + private final Class<? extends Deserializer<V>> valueDesClazz; + private final ProcessingGuarantee processingGuarantee; + private final boolean tupleTrackingEnforced; + private final int metricsTimeBucketSizeInSecs; - private KafkaSpoutConfig(Builder<K,V> builder) { - this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); - this.keyDeserializer = builder.keyDeserializer; - this.valueDeserializer = builder.valueDeserializer; + /** + * Creates a new KafkaSpoutConfig using a Builder. 
+     *
+     * @param builder The Builder to construct the KafkaSpoutConfig from
+     */
+    public KafkaSpoutConfig(Builder<K, V> builder) {
+        setKafkaPropsForProcessingGuarantee(builder);
+        this.kafkaProps = builder.kafkaProps;
+        this.subscription = builder.subscription;
+        this.translator = builder.translator;
         this.pollTimeoutMs = builder.pollTimeoutMs;
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.maxRetries = builder.maxRetries;
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-        this.tuplesBuilder = builder.tuplesBuilder;
         this.retryService = builder.retryService;
+        this.tupleListener = builder.tupleListener;
+        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+        this.emitNullTuples = builder.emitNullTuples;
+        this.keyDes = builder.keyDes;
+        this.keyDesClazz = builder.keyDesClazz;
+        this.valueDes = builder.valueDes;
+        this.valueDesClazz = builder.valueDesClazz;
+        this.processingGuarantee = builder.processingGuarantee;
+        this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
+        this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
     }

-    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
-        // set defaults for properties not specified
-        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+    /**
+     * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
+     * By default this parameter is set to UNCOMMITTED_EARLIEST.
+     */
+    public enum FirstPollOffsetStrategy {
+        /**
+         * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only
+         * takes effect on topology deployment.
+         */
+        EARLIEST,
+        /**
+         * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
+         * setting only takes effect on topology deployment.
+         */
+        LATEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as EARLIEST.
+         */
+        UNCOMMITTED_EARLIEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as LATEST.
+         */
+        UNCOMMITTED_LATEST;
+
+        @Override
+        public String toString() {
+            return "FirstPollOffsetStrategy{" + super.toString() + "}";
         }
-        return kafkaProps;
     }

-    public static class Builder<K,V> {
+    /**
+     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
+     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
+     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
+     * NO_GUARANTEE may be removed in a later release without warning; we're still evaluating whether it makes sense to keep it.
+     */
+    @InterfaceStability.Unstable
+    public enum ProcessingGuarantee {
+        /**
+         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
+         * interval.
+ */ + AT_LEAST_ONCE, + /** + * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream + * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by + * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done + */ + AT_MOST_ONCE, + /** + * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may + * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the + * spout to control when commits occur. Commits asynchronously on the defined interval. + */ + NO_GUARANTEE, + } + + public static class Builder<K, V> { + private final Map<String, Object> kafkaProps; - private Deserializer<K> keyDeserializer; - private Deserializer<V> valueDeserializer; + private final Subscription subscription; + private final SerializableDeserializer<K> keyDes; + private final Class<? extends Deserializer<K>> keyDesClazz; + private final SerializableDeserializer<V> valueDes; + private final Class<? extends Deserializer<V>> valueDesClazz; + private RecordTranslator<K, V> translator; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; - private int maxRetries = DEFAULT_MAX_RETRIES; - private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; - private final KafkaSpoutStreams kafkaSpoutStreams; + private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; - private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; - private final KafkaSpoutRetryService retryService; - - /** - * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/> - * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/> - * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), - * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))} - */ - public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams, - KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) { - this(kafkaProps, kafkaSpoutStreams, tuplesBuilder, - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), - DEFAULT_MAX_RETRIES, TimeInterval.seconds(10))); - } - - /*** - * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics - * The optional configuration can be specified using the set methods of this builder - * @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a> - * @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream. - * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s. 
- * @param retryService logic that manages the retrial of failed tuples - */ - public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams, - KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) { - if (kafkaProps == null || kafkaProps.isEmpty()) { - throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps); - } + private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE; + private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER; + private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; + private boolean emitNullTuples = false; + private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE; + private boolean tupleTrackingEnforced = false; + private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS; + + public Builder(String bootstrapServers, String... topics) { + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); + } - if (kafkaSpoutStreams == null) { - throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream"); - } + public Builder(String bootstrapServers, Collection<String> topics) { + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), + new NamedTopicFilter(new HashSet<String>(topics)))); + } - if (tuplesBuilder == null) { - throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); - } + public Builder(String bootstrapServers, Pattern topics) { + this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); + } - if (retryService == null) { - throw new IllegalArgumentException("Must specify at implementation of retry service"); + /** + * @deprecated Please use {@link #Builder(java.lang.String, java.lang.String...)} instead, and set the deserializer with + * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and + * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ + @Deprecated + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String... 
topics) { + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); + } + + /** + * @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with + * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and + * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ + @Deprecated + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) { + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics)))); + } + + /** + * @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with + * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and + * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ + @Deprecated + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) { + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); + } + + /** + * @deprecated Please use {@link #Builder(java.lang.String, org.apache.storm.kafka.spout.Subscription) } instead, and set the + * deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and + * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ + @Deprecated + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) { + this(bootstrapServers, keyDes, null, valDes, null, subscription); + } + + /** + * @deprecated Please use {@link #Builder(java.lang.String, java.lang.String...)} instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ + @Deprecated + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String... topics) { + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics))); + } + + /** + * @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ + @Deprecated + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? 
extends Deserializer<V>> valDes, Collection<String> topics) { + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics)))); + } + + /** + * @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ + @Deprecated + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) { + this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics))); + } + + /** + * @deprecated Please use {@link #Builder(java.lang.String, org.apache.storm.kafka.spout.Subscription) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} + */ + @Deprecated + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) { + this(bootstrapServers, null, keyDes, null, valDes, subscription); + } + + /** + * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers. + * + * @param bootstrapServers The bootstrap servers the consumer will use + * @param subscription The subscription defining which topics and partitions each spout instance will read. + */ + public Builder(String bootstrapServers, Subscription subscription) { + this(bootstrapServers, null, null, null, null, subscription); + } + + private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) { + + this(keyDes, keyDesClazz, valDes, valDesClazz, subscription, + new DefaultRecordTranslator<K, V>(), new HashMap<String, Object>()); + + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + throw new IllegalArgumentException("bootstrap servers cannot be null"); } + kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz); + } + /** + * This constructor will always be called by one of the methods {@code setKey} or {@code setVal}, which implies + * that only one of its SerDe parameters will be non null, for which the corresponding Kafka property will be set + */ + @SuppressWarnings("unchecked") + private Builder(final Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valueDes, Class<? 
extends Deserializer<V>> valueDesClazz) {
+
+        this(keyDes, keyDesClazz, valueDes, valueDesClazz, builder.subscription,
+            (RecordTranslator<K, V>) builder.translator, new HashMap<>(builder.kafkaProps));
+
+        this.pollTimeoutMs = builder.pollTimeoutMs;
+        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+        this.retryService = builder.retryService;
+
+        setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz);
+    }
+
+    private Builder(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+        SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz,
+        Subscription subscription, RecordTranslator<K, V> translator, Map<String, Object> kafkaProps) {
+        this.keyDes = keyDes;
+        this.keyDesClazz = keyDesClazz;
+        this.valueDes = valueDes;
+        this.valueDesClazz = valueDesClazz;
+        this.subscription = subscription;
+        this.translator = translator;
         this.kafkaProps = kafkaProps;
-        this.kafkaSpoutStreams = kafkaSpoutStreams;
-        this.tuplesBuilder = tuplesBuilder;
-        this.retryService = retryService;
+    }
+
+    private void setNonNullSerDeKafkaProp(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+        SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+        if (keyDesClazz != null) {
+            kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
+        }
+        if (keyDes != null) {
+            kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass());
+        }
+        if (valueDesClazz != null) {
+            kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
+        }
+        if (valueDes != null) {
+            kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass());
+        }
+    }
+
+    /**
+     * Specifying this key deserializer overrides the property key.deserializer. If you have set a custom RecordTranslator before
+     * calling this it may result in class cast exceptions at runtime.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
+     */
+    @Deprecated
+    public <NK> Builder<NK, V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+        return new Builder<>(this, keyDeserializer, null, null, null);
     }

     /**
-     * Specifying this key deserializer overrides the property key.deserializer
+     * Specify a class that can be instantiated to create a key.deserializer. This is the same as setting key.deserializer, but overrides
+     * it. If you have set a custom RecordTranslator before calling this it may result in class cast exceptions at runtime.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
      */
-    public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
-        this.keyDeserializer = keyDeserializer;
+    @Deprecated
+    public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
+        return new Builder<>(this, null, clazz, null, null);
+    }
+
+    /**
+     * Specifying this value deserializer overrides the property value.deserializer. If you have set a custom RecordTranslator before
+     * calling this it may result in class cast exceptions at runtime.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+     */
+    @Deprecated
+    public <NV> Builder<K, NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
+        return new Builder<>(this, null, null, valueDeserializer, null);
+    }
+
+    /**
+     * Specify a class that can be instantiated to create a value.deserializer. This is the same as setting value.deserializer, but
+     * overrides it. If you have set a custom RecordTranslator before calling this it may result in class cast exceptions at runtime.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+     */
+    @Deprecated
+    public <NV> Builder<K, NV> setValue(Class<? extends Deserializer<NV>> clazz) {
+        return new Builder<>(this, null, null, null, clazz);
+    }
+
+    /**
+     * Set a {@link KafkaConsumer} property.
+     */
+    public Builder<K, V> setProp(String key, Object value) {
+        kafkaProps.put(key, value);
         return this;
     }

     /**
-     * Specifying this value deserializer overrides the property value.deserializer
+     * Set multiple {@link KafkaConsumer} properties.
      */
-    public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
-        this.valueDeserializer = valueDeserializer;
+    public Builder<K, V> setProp(Map<String, Object> props) {
+        kafkaProps.putAll(props);
         return this;
     }

     /**
-     * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
-     * @param pollTimeoutMs time in ms
+     * Set multiple {@link KafkaConsumer} properties.
      */
-    public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
-        this.pollTimeoutMs = pollTimeoutMs;
+    public Builder<K, V> setProp(Properties props) {
+        for (Entry<Object, Object> entry : props.entrySet()) {
+            if (entry.getKey() instanceof String) {
+                kafkaProps.put((String) entry.getKey(), entry.getValue());
+            } else {
+                throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
+            }
+        }
         return this;
     }

     /**
-     * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
-     * @param offsetCommitPeriodMs time in ms
+     * Set the group.id for the consumers.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#GROUP_ID_CONFIG} instead
      */
-    public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
-        this.offsetCommitPeriodMs = offsetCommitPeriodMs;
+    @Deprecated
+    public Builder<K, V> setGroupId(String id) {
+        return setProp("group.id", id);
+    }
+
+    /**
+     * Reset the bootstrap servers for the consumer.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} instead
+     */
+    @Deprecated
+    public Builder<K, V> setBootstrapServers(String servers) {
+        return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+    }
+
+    /**
+     * The minimum amount of data the broker should return for a fetch request.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} instead
+     */
+    @Deprecated
+    public Builder<K, V> setFetchMinBytes(int bytes) {
+        return setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, bytes);
+    }
+
+    /**
+     * The maximum amount of data per-partition the broker will return.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#MAX_PARTITION_FETCH_BYTES_CONFIG} instead
+     */
+    @Deprecated
+    public Builder<K, V> setMaxPartitionFectchBytes(int bytes) {
+        return setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, bytes);
+    }
+
+    /**
+     * The maximum number of records a poll will return.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG} instead
+     */
+    @Deprecated
+    public Builder<K, V> setMaxPollRecords(int records) {
+        return setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
+    }
+
+    //Security Related Configs
+    /**
+     * Configure the SSL Keystore for mutual authentication.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "ssl.keystore.location" and "ssl.keystore.password" instead
+     */
+    @Deprecated
+    public Builder<K, V> setSSLKeystore(String location, String password) {
+        return setProp("ssl.keystore.location", location)
+            .setProp("ssl.keystore.password", password);
+    }
+
+    /**
+     * Configure the SSL Keystore for mutual authentication.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "ssl.keystore.location", "ssl.keystore.password" and "ssl.key.password" instead
+     */
+    @Deprecated
+    public Builder<K, V> setSSLKeystore(String location, String password, String keyPassword) {
+        return setProp("ssl.key.password", keyPassword)
+            .setSSLKeystore(location, password);
+    }
+
+    /**
+     * Configure the SSL Truststore to authenticate with the brokers.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "security.protocol", "ssl.truststore.location" and "ssl.truststore.password" instead
+     */
+    @Deprecated
+    public Builder<K, V> setSSLTruststore(String location, String password) {
+        return setSecurityProtocol("SSL")
+            .setProp("ssl.truststore.location", location)
+            .setProp("ssl.truststore.password", password);
+    }
+
+    /**
+     * Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+     *
+     * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "security.protocol" instead
+     */
+    @Deprecated
+    public Builder<K, V> setSecurityProtocol(String protocol) {
+        return setProp("security.protocol", protocol);
+    }
+
+    //Spout Settings
+    /**
+     * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
+     *
+     * @param pollTimeoutMs time in ms
+     */
+    public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
+        this.pollTimeoutMs = pollTimeoutMs;
         return this;
     }

     /**
-     * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
-     * no new records are committed until the previous polled records have been acked. This guarantees at once delivery of
-     * all the previously polled records.
-     * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous
-     * polled records in favor of processing more records.
-     * @param maxRetries max number of retrials
+     * Specifies the period, in milliseconds, at which the offset commit task is called. Default is 30s.
+     *
+     * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
+     * {@link ProcessingGuarantee#NO_GUARANTEE}.
+     *
+     * @param offsetCommitPeriodMs time in ms
      */
-    public Builder<K,V> setMaxRetries(int maxRetries) {
-        this.maxRetries = maxRetries;
+    public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+        this.offsetCommitPeriodMs = offsetCommitPeriodMs;
         return this;
     }

     /**
      * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
      * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
-     * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+     * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+     * This limit is per partition and may in some cases be exceeded,
+     * but each partition cannot exceed this limit by more than maxPollRecords - 1.
+     *
+     * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+     *
      * @param maxUncommittedOffsets max number of records that can be pending commit
      */
-    public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+    public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
         this.maxUncommittedOffsets = maxUncommittedOffsets;
         return this;
     }

     /**
-     * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
-     * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+     * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to the
+     * documentation in {@link FirstPollOffsetStrategy}.
+     *
      * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
-     * */
+     */
     public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
         this.firstPollOffsetStrategy = firstPollOffsetStrategy;
         return this;
     }

-    public KafkaSpoutConfig<K,V> build() {
+    /**
+     * Sets the retry service for the spout to use.
+     *
+     * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+     *
+     * @param retryService the new retry service
+     * @return the builder (this).
+     */
+    public Builder<K, V> setRetry(KafkaSpoutRetryService retryService) {
+        if (retryService == null) {
+            throw new NullPointerException("retryService cannot be null");
+        }
+        this.retryService = retryService;
+        return this;
+    }
+
+    /**
+     * Sets the tuple listener for the spout to use.
+     *
+     * @param tupleListener the tuple listener
+     * @return the builder (this).
+     */
+    public Builder<K, V> setTupleListener(KafkaTupleListener tupleListener) {
+        if (tupleListener == null) {
+            throw new NullPointerException("KafkaTupleListener cannot be null");
+        }
+        this.tupleListener = tupleListener;
+        return this;
+    }
+
+    public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) {
+        this.translator = translator;
+        return this;
+    }
+
+    /**
+     * Configure a translator with tuples to be emitted on the default stream.
+     *
+     * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     * @return this to be able to chain configuration
+     */
+    public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
+    }
+
+    /**
+     * Configure a translator with tuples to be emitted to a given stream.
+     *
+     * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     * @param stream the stream to emit the tuples on
+     * @return this to be able to chain configuration
+     */
+    public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
+    }
+
+    /**
+     * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+     * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+     * PatternSubscription rely on Kafka to handle this instead.
+     *
+     * @param partitionRefreshPeriodMs time in milliseconds
+     * @return the builder (this)
+     */
+    public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) {
+        this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+        return this;
+    }
+
+    /**
+     * Specifies if the spout should emit null tuples to the component downstream, or rather not emit them and directly ack them. By default
+     * this parameter is set to false, which means that null tuples are not emitted.
+     *
+     * @param emitNullTuples sets if null tuples should or not be emitted downstream
+     */
+    public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) {
+        this.emitNullTuples = emitNullTuples;
+        return this;
+    }
+
+    /**
+     * Specifies which processing guarantee the spout should offer. Refer to the documentation for {@link ProcessingGuarantee}.
+     *
+     * @param processingGuarantee The processing guarantee the spout should offer.
+     */
+    public Builder<K, V> setProcessingGuarantee(ProcessingGuarantee processingGuarantee) {
+        this.processingGuarantee = processingGuarantee;
+        return this;
+    }
+
+    /**
+     * Specifies whether the spout should require Storm to track emitted tuples when using a {@link ProcessingGuarantee} other than
+     * {@link ProcessingGuarantee#AT_LEAST_ONCE}. The spout will always track emitted tuples when offering at-least-once guarantees
+     * regardless of this setting. This setting is false by default.
+     *
+     * <p>Enabling tracking can be useful even in cases where reliability is not a concern, because it allows
+     * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would
+     * otherwise be disabled.
+     *
+     * @param tupleTrackingEnforced true if Storm should track emitted tuples, false otherwise
+     */
+    public Builder<K, V> setTupleTrackingEnforced(boolean tupleTrackingEnforced) {
+        this.tupleTrackingEnforced = tupleTrackingEnforced;
+        return this;
+    }
+
+    /**
+     * The time period that metrics data is bucketed into.
+     * @param metricsTimeBucketSizeInSecs time in seconds
+     */
+    public Builder<K, V> setMetricsTimeBucketSizeInSecs(int metricsTimeBucketSizeInSecs) {
+        this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
+        return this;
+    }
+
+    public KafkaSpoutConfig<K, V> build() {
         return new KafkaSpoutConfig<>(this);
     }
 }

+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, String... topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+    }
+
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+    }
+
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topic pattern to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+    }
+
+    private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
+        builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return builder;
+    }
+
+    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
+        if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            LOG.warn("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee."
+                + " This will be treated as an error in the next major release.");
+
+            final boolean enableAutoCommit = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
+            if (enableAutoCommit) {
+                builder.processingGuarantee = ProcessingGuarantee.NO_GUARANTEE;
+            } else {
+                builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE;
+            }
+        }
+        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+            if (autoOffsetResetPolicy == null) {
+                /*
+                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+                 * requests an offset that was deleted.
+                 */
+                LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+ + " Some messages may be skipped."); + } + } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) { + if (autoOffsetResetPolicy != null + && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) { + LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'." + + " Some messages may be processed more than once."); + } + } + LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + } + + /** + * Gets the properties that will be passed to the KafkaConsumer. + * + * @return The Kafka properties map + */ public Map<String, Object> getKafkaProps() { return kafkaProps; } + /** + * @deprecated Please use {@link #getKafkaProps() } and look up the entry for {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead + */ + @Deprecated public Deserializer<K> getKeyDeserializer() { - return keyDeserializer; + if (keyDesClazz != null) { + try { + return keyDesClazz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Could not instantiate key deserializer " + keyDesClazz); + } + } + return keyDes; } + /** + * @deprecated Please use {@link #getKafkaProps() } and look up the entry for {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead + */ + @Deprecated public Deserializer<V> getValueDeserializer() { - return valueDeserializer; + if (valueDesClazz != null) { + try { + return valueDesClazz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Could not instantiate value deserializer " + valueDesClazz); + } + } + return valueDes; + } + + public Subscription getSubscription() { + return subscription; + } + + public RecordTranslator<K, V> getTranslator() { + return translator; } public long getPollTimeoutMs() { @@ -254,75 +832,71 @@ public class KafkaSpoutConfig<K, V> implements Serializable { return offsetCommitPeriodMs; } + /** + * @deprecated Use {@link #getProcessingGuarantee()} instead. + */ + @Deprecated public boolean isConsumerAutoCommitMode() { - return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null // default is true - || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT)); + return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false + || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + } + + public ProcessingGuarantee getProcessingGuarantee() { + return processingGuarantee; } - public String getConsumerGroupId() { - return (String) kafkaProps.get(Consumer.GROUP_ID); + public boolean isTupleTrackingEnforced() { + return tupleTrackingEnforced; } - /** - * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}, - * or null if this stream is associated with a wildcard pattern topic - */ - public List<String> getSubscribedTopics() { - return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ? 
- new ArrayList<>(((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics()) : - null; + public String getConsumerGroupId() { + return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG); } - /** - * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null - * if this stream is associated with a specific named topic - */ - public Pattern getTopicWildcardPattern() { - return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ? - ((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() : - null; + public FirstPollOffsetStrategy getFirstPollOffsetStrategy() { + return firstPollOffsetStrategy; } - public int getMaxTupleRetries() { - return maxRetries; + public int getMaxUncommittedOffsets() { + return maxUncommittedOffsets; } - public FirstPollOffsetStrategy getFirstPollOffsetStrategy() { - return firstPollOffsetStrategy; + public KafkaSpoutRetryService getRetryService() { + return retryService; } - public KafkaSpoutStreams getKafkaSpoutStreams() { - return kafkaSpoutStreams; + public KafkaTupleListener getTupleListener() { + return tupleListener; } - public int getMaxUncommittedOffsets() { - return maxUncommittedOffsets; + public long getPartitionRefreshPeriodMs() { + return partitionRefreshPeriodMs; } - public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() { - return tuplesBuilder; + public boolean isEmitNullTuples() { + return emitNullTuples; } - public KafkaSpoutRetryService getRetryService() { - return retryService; + public int getMetricsTimeBucketSizeInSecs() { + return metricsTimeBucketSizeInSecs; } @Override public String toString() { - return "KafkaSpoutConfig{" + - "kafkaProps=" + kafkaProps + - ", keyDeserializer=" + keyDeserializer + - ", valueDeserializer=" + valueDeserializer + - ", pollTimeoutMs=" + pollTimeoutMs + - ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + - ", maxRetries=" + maxRetries + - ", maxUncommittedOffsets=" + maxUncommittedOffsets + - ", firstPollOffsetStrategy=" + firstPollOffsetStrategy + - ", kafkaSpoutStreams=" + kafkaSpoutStreams + - ", tuplesBuilder=" + tuplesBuilder + - ", retryService=" + retryService + - ", topics=" + getSubscribedTopics() + - ", topicWildcardPattern=" + getTopicWildcardPattern() + - '}'; + return "KafkaSpoutConfig{" + + "kafkaProps=" + kafkaProps + + ", key=" + getKeyDeserializer() + + ", value=" + getValueDeserializer() + + ", pollTimeoutMs=" + pollTimeoutMs + + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + + ", maxUncommittedOffsets=" + maxUncommittedOffsets + + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy + + ", subscription=" + subscription + + ", translator=" + translator + + ", retryService=" + retryService + + ", tupleListener=" + tupleListener + + ", processingGuarantee=" + processingGuarantee + + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs + + '}'; } }
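As a usage reference for the Builder API introduced in the file above, here is a minimal, hypothetical sketch of how a spout config might be assembled with the new methods (the broker address, topic, and group id are placeholder values; only methods defined in this patch are used):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

    public class KafkaSpoutConfigExample {
        public static KafkaSpoutConfig<String, String> buildConfig() {
            // builder(...) wires up StringDeserializer for both keys and values
            return KafkaSpoutConfig.builder("localhost:9092", "example-topic")
                // arbitrary KafkaConsumer properties go through setProp(...)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
                // commit acked offsets every 10s; only relevant for AT_LEAST_ONCE and NO_GUARANTEE
                .setOffsetCommitPeriodMs(10_000)
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
                .build();
        }
    }

Note that enable.auto.commit is not supported as a consumer property here: setKafkaPropsForProcessingGuarantee(...) above maps it onto a ProcessingGuarantee, warns, and always sets the property to false.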
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java index 71f8327..1626fee 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java @@ -18,21 +18,42 @@ package org.apache.storm.kafka.spout; +import java.io.Serializable; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; -public class KafkaSpoutMessageId { - private transient TopicPartition topicPart; - private transient long offset; - private transient int numFails = 0; +public class KafkaSpoutMessageId implements Serializable { + private final TopicPartition topicPart; + private final long offset; + private int numFails = 0; + /** + * true if the record was emitted using a form of collector.emit(...). false + * when skipping null tuples as configured by the user in KafkaSpoutConfig + */ + private boolean emitted; - public KafkaSpoutMessageId(ConsumerRecord consumerRecord) { - this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset()); + public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) { + this(consumerRecord, true); + } + + public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean emitted) { + this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), emitted); } public KafkaSpoutMessageId(TopicPartition topicPart, long offset) { + this(topicPart, offset, true); + } + + /** + * Creates a new KafkaSpoutMessageId. 
+ * @param topicPart The topic partition this message belongs to + * @param offset The offset of this message + * @param emitted True iff this message is not being skipped as a null tuple + */ + public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) { this.topicPart = topicPart; this.offset = offset; + this.emitted = emitted; } public int partition() { @@ -59,22 +80,22 @@ public class KafkaSpoutMessageId { return topicPart; } - public String getMetadata(Thread currThread) { - return "{" + - "topic-partition=" + topicPart + - ", offset=" + offset + - ", numFails=" + numFails + - ", thread='" + currThread.getName() + "'" + - '}'; + public boolean isEmitted() { + return emitted; + } + + public void setEmitted(boolean emitted) { + this.emitted = emitted; } @Override public String toString() { - return "{" + - "topic-partition=" + topicPart + - ", offset=" + offset + - ", numFails=" + numFails + - '}'; + return "{" + + "topic-partition=" + topicPart + + ", offset=" + offset + + ", numFails=" + numFails + + ", emitted=" + emitted + + '}'; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java index f59367d..68a6f3f 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -25,49 +25,63 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.Validate; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.utils.Time; /** * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows: - * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ... + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ... 
* nextRetry = Min(nextRetry, currentTime + maxDelay) */ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService { private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class); private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator(); - private TimeInterval initialDelay; - private TimeInterval delayPeriod; - private TimeInterval maxDelay; - private int maxRetries; + private final TimeInterval initialDelay; + private final TimeInterval delayPeriod; + private final TimeInterval maxDelay; + private final int maxRetries; - private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); - private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups + //This class assumes that there is at most one retry schedule per message id in this set at a time. + private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); + private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups /** * Comparator ordering by timestamp */ private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> { + @Override public int compare(RetrySchedule entry1, RetrySchedule entry2) { - return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos()); + int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos()); + + if(result == 0) { + //TreeSet uses compareTo instead of equals() for the Set contract + //Ensure that we can save two retry schedules with the same timestamp + result = entry1.hashCode() - entry2.hashCode(); + } + return result; } } private class RetrySchedule { - private KafkaSpoutMessageId msgId; + private final KafkaSpoutMessageId msgId; private long nextRetryTimeNanos; - public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) { + public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) { this.msgId = msgId; - this.nextRetryTimeNanos = nextRetryTime; + this.nextRetryTimeNanos = nextRetryTimeNanos; LOG.debug("Created {}", this); } - public void setNextRetryTime() { + public void setNextRetryTimeNanos() { nextRetryTimeNanos = nextTime(msgId); LOG.debug("Updated {}", this); } @@ -80,7 +94,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService public String toString() { return "RetrySchedule{" + "msgId=" + msgId + - ", nextRetryTime=" + nextRetryTimeNanos + + ", nextRetryTimeNanos=" + nextRetryTimeNanos + '}'; } @@ -94,20 +108,20 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService } public static class TimeInterval implements Serializable { - private long lengthNanos; - private long length; - private TimeUnit timeUnit; + private final long lengthNanos; + private final TimeUnit timeUnit; + private final long length; /** * @param length length of the time interval in the units specified by {@link TimeUnit} * @param timeUnit unit used to specify a time interval on which to specify a time unit */ public TimeInterval(long length, TimeUnit timeUnit) { - this.length = length; - this.timeUnit = timeUnit; this.lengthNanos = timeUnit.toNanos(length); + this.timeUnit = timeUnit; + this.length = length; } - + public static TimeInterval seconds(long length) { return new TimeInterval(length, TimeUnit.SECONDS); } @@ -115,19 +129,15 @@ public 
class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService public static TimeInterval milliSeconds(long length) { return new TimeInterval(length, TimeUnit.MILLISECONDS); } - + public static TimeInterval microSeconds(long length) { return new TimeInterval(length, TimeUnit.MICROSECONDS); } - + public long lengthNanos() { return lengthNanos; } - - public long length() { - return length; - } - + public TimeUnit timeUnit() { return timeUnit; } @@ -144,7 +154,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService /** * The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression): * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ... - * nextRetry = Min(nextRetry, currentTime + maxDelay) + * nextRetry = Min(nextRetry, currentTime + maxDelay). + * + * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous + * polled records in favor of processing more records. * * @param initialDelay initial delay of the first retry * @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression) @@ -157,35 +170,42 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService this.delayPeriod = delayPeriod; this.maxRetries = maxRetries; this.maxDelay = maxDelay; - LOG.debug("Instantiated {}", this); + LOG.debug("Instantiated {}", this.toStringImpl()); } @Override - public Set<TopicPartition> retriableTopicPartitions() { - final Set<TopicPartition> tps = new HashSet<>(); - final long currentTimeNanos = System.nanoTime(); + public Map<TopicPartition, Long> earliestRetriableOffsets() { + final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>(); + final long currentTimeNanos = Time.nanoTime(); for (RetrySchedule retrySchedule : retrySchedules) { if (retrySchedule.retry(currentTimeNanos)) { final KafkaSpoutMessageId msgId = retrySchedule.msgId; - tps.add(new TopicPartition(msgId.topic(), msgId.partition())); + final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition()); + final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage); + if(currentLowestOffset != null) { + tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset())); + } else { + tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset()); + } } else { break; // Stop searching as soon as passed current time } } - LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps); - return tps; + LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset); + return tpToEarliestRetriableOffset; } @Override public boolean isReady(KafkaSpoutMessageId msgId) { boolean retry = false; - if (toRetryMsgs.contains(msgId)) { - final long currentTimeNanos = System.nanoTime(); + if (isScheduled(msgId)) { + final long currentTimeNanos = Time.nanoTime(); for (RetrySchedule retrySchedule : retrySchedules) { if (retrySchedule.retry(currentTimeNanos)) { if (retrySchedule.msgId.equals(msgId)) { retry = true; LOG.debug("Found entry to retry {}", retrySchedule); + break; //Stop searching if the message is known to be ready for retry } } else { LOG.debug("Entry to retry not found {}", retrySchedule); @@ -204,14 +224,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService 
@@ -204,14 +224,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     @Override
     public boolean remove(KafkaSpoutMessageId msgId) {
         boolean removed = false;
-        if (toRetryMsgs.contains(msgId)) {
+        if (isScheduled(msgId)) {
+            toRetryMsgs.remove(msgId);
             for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
                 final RetrySchedule retrySchedule = iterator.next();
                 if (retrySchedule.msgId().equals(msgId)) {
                     iterator.remove();
-                    toRetryMsgs.remove(msgId);
                     removed = true;
-                    break;
+                    break; //There is at most one schedule per message id
                 }
             }
         }
@@ -239,38 +259,66 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     @Override
-    public void schedule(KafkaSpoutMessageId msgId) {
+    public boolean schedule(KafkaSpoutMessageId msgId) {
         if (msgId.numFails() > maxRetries) {
             LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+            return false;
         } else {
-            if (toRetryMsgs.contains(msgId)) {
-                for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
-                    final RetrySchedule retrySchedule = iterator.next();
-                    if (retrySchedule.msgId().equals(msgId)) {
-                        iterator.remove();
-                        toRetryMsgs.remove(msgId);
-                    }
-                }
-            }
+            //Remove existing schedule for the message id
+            remove(msgId);
             final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
             retrySchedules.add(retrySchedule);
             toRetryMsgs.add(msgId);
             LOG.debug("Scheduled. {}", retrySchedule);
             LOG.trace("Current state {}", retrySchedules);
+            return true;
+        }
+    }
+
+    @Override
+    public int readyMessageCount() {
+        int count = 0;
+        final long currentTimeNanos = Time.nanoTime();
+        for (RetrySchedule retrySchedule : retrySchedules) {
+            if (retrySchedule.retry(currentTimeNanos)) {
+                ++count;
+            } else {
+                break; //Stop counting when past current time
+            }
         }
+        return count;
+    }
+
+    @Override
+    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        if (isScheduled(msgId)) {
+            for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+                if (originalMsgId.equals(msgId)) {
+                    return originalMsgId;
+                }
+            }
+        }
+        return msgId;
     }
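The loop in getMessageId() looks redundant at first glance, but it is the point of the method: message-id equality is positional (topic, partition, offset), while the retry state (numFails) lives on the instance held in toRetryMsgs. Handing a caller a freshly constructed id for a re-polled record would silently restart its backoff progression. A self-contained sketch of that idea (the class and names are illustrative, not Storm's):

    import java.util.HashSet;
    import java.util.Set;

    public class MessageIdLookupDemo {
        static final class MsgId {
            final String topic; final int partition; final long offset;
            int numFails; // mutable retry state, deliberately excluded from equals()
            MsgId(String topic, int partition, long offset) {
                this.topic = topic; this.partition = partition; this.offset = offset;
            }
            @Override public boolean equals(Object o) {
                if (!(o instanceof MsgId)) return false;
                MsgId other = (MsgId) o;
                return topic.equals(other.topic) && partition == other.partition && offset == other.offset;
            }
            @Override public int hashCode() { return topic.hashCode() * 31 + partition * 17 + (int) offset; }
        }

        public static void main(String[] args) {
            Set<MsgId> scheduled = new HashSet<>();
            MsgId failedTwice = new MsgId("events", 0, 42L);
            failedTwice.numFails = 2;
            scheduled.add(failedTwice);

            // A re-polled record yields an equal-but-fresh id with numFails == 0.
            MsgId fresh = new MsgId("events", 0, 42L);
            // Returning the original instance preserves the backoff history:
            MsgId resolved = scheduled.contains(fresh)
                ? scheduled.stream().filter(fresh::equals).findFirst().get()
                : fresh;
            System.out.println(resolved.numFails); // prints 2, not 0
        }
    }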
 
     // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     private long nextTime(KafkaSpoutMessageId msgId) {
-        final long currentTimeNanos = System.nanoTime();
+        Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
+        final long currentTimeNanos = Time.nanoTime();
         final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
                 ? currentTimeNanos + initialDelay.lengthNanos
-                : (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
+                : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
         return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
     }
 
     @Override
     public String toString() {
+        return toStringImpl();
+    }
+
+    private String toStringImpl() {
+        //This is here to avoid an overridable call in the constructor
         return "KafkaSpoutRetryExponentialBackoff{" +
                 "delay=" + initialDelay +
                 ", ratio=" + delayPeriod +

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 5aab167..12d26da 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -18,25 +18,30 @@
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * Represents the logic that manages the retrial of failed tuples.
  */
 public interface KafkaSpoutRetryService extends Serializable {
     /**
-     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or
+     * updates retry time if it has already been scheduled. It may also indicate
+     * that the message should not be retried, in which case the message will not be scheduled.
      * @param msgId message to schedule for retrial
+     * @return true if the message will be retried, false otherwise
      */
-    void schedule(KafkaSpoutMessageId msgId);
+    boolean schedule(KafkaSpoutMessageId msgId);
 
     /**
      * Removes a message from the list of messages scheduled for retrial
      * @param msgId message to remove from retrial
+     * @return true if the message was scheduled for retrial, false otherwise
      */
     boolean remove(KafkaSpoutMessageId msgId);
 
@@ -50,14 +55,16 @@ public interface KafkaSpoutRetryService extends Serializable {
     boolean retainAll(Collection<TopicPartition> topicPartitions);
 
     /**
-     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
-     * for which a tuple has failed and has retry time less than current time
+     * @return The earliest retriable offset for each TopicPartition that has
+     * offsets ready to be retried, i.e. for which a tuple has failed
+     * and has retry time less than current time
      */
-    Set<TopicPartition> retriableTopicPartitions();
+    Map<TopicPartition, Long> earliestRetriableOffsets();
 
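The switch from retriableTopicPartitions() to earliestRetriableOffsets() tells a caller not just which partitions hold failed tuples but where to rewind each of them. A hypothetical caller sketch (not from this patch; KafkaConsumer.seek() and assignment() are the standard consumer API, while the helper class and method names are invented, and it is assumed to sit in the same package as the interface):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RetrySeekHelper {
        // Rewind each assigned partition to its earliest failed tuple so the
        // next poll re-fetches it (and everything after it) for re-emission.
        public static void seekToFailedTuples(KafkaConsumer<?, ?> consumer, KafkaSpoutRetryService retryService) {
            Map<TopicPartition, Long> earliest = retryService.earliestRetriableOffsets();
            for (Map.Entry<TopicPartition, Long> entry : earliest.entrySet()) {
                if (consumer.assignment().contains(entry.getKey())) {
                    consumer.seek(entry.getKey(), entry.getValue());
                }
            }
        }
    }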
     /**
-     * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried,
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
      * i.e is scheduled and has retry time that is less than current time.
+     * @param msgId message to check for readiness
      * @return true if message is ready to be retried, false otherwise
      */
     boolean isReady(KafkaSpoutMessageId msgId);
@@ -65,8 +72,23 @@ public interface KafkaSpoutRetryService extends Serializable {
     /**
      * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
      * The message may or may not be ready to be retried yet.
+     * @param msgId message to check for scheduling status
      * @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
      * Returns false is this message is not scheduled for retrial
      */
     boolean isScheduled(KafkaSpoutMessageId msgId);
+
+    /**
+     * @return The number of messages that are ready for retry
+     */
+    int readyMessageCount();
+
+    /**
+     * Gets the {@link KafkaSpoutMessageId} for the given record.
+     *
+     * @param record The record to fetch the id for
+     * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for
+     * retry.
+     */
+    KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
deleted file mode 100644
index 5375f6c..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.regex.Pattern;
-
-/**
- * Represents the stream and output fields used by a topic
- */
-public class KafkaSpoutStream implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStream.class);
-
-    private final Fields outputFields;
-    private final String streamId;
-    private final String topic;
-    private Pattern topicWildcardPattern;
-
-    /** Represents the specified outputFields and topic with the default stream */
-    public KafkaSpoutStream(Fields outputFields, String topic) {
-        this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
-    }
-
-    /** Represents the specified outputFields and topic with the specified stream */
-    public KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
-        if (outputFields == null || streamId == null || topic == null) {
-            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
-                    "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
-        }
-        this.outputFields = outputFields;
-        this.streamId = streamId;
-        this.topic = topic;
-        this.topicWildcardPattern = null;
-    }
-
-    /** Represents the specified outputFields and topic wild card with the default stream */
-    KafkaSpoutStream(Fields outputFields, Pattern topicWildcardPattern) {
-        this(outputFields, Utils.DEFAULT_STREAM_ID, topicWildcardPattern);
-    }
-
-    /** Represents the specified outputFields and topic wild card with the specified stream */
-    public KafkaSpoutStream(Fields outputFields, String streamId, Pattern topicWildcardPattern) {
-
-        if (outputFields == null || streamId == null || topicWildcardPattern == null) {
-            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
" + - "[outputFields=%s, streamId=%s, topicWildcardPattern=%s]", outputFields, streamId, topicWildcardPattern)); - } - this.outputFields = outputFields; - this.streamId = streamId; - this.topic = null; - this.topicWildcardPattern = topicWildcardPattern; - } - - public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { - collector.emit(streamId, tuple, messageId); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - LOG.info("Declared [streamId = {}], [outputFields = {}] for [topic = {}]", streamId, outputFields, topic); - declarer.declareStream(streamId, outputFields); - } - - - public Fields getOutputFields() { - return outputFields; - } - - public String getStreamId() { - return streamId; - } - - /** - * @return the topic associated with this {@link KafkaSpoutStream}, or null - * if this stream is associated with a wildcard pattern topic - */ - public String getTopic() { - return topic; - } - - /** - * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null - * if this stream is associated with a specific named topic - */ - public Pattern getTopicWildcardPattern() { - return topicWildcardPattern; - } - - @Override - public String toString() { - return "KafkaSpoutStream{" + - "outputFields=" + outputFields + - ", streamId='" + streamId + '\'' + - ", topic='" + topic + '\'' + - ", topicWildcardPattern=" + topicWildcardPattern + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java deleted file mode 100644 index 6910d3c..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; - -import java.io.Serializable; -import java.util.List; - -/** - * Represents the {@link KafkaSpoutStream} associated with each topic or topic pattern (wildcard), and provides - * a public API to declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. - */ -public interface KafkaSpoutStreams extends Serializable { - void declareOutputFields(OutputFieldsDeclarer declarer); - - void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId); -}