GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2537#discussion_r165852904
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
---
@@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription
subscription) {
return builder;
}
- private static void setAutoCommitMode(Builder<?, ?> builder) {
+ private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?>
builder) {
if
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
- throw new IllegalArgumentException("Do not set " +
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
- + " Instead use
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+ throw new IllegalStateException("The KafkaConsumer " +
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ + " setting is not supported. You can configure similar
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
}
- if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- } else {
- String autoOffsetResetPolicy =
(String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
- if (builder.processingGuarantee ==
ProcessingGuarantee.AT_LEAST_ONCE) {
- if (autoOffsetResetPolicy == null) {
- /*
- If the user wants to explicitly set an auto offset
reset policy, we should respect it, but when the spout is configured
- for at-least-once processing we should default to
seeking to the earliest offset in case there's an offset out of range
- error, rather than seeking to the latest (Kafka's
default). This type of error will typically happen when the consumer
- requests an offset that was deleted.
- */
-
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- } else if (!autoOffsetResetPolicy.equals("earliest") &&
!autoOffsetResetPolicy.equals("none")) {
- LOG.warn("Cannot guarantee at-least-once processing
with auto.offset.reset.policy other than 'earliest' or 'none'."
- + " Some messages may be skipped.");
- }
- } else if (builder.processingGuarantee ==
ProcessingGuarantee.AT_MOST_ONCE) {
- if (autoOffsetResetPolicy != null
- && (!autoOffsetResetPolicy.equals("latest") &&
!autoOffsetResetPolicy.equals("none"))) {
- LOG.warn("Cannot guarantee at-most-once processing
with auto.offset.reset.policy other than 'latest' or 'none'."
- + " Some messages may be processed more than
once.");
- }
+ String autoOffsetResetPolicy = (String)
builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ if (builder.processingGuarantee ==
ProcessingGuarantee.AT_LEAST_ONCE) {
+ if (autoOffsetResetPolicy == null) {
+ /*
+ * If the user wants to explicitly set an auto offset
reset policy, we should respect it, but when the spout is configured
+ * for at-least-once processing we should default to
seeking to the earliest offset in case there's an offset out of range
+ * error, rather than seeking to the latest (Kafka's
default). This type of error will typically happen when the consumer
+ * requests an offset that was deleted.
+ */
+
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
--- End diff --
Good point; I will add the log statement.
---