Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2537#discussion_r165852060
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
    @@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
             return builder;
         }
     
    -    private static void setAutoCommitMode(Builder<?, ?> builder) {
    +    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> 
builder) {
             if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    -            throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
    -                + " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
    +            throw new IllegalStateException("The KafkaConsumer " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    +                + " setting is not supported. You can configure similar 
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
             }
    -        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
    -            
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    -        } else {
    -            String autoOffsetResetPolicy = 
(String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    -            if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
    -                if (autoOffsetResetPolicy == null) {
    -                    /*
    -                    If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
    -                    for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
    -                    error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer 
    -                    requests an offset that was deleted.
    -                     */
    -                    
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    -                } else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
    -                    LOG.warn("Cannot guarantee at-least-once processing 
with auto.offset.reset.policy other than 'earliest' or 'none'."
    -                        + " Some messages may be skipped.");
    -                }
    -            } else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
    -                if (autoOffsetResetPolicy != null
    -                    && (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
    -                    LOG.warn("Cannot guarantee at-most-once processing 
with auto.offset.reset.policy other than 'latest' or 'none'."
    -                        + " Some messages may be processed more than 
once.");
    -                }
    +        String autoOffsetResetPolicy = (String) 
builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    +        if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
    +            if (autoOffsetResetPolicy == null) {
    +                /*
    +                 * If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
    +                 * for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
    +                 * error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer
    +                 * requests an offset that was deleted.
    +                 */
    +                
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    --- End diff --
    
    We should print an INFO-level log here saying:
    
    LOG.info("Set Kafka property {} to {}", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


---

Reply via email to