Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1808#discussion_r96734787
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -61,129 +66,244 @@
          * If no offset has been committed, it behaves as LATEST.</li>
          * </ul>
          * */
    -    public enum FirstPollOffsetStrategy {
    +    public static enum FirstPollOffsetStrategy {
             EARLIEST,
             LATEST,
             UNCOMMITTED_EARLIEST,
             UNCOMMITTED_LATEST }
    -
    -    // Kafka consumer configuration
    -    private final Map<String, Object> kafkaProps;
    -    private final Deserializer<K> keyDeserializer;
    -    private final Deserializer<V> valueDeserializer;
    -    private final long pollTimeoutMs;
    -
    -    // Kafka spout configuration
    -    private final long offsetCommitPeriodMs;
    -    private final int maxRetries;
    -    private final int maxUncommittedOffsets;
    -    private final long partitionRefreshPeriodMs;
    -    private final boolean manualPartitionAssignment;
    -    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
    -    private final KafkaSpoutStreams kafkaSpoutStreams;
    -    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    -    private final KafkaSpoutRetryService retryService;
    -
    -    private KafkaSpoutConfig(Builder<K,V> builder) {
    -        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
    -        this.keyDeserializer = builder.keyDeserializer;
    -        this.valueDeserializer = builder.valueDeserializer;
    -        this.pollTimeoutMs = builder.pollTimeoutMs;
    -        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -        this.maxRetries = builder.maxRetries;
    -        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
    -        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
    -        this.manualPartitionAssignment = builder.manualPartitionAssignment;
    -        this.tuplesBuilder = builder.tuplesBuilder;
    -        this.retryService = builder.retryService;
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
    +        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
         }
    -
    -    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
    +        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
    +    }
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
    +        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
    +    }
    +    
    +    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
             // set defaults for properties not specified
    -        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
    -            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
    +        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    +            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
             }
             return kafkaProps;
         }
    -
    +    
         public static class Builder<K,V> {
             private final Map<String, Object> kafkaProps;
    -        private SerializableDeserializer<K> keyDeserializer;
    -        private SerializableDeserializer<V> valueDeserializer;
    +        private Subscription subscription;
    +        private final SerializableDeserializer<K> keyDes;
    +        private final Class<? extends Deserializer<K>> keyDesClazz;
    +        private final SerializableDeserializer<V> valueDes;
    +        private final Class<? extends Deserializer<V>> valueDesClazz;
    +        private RecordTranslator<K, V> translator;
             private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
             private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
             private int maxRetries = DEFAULT_MAX_RETRIES;
             private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    -        private final KafkaSpoutStreams kafkaSpoutStreams;
             private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
    +        private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
             private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
    -        private boolean manualPartitionAssignment = false;
    -        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    -        private final KafkaSpoutRetryService retryService;
    -
    -        /**
    -         * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
    -         * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/>
    -         * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
    -         *           DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
    -         */
    -        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
    -                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
    -            this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
    -                    new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
    -                            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
    +        
    +        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
    +            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
             }
    -
    -        /***
    -         * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
    -         * The optional configuration can be specified using the set methods of this builder
    -         * @param kafkaProps    properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
    -         * @param kafkaSpoutStreams    streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
    -         * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
    -         * @param retryService  logic that manages the retrial of failed tuples
    -         */
    -        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
    -                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
    -            if (kafkaProps == null || kafkaProps.isEmpty()) {
    -                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
    -            }
    -
    -            if (kafkaSpoutStreams == null)  {
    -                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
    -            }
    -
    -            if (tuplesBuilder == null) {
    -                throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
    -            }
    -
    -            if (retryService == null) {
    -                throw new IllegalArgumentException("Must specify at implementation of retry service");
    +        
    +        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
    +            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
    +            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
    +           this(bootstrapServers, keyDes, null, valDes, null, subscription);
    +        }
    +        
    +        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) {
    +            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
    +            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
    +            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
    +           this(bootstrapServers, null, keyDes, null, valDes, subscription);
    +        }
    +        
    +        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    +                   SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
    +            kafkaProps = new HashMap<>();
    +            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
    +                throw new IllegalArgumentException("bootstrap servers 
cannot be null");
                 }
    +            kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    +            this.keyDes = keyDes;
    +            this.keyDesClazz = keyDesClazz;
    +            this.valueDes = valDes;
    +            this.valueDesClazz = valDesClazz;
    +            this.subscription = subscription;
    +            this.translator = new DefaultRecordTranslator<K,V>();
    +        }
     
    -            this.kafkaProps = kafkaProps;
    -            this.kafkaSpoutStreams = kafkaSpoutStreams;
    -            this.tuplesBuilder = tuplesBuilder;
    -            this.retryService = retryService;
    +        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    +                   SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
    +            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    +            this.subscription = builder.subscription;
    +            this.pollTimeoutMs = builder.pollTimeoutMs;
    +            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    +            this.maxRetries = builder.maxRetries;
    +            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    +            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    +            // This could result in a lot of class cast exceptions at runtime,
    +            // but this is the only way to really maintain API compatibility
    +            this.translator = (RecordTranslator<K, V>) builder.translator;
    +            this.retryService = builder.retryService;
    +            this.keyDes = keyDes;
    +            this.keyDesClazz = keyDesClazz;
    +            this.valueDes = valueDes;
    +            this.valueDesClazz = valueDesClazz;
             }
     
             /**
              * Specifying this key deserializer overrides the property key.deserializer
              */
    -        public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) {
    -            this.keyDeserializer = keyDeserializer;
    -            return this;
    +        public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> keyDeserializer) {
    +            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
    +        }
    +        
    +        /**
    +         * Specify a class that can be instantiated to create a key.deserializer
    +         * This is the same as setting key.deserializer, but overrides it.
    +         */
    +        public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
    +            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
             }
     
             /**
              * Specifying this value deserializer overrides the property value.deserializer
              */
    -        public Builder<K,V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) {
    -            this.valueDeserializer = valueDeserializer;
    +        public <NV> Builder<K,NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
    +            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
    +        }
    +        
    +        /**
    +         * Specify a class that can be instantiated to create a value.deserializer
    +         * This is the same as setting value.deserializer, but overrides it.
    +         */
    +        public <NV> Builder<K,NV> setValue(Class<? extends Deserializer<NV>> clazz) {
    +            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
    +        }
    +        
    +        /**
    +         * Set a Kafka property config
    +         */
    +        public Builder<K,V> setProp(String key, Object value) {
    +            kafkaProps.put(key, value);
    +            return this;
    +        }
    +        
    +        /**
    +         * Set multiple Kafka property configs
    +         */
    +        public Builder<K,V> setProp(Map<String, Object> props) {
    +            kafkaProps.putAll(props);
    +            return this;
    +        }
    +        
    +        /**
    +         * Set multiple Kafka property configs
    +         */
    +        public Builder<K,V> setProp(Properties props) {
    +            for (String name: props.stringPropertyNames()) {
    +                kafkaProps.put(name, props.get(name));
    +            }
                 return this;
             }
    +        
    +        /**
    +         * Set the group.id for the consumers
    +         */
    +        public Builder<K,V> setGroupId(String id) {
    +            return setProp("group.id", id);
    +        }
    +        
    +        /**
    +         * reset the bootstrap servers for the Consumer
    +         */
    +        public Builder<K,V> setBootstrapServers(String servers) {
    +            return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    +        }
    +        
    +        /**
    +         * The minimum amount of data the broker should return for a fetch request.
    +         */
    +        public Builder<K,V> setFetchMinBytes(int bytes) {
    +            return setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, bytes);
    +        }
    +        
    +        /**
    +         * The maximum amount of data per-partition the broker will return.
    +         */
    +        public Builder<K,V> setMaxPartitionFectchBytes(int bytes) {
    +            return setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, bytes);
    +        }
    +        
    +        /**
    +         * The maximum number of records a poll will return.
    +         * Will only work with Kafka 0.10.0 and above.
    +         */
    +        public Builder<K,V> setMaxPoolRecords(int records) {
    --- End diff --
    
    maxPool -> maxPoll (i.e. setMaxPoolRecords should be setMaxPollRecords).
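    
    For illustration, a minimal sketch of how the new fluent builder reads once the rename is applied. The trailing build() call and the KafkaSpout wiring are assumptions based on the surrounding KafkaSpoutConfig/KafkaSpout API rather than part of this diff, and the broker address, topic, and group id are placeholders:
    
    ```java
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    
    public class KafkaSpoutConfigExample {
        public static void main(String[] args) {
            // Placeholder broker, topic, and group names, for illustration only.
            KafkaSpoutConfig<String, String> config =
                KafkaSpoutConfig.builder("localhost:9092", "my-topic")
                    .setGroupId("my-group")
                    .setFetchMinBytes(1)
                    .setMaxPollRecords(200) // renamed from setMaxPoolRecords per this comment
                    .build();               // assumes the Builder exposes a build() method
    
            // Assumed spout wiring: the spout is constructed from the finished config.
            KafkaSpout<String, String> spout = new KafkaSpout<>(config);
        }
    }
    ```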

