Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/2537#discussion_r165852060 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription subscription) { return builder; } - private static void setAutoCommitMode(Builder<?, ?> builder) { + private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) { if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { - throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually." - + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee"); + throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee"); } - if (builder.processingGuarantee == ProcessingGuarantee.NONE) { - builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - } else { - String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); - if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) { - if (autoOffsetResetPolicy == null) { - /* - If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured - for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range - error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer - requests an offset that was deleted. 
- */ - builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) { - LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'." - + " Some messages may be skipped."); - } - } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) { - if (autoOffsetResetPolicy != null - && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) { - LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'." - + " Some messages may be processed more than once."); - } + String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) { + if (autoOffsetResetPolicy == null) { + /* + * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured + * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range + * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer + * requests an offset that was deleted. + */ + builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); --- End diff -- We should print an INFO-level log here saying: LOG.info("Set Kafka property {} to {}", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
---