Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1808#discussion_r96886556 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -61,129 +66,244 @@ * If no offset has been committed, it behaves as LATEST.</li> * </ul> * */ - public enum FirstPollOffsetStrategy { + public static enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST } - - // Kafka consumer configuration - private final Map<String, Object> kafkaProps; - private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valueDeserializer; - private final long pollTimeoutMs; - - // Kafka spout configuration - private final long offsetCommitPeriodMs; - private final int maxRetries; - private final int maxUncommittedOffsets; - private final long partitionRefreshPeriodMs; - private final boolean manualPartitionAssignment; - private final FirstPollOffsetStrategy firstPollOffsetStrategy; - private final KafkaSpoutStreams kafkaSpoutStreams; - private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; - private final KafkaSpoutRetryService retryService; - - private KafkaSpoutConfig(Builder<K,V> builder) { - this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps); - this.keyDeserializer = builder.keyDeserializer; - this.valueDeserializer = builder.valueDeserializer; - this.pollTimeoutMs = builder.pollTimeoutMs; - this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; - this.maxRetries = builder.maxRetries; - this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; - this.kafkaSpoutStreams = builder.kafkaSpoutStreams; - this.maxUncommittedOffsets = builder.maxUncommittedOffsets; - this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs; - this.manualPartitionAssignment = builder.manualPartitionAssignment; - this.tuplesBuilder = builder.tuplesBuilder; - this.retryService = builder.retryService; + + public static Builder<String, 
String> builder(String bootstrapServers, String ... topics) { + return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); } - - private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) { + + public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) { + return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); + } + + public static Builder<String, String> builder(String bootstrapServers, Pattern topics) { + return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); + } + + private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) { // set defaults for properties not specified - if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) { - kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false"); + if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); } return kafkaProps; } - + public static class Builder<K,V> { private final Map<String, Object> kafkaProps; - private SerializableDeserializer<K> keyDeserializer; - private SerializableDeserializer<V> valueDeserializer; + private Subscription subscription; + private final SerializableDeserializer<K> keyDes; + private final Class<? extends Deserializer<K>> keyDesClazz; + private final SerializableDeserializer<V> valueDes; + private final Class<? 
extends Deserializer<V>> valueDesClazz; + private RecordTranslator<K, V> translator; private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS; private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS; private int maxRetries = DEFAULT_MAX_RETRIES; private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; - private final KafkaSpoutStreams kafkaSpoutStreams; private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS; + private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE; private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS; - private boolean manualPartitionAssignment = false; - private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; - private final KafkaSpoutRetryService retryService; - - /** - * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/> - * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/> - * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), - * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))} - */ - public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams, - KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) { - this(kafkaProps, kafkaSpoutStreams, tuplesBuilder, - new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2), - DEFAULT_MAX_RETRIES, TimeInterval.seconds(10))); + + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... 
topics) { + this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); } - - /*** - * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics - * The optional configuration can be specified using the set methods of this builder - * @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a> - * @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream. - * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s. - * @param retryService logic that manages the retrial of failed tuples - */ - public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams, - KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) { - if (kafkaProps == null || kafkaProps.isEmpty()) { - throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps); - } - - if (kafkaSpoutStreams == null) { - throw new IllegalArgumentException("Must specify stream associated with each topic. 
Multiple topics can emit to the same stream"); - } - - if (tuplesBuilder == null) { - throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); - } - - if (retryService == null) { - throw new IllegalArgumentException("Must specify at implementation of retry service"); + + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) { + this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + } + + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) { + this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + } + + public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) { + this(bootstrapServers, keyDes, null, valDes, null, subscription); + } + + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) { + this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + } + + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) { + this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + } + + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) { + this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + } + + public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) { + this(bootstrapServers, null, keyDes, null, valDes, subscription); + } + + private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? 
extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) { + kafkaProps = new HashMap<>(); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { + throw new IllegalArgumentException("bootstrap servers cannot be null"); } + kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + this.keyDes = keyDes; + this.keyDesClazz = keyDesClazz; + this.valueDes = valDes; + this.valueDesClazz = valDesClazz; + this.subscription = subscription; + this.translator = new DefaultRecordTranslator<K,V>(); + } - this.kafkaProps = kafkaProps; - this.kafkaSpoutStreams = kafkaSpoutStreams; - this.tuplesBuilder = tuplesBuilder; - this.retryService = retryService; + private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) { + this.kafkaProps = new HashMap<>(builder.kafkaProps); + this.subscription = builder.subscription; + this.pollTimeoutMs = builder.pollTimeoutMs; + this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; + this.maxRetries = builder.maxRetries; + this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; + this.maxUncommittedOffsets = builder.maxUncommittedOffsets; + //this could result in a lot of class case exceptions at runtime, --- End diff -- Sorry, the comment was not updated; I will do that. Originally this was for API compatibility, but now it is there so that nobody is forced to reset the translator when they change a key or value type. This applies specifically to the default translator, which should work on any types.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---