Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1808#discussion_r91362733
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
    @@ -60,123 +68,197 @@
          * If no offset has been committed, it behaves as LATEST.</li>
          * </ul>
          * */
    -    public enum FirstPollOffsetStrategy {
    +    public static enum FirstPollOffsetStrategy {
             EARLIEST,
             LATEST,
             UNCOMMITTED_EARLIEST,
             UNCOMMITTED_LATEST }
    -
    -    // Kafka consumer configuration
    -    private final Map<String, Object> kafkaProps;
    -    private final Deserializer<K> keyDeserializer;
    -    private final Deserializer<V> valueDeserializer;
    -    private final long pollTimeoutMs;
    -
    -    // Kafka spout configuration
    -    private final long offsetCommitPeriodMs;
    -    private final int maxRetries;
    -    private final int maxUncommittedOffsets;
    -    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
    -    private final KafkaSpoutStreams kafkaSpoutStreams;
    -    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    -    private final KafkaSpoutRetryService retryService;
    -
    -    private KafkaSpoutConfig(Builder<K,V> builder) {
    -        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
    -        this.keyDeserializer = builder.keyDeserializer;
    -        this.valueDeserializer = builder.valueDeserializer;
    -        this.pollTimeoutMs = builder.pollTimeoutMs;
    -        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -        this.maxRetries = builder.maxRetries;
    -        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
    -        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -        this.tuplesBuilder = builder.tuplesBuilder;
    -        this.retryService = builder.retryService;
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, 
String ... topics) {
    +        return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
         }
    -
    -    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, 
Object> kafkaProps) {
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, 
Collection<String> topics) {
    +        return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
    +    }
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, 
Pattern topics) {
    +        return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
    +    }
    +    
    +    private static Map<String, Object> 
setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
             // set defaults for properties not specified
    -        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
    -            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
    +        if (!kafkaProps.containsKey(ENABLE_AUTO_COMMIT_CONF)) {
    +            kafkaProps.put(ENABLE_AUTO_COMMIT_CONF, "false");
             }
             return kafkaProps;
         }
    -
    +    
         public static class Builder<K,V> {
             private final Map<String, Object> kafkaProps;
    -        private Deserializer<K> keyDeserializer;
    -        private Deserializer<V> valueDeserializer;
    +        private Subscription subscription;
    +        private final Deserializer<K> keyDes;
    +        private final Deserializer<V> valueDes;
    +        private RecordTranslator<K, V> translator;
             private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
             private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
             private int maxRetries = DEFAULT_MAX_RETRIES;
             private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    -        private final KafkaSpoutStreams kafkaSpoutStreams;
             private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
    -        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    -        private final KafkaSpoutRetryService retryService;
    -
    -        /**
    -         * Please refer to javadoc in {@link #Builder(Map, 
KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
    -         * This constructor uses by the default the following 
implementation for {@link KafkaSpoutRetryService}:<p/>
    -         * {@code new 
KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
    -         *           DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
    -         */
    -        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams 
kafkaSpoutStreams,
    -                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
    -            this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
    -                    new 
KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
    -                            DEFAULT_MAX_RETRIES, 
TimeInterval.seconds(10)));
    +        private KafkaSpoutRetryService retryService = 
DEFAULT_RETRY_SERVICE;
    +        
    +        public Builder(String bootstrapServers, Deserializer<K> keyDes, 
Deserializer<V> valDes, String ... topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
             }
    -
    -        /***
    -         * KafkaSpoutConfig defines the required configuration to connect 
a consumer to a consumer group, as well as the subscribing topics
    -         * The optional configuration can be specified using the set 
methods of this builder
    -         * @param kafkaProps    properties defining consumer connection to 
Kafka broker as specified in @see <a 
href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
    -         * @param kafkaSpoutStreams    streams to where the tuples are 
emitted for each tuple. Multiple topics can emit in the same stream.
    -         * @param tuplesBuilder logic to build tuples from {@link 
ConsumerRecord}s.
    -         * @param retryService  logic that manages the retrial of failed 
tuples
    -         */
    -        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams 
kafkaSpoutStreams,
    -                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, 
KafkaSpoutRetryService retryService) {
    -            if (kafkaProps == null || kafkaProps.isEmpty()) {
    -                throw new IllegalArgumentException("Properties defining 
consumer connection to Kafka broker are required: " + kafkaProps);
    -            }
    -
    -            if (kafkaSpoutStreams == null)  {
    -                throw new IllegalArgumentException("Must specify stream 
associated with each topic. Multiple topics can emit to the same stream");
    -            }
    -
    -            if (tuplesBuilder == null) {
    -                throw new IllegalArgumentException("Must specify at last 
one tuple builder per topic declared in KafkaSpoutStreams");
    -            }
    -
    -            if (retryService == null) {
    -                throw new IllegalArgumentException("Must specify at 
implementation of retry service");
    +        
    +        public Builder(String bootstrapServers, Deserializer<K> keyDes, 
Deserializer<V> valDes, Collection<String> topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, Deserializer<K> keyDes, 
Deserializer<V> valDes, Pattern topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
PatternSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, Deserializer<K> keyDes, 
Deserializer<V> valDes, Subscription subscription) {
    +            kafkaProps = new HashMap<>();
    +            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
    +                throw new IllegalArgumentException("bootstrap servers 
cannot be null");
                 }
    +            kafkaProps.put(BOOTSTRAP_SERVERS_CONF, bootstrapServers);
    +            this.keyDes = keyDes;
    +            this.valueDes = valDes;
    +            this.subscription = subscription;
    +            this.translator = new DefaultRecordTranslator<K,V>();
    +        }
     
    -            this.kafkaProps = kafkaProps;
    -            this.kafkaSpoutStreams = kafkaSpoutStreams;
    -            this.tuplesBuilder = tuplesBuilder;
    -            this.retryService = retryService;
    +        private Builder(Builder<?, ?> builder, Deserializer<K> keyDes, 
Deserializer<V> valueDes) {
    +            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    +            this.subscription = builder.subscription;
    +            this.pollTimeoutMs = builder.pollTimeoutMs;
    +            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    +            this.maxRetries = builder.maxRetries;
    +            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    +            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    +            //this could result in a lot of class cast exceptions at 
runtime,
    +            // but this is the only way to really maintain API 
compatibility
    +            this.translator = (RecordTranslator<K, V>) builder.translator;
    +            this.retryService = builder.retryService;
    +            this.keyDes = keyDes;
    +            this.valueDes = valueDes;
             }
     
             /**
              * Specifying this key deserializer overrides the property 
key.deserializer
              */
    -        public Builder<K,V> setKeyDeserializer(Deserializer<K> 
keyDeserializer) {
    -            this.keyDeserializer = keyDeserializer;
    -            return this;
    +        public <NK> Builder<NK,V> setKeyDeserializer(Deserializer<NK> 
keyDeserializer) {
    +            return new Builder<>(this, keyDeserializer, valueDes);
             }
     
             /**
              * Specifying this value deserializer overrides the property 
value.deserializer
              */
    -        public Builder<K,V> setValueDeserializer(Deserializer<V> 
valueDeserializer) {
    -            this.valueDeserializer = valueDeserializer;
    +        public <NV> Builder<K,NV> setValueDeserializer(Deserializer<NV> 
valueDeserializer) {
    +            return new Builder<>(this, this.keyDes, valueDeserializer);
    +        }
    +        
    +        /**
    +         * Set a Kafka property config
    +         */
    +        public Builder<K,V> setProp(String key, Object value) {
    +            kafkaProps.put(key, value);
                 return this;
             }
    +        
    +        /**
    +         * Set multiple Kafka property configs
    +         */
    +        public Builder<K,V> setProp(Map<String, Object> props) {
    +            kafkaProps.putAll(props);
    +            return this;
    +        }
    +        
    +        /**
    +         * Set multiple Kafka property configs
    +         */
    +        public Builder<K,V> setProp(Properties props) {
    +            for (String name: props.stringPropertyNames()) {
    +                kafkaProps.put(name, props.get(name));
    +            }
    +            return this;
    +        }
    +        
    +        /**
    +         * Set the group.id for the consumers
    +         */
    +        public Builder<K,V> setGroupId(String id) {
    +            return setProp("group.id", id);
    +        }
    +        
    +        /**
    +         * reset the bootstrap servers for the Consumer
    +         */
    +        public Builder<K,V> setBootstrapServers(String servers) {
    +            return setProp(BOOTSTRAP_SERVERS_CONF, servers);
    +        }
    +        
    +        /**
    +         * The minimum amount of data the broker should return for a fetch 
request.
    +         */
    +        public Builder<K,V> setFetchMinBytes(int bytes) {
    +            return setProp("fetch.min.bytes", bytes);
    +        }
    +        
    +        /**
    +         * The maximum amount of data per-partition the broker will return.
    +         */
    +        public Builder<K,V> setMaxPartitionFectchBytes(int bytes) {
    +            return setProp("max.partition.fetch.bytes", bytes);
    +        }
    +        
    +        /**
    +         * The maximum number of records a poll will return.
    +         * Only will work with Kafak 0.10.0 and above.
    --- End diff --
    
    kafak -> kafka, and "will only work" instead of "only will work"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to