Repository: storm
Updated Branches:
  refs/heads/1.x-branch b24f5d87c -> 9dc541461


STORM-2825: Fix ClassCastException when storm-kafka-client uses consumer config with String-type 'enable.auto.commit'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/689d31c6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/689d31c6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/689d31c6

Branch: refs/heads/1.x-branch
Commit: 689d31c6ea3e5c52fcc4c18b0b065d66f2d29915
Parents: b24f5d8
Author: Stig Rohde Døssing <[email protected]>
Authored: Sun Nov 19 10:36:37 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Tue Nov 21 20:05:58 2017 +0100

----------------------------------------------------------------------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  2 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java | 46 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/689d31c6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index ffefa29..d89b674 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -718,7 +718,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                 + " This will be treated as an error in the next major 
release."
                 + " For now the spout will be configured to behave like it 
would have in pre-1.2.0 releases.");
 
-            final boolean enableAutoCommit = 
(boolean)builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            final boolean enableAutoCommit = 
Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
             if(enableAutoCommit) {
                 builder.processingGuarantee = ProcessingGuarantee.NONE;
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/689d31c6/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 794bed4..2e2d7ff 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -75,4 +75,50 @@ public class KafkaSpoutConfigTest {
         assertThat("Should allow users to pick a different auto offset reset 
policy than the one recommended for the at-least-once processing guarantee",
             
(String)conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), 
is("none"));
     }
+    
+    @Test
+    public void testCanConfigureWithExplicitTrueBooleanAutoCommitMode() {
+        /*
+         * Since adding setProcessingGuarantee to KafkaSpoutConfig we don't 
want users to set "enable.auto.commit" in the consumer config,
+         * because setting the processing guarantee will do it automatically. 
For backward compatibility we need to be able to handle the 
+         * property being set anyway for a few releases, and try to set a 
processing guarantee that corresponds to the property.
+         */
+        
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+            .build();
+        
+        assertThat("When setting enable auto commit to true explicitly the 
spout should use the 'none' processing guarantee",
+            conf.getProcessingGuarantee(), 
is(KafkaSpoutConfig.ProcessingGuarantee.NONE));
+    }
+    
+    @Test
+    public void testCanConfigureWithExplicitFalseBooleanAutoCommitMode() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
+            .build();
+        
+        assertThat("When setting enable auto commit to false explicitly the 
spout should use the 'at-least-once' processing guarantee",
+            conf.getProcessingGuarantee(), 
is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
+    }
+    
+    @Test
+    public void testCanConfigureWithExplicitTrueStringAutoCommitMode() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+            .build();
+        
+        assertThat("When setting enable auto commit to true explicitly the 
spout should use the 'none' processing guarantee",
+            conf.getProcessingGuarantee(), 
is(KafkaSpoutConfig.ProcessingGuarantee.NONE));
+    }
+    
+    @Test
+    public void testCanConfigureWithExplicitFalseStringAutoCommitMode() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+            .build();
+        
+        assertThat("When setting enable auto commit explicitly to false the 
spout should use the 'at-least-once' processing guarantee",
+            conf.getProcessingGuarantee(), 
is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
+    }
 }

Reply via email to