Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1808#discussion_r96730185
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
    @@ -235,41 +355,116 @@ public Builder(Map<String, Object> kafkaProps, 
KafkaSpoutStreams kafkaSpoutStrea
                 this.firstPollOffsetStrategy = firstPollOffsetStrategy;
                 return this;
             }
    -
    +        
             /**
    -         * Sets partition refresh period in milliseconds in manual 
partition assignment model. Default is 2s.
    -         * @param partitionRefreshPeriodMs time in milliseconds
    +         * Sets the retry service for the spout to use.
    +         * @param retryService the new retry service
    +         * @return the builder (this).
              */
    -        public Builder<K, V> setPartitionRefreshPeriodMs(long 
partitionRefreshPeriodMs) {
    -            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
    +        public Builder<K, V> setRetry(KafkaSpoutRetryService retryService) 
{
    +            if (retryService == null) {
    +                throw new NullPointerException("retryService cannot be 
null");
    +            }
    +            this.retryService = retryService;
                 return this;
             }
     
    +        public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> 
translator) {
    +            this.translator = translator;
    +            return this;
    +        }
    +        
    +        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, 
V>, List<Object>> func, Fields fields) {
    +            return setRecordTranslator(new SimpleRecordTranslator<>(func, 
fields));
    +        }
    +        
    +        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, 
V>, List<Object>> func, Fields fields, String stream) {
    +            return setRecordTranslator(new SimpleRecordTranslator<>(func, 
fields, stream));
    +        }
    +        
             /**
    -         * Defines whether the consumer manages partition manually.
    -         * If set to true, the consumer manage partition manually, 
otherwise it will rely on kafka to do partition assignment.
    -         * @param manualPartitionAssignment True if using manual partition 
assignment.
    +         * Sets partition refresh period in milliseconds. This is how 
often the subscription is refreshed
    +         * For most subscriptions that go through the 
KafkaConsumer.subscribe this is ignored.
    +         * @param partitionRefreshPeriodMs time in milliseconds
    +         * @return the builder (this)
              */
    -        public Builder<K, V> setManualPartitionAssignment(boolean 
manualPartitionAssignment) {
    -            this.manualPartitionAssignment = manualPartitionAssignment;
    -            return this;
    +        public Builder<K, V> setPartitionRefreshPeriodMs(long 
partitionRefreshPeriodMs) {
    +           this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
    +           return this;
             }
    -
    +        
             public KafkaSpoutConfig<K,V> build() {
                 return new KafkaSpoutConfig<>(this);
             }
         }
     
    +    // Kafka consumer configuration
    +    private final Map<String, Object> kafkaProps;
    +    private final Subscription subscription;
    +    private final SerializableDeserializer<K> keyDes;
    +    private final Class<? extends Deserializer<K>> keyDesClazz;
    +    private final SerializableDeserializer<V> valueDes;
    +    private final Class<? extends Deserializer<V>> valueDesClazz;
    +    private final long pollTimeoutMs;
    +
    +    // Kafka spout configuration
    +    private final RecordTranslator<K, V> translator;
    +    private final long offsetCommitPeriodMs;
    +    private final int maxRetries;
    +    private final int maxUncommittedOffsets;
    +    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
    +    private final KafkaSpoutRetryService retryService;
    +    private final long partitionRefreshPeriodMs;
    +
    +    private KafkaSpoutConfig(Builder<K,V> builder) {
    +        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
    +        this.subscription = builder.subscription;
    +        this.translator = builder.translator;
    +        this.pollTimeoutMs = builder.pollTimeoutMs;
    +        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    +        this.maxRetries = builder.maxRetries;
    +        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    +        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    +        this.retryService = builder.retryService;
    +        this.keyDes = builder.keyDes;
    +        this.keyDesClazz = builder.keyDesClazz;
    +        this.valueDes = builder.valueDes;
    +        this.valueDesClazz = builder.valueDesClazz;
    +        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
    +    }
    +
         public Map<String, Object> getKafkaProps() {
             return kafkaProps;
         }
     
         public Deserializer<K> getKeyDeserializer() {
    -        return keyDeserializer;
    +           if (keyDesClazz != null) {
    +                   try {
    --- End diff --
    
    Tabs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to