Repository: storm
Updated Branches:
  refs/heads/1.x-branch b24f5d87c -> 9dc541461
STORM-2825: Fix ClassCastException when storm-kafka-client uses consumer config with String-type 'enable.auto.commit'

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/689d31c6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/689d31c6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/689d31c6

Branch: refs/heads/1.x-branch
Commit: 689d31c6ea3e5c52fcc4c18b0b065d66f2d29915
Parents: b24f5d8
Author: Stig Rohde Døssing <[email protected]>
Authored: Sun Nov 19 10:36:37 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Tue Nov 21 20:05:58 2017 +0100

----------------------------------------------------------------------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  2 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java | 46 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/689d31c6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index ffefa29..d89b674 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -718,7 +718,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                 + " This will be treated as an error in the next major release."
                 + " For now the spout will be configured to behave like it would have in pre-1.2.0 releases.");
 
-            final boolean enableAutoCommit = (boolean)builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            final boolean enableAutoCommit = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
             if(enableAutoCommit) {
                 builder.processingGuarantee = ProcessingGuarantee.NONE;
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/689d31c6/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 794bed4..2e2d7ff 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -75,4 +75,50 @@ public class KafkaSpoutConfigTest {
         assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee",
             (String)conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none"));
     }
+
+    @Test
+    public void testCanConfigureWithExplicitTrueBooleanAutoCommitMode() {
+        /*
+         * Since adding setProcessingGuarantee to KafkaSpoutConfig we don't want users to set "enable.auto.commit" in the consumer config,
+         * because setting the processing guarantee will do it automatically. For backward compatibility we need to be able to handle the
+         * property being set anyway for a few releases, and try to set a processing guarantee that corresponds to the property.
+         */
+
+        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+            .build();
+
+        assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee",
+            conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NONE));
+    }
+
+    @Test
+    public void testCanConfigureWithExplicitFalseBooleanAutoCommitMode() {
+        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
+            .build();
+
+        assertThat("When setting enable auto commit to false explicitly the spout should use the 'at-least-once' processing guarantee",
+            conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
+    }
+
+    @Test
+    public void testCanConfigureWithExplicitTrueStringAutoCommitMode() {
+        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+            .build();
+
+        assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee",
+            conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NONE));
+    }
+
+    @Test
+    public void testCanConfigureWithExplicitFalseStringAutoCommitMode() {
+        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+            .build();
+
+        assertThat("When setting enable auto commit explicitly to false the spout should use the 'at-least-once' processing guarantee",
+            conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
+    }
 }
