Repository: storm
Updated Branches:
  refs/heads/master fec4b53fe -> 154e9ec55


Each KafkaBolt has its own properties


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a062a408
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a062a408
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a062a408

Branch: refs/heads/master
Commit: a062a40845a808566c8d727a654654a47e8d3eb3
Parents: 528958c
Author: renkai <[email protected]>
Authored: Wed Aug 26 17:55:16 2015 +0800
Committer: renkai <[email protected]>
Committed: Wed Aug 26 17:55:16 2015 +0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     | 15 +++++++--
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 32 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a062a408/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 738b358..9a020a0 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -63,7 +63,8 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private OutputCollector collector;
     private TupleToKafkaMapper<K,V> mapper;
     private KafkaTopicSelector topicSelector;
-    /** 
+    private Properties boltSpecfiedProperties = new Properties();
+    /**
      * With default setting for fireAndForget and async, the callback is called when the sending succeeds.
      * By setting fireAndForget true, the send will not wait at all for kafka to ack.
      * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set.
@@ -72,6 +73,13 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private boolean fireAndForget = false;
     private boolean async = true;
 
+    public KafkaBolt(Properties boltSpecfiedProperties) {
+        this.boltSpecfiedProperties = boltSpecfiedProperties;
+    }
+
+    public KafkaBolt() {
+    }
+
     public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
         this.mapper = mapper;
         return this;
@@ -96,7 +104,10 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
 
         Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
         Properties properties = new Properties();
-        properties.putAll(configMap);
+        if(configMap!= null)
+            properties.putAll(configMap);
+
+        properties.putAll(boltSpecfiedProperties);
         producer = new KafkaProducer<K, V>(properties);
         this.collector = collector;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a062a408/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 05d138b..673f8ff 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -166,6 +166,22 @@ public class KafkaBoltTest {
         verifyMessage(keyString, messageString);
     }
 
+    /* test bolt specified properties */
+    @Test
+    public void executeWithBoltSpecifiedProperties() {
+        boolean async = false;
+        boolean fireAndForget = false;
+        bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget);
+        String keyString = "test-key";
+        String messageString = "test-message";
+        byte[] key = keyString.getBytes();
+        byte[] message = messageString.getBytes();
+        Tuple tuple = generateTestTuple(key, message);
+        bolt.execute(tuple);
+        verify(collector).ack(tuple);
+        verifyMessage(keyString, messageString);
+    }
+
     private KafkaBolt generateStringSerializerBolt() {
         KafkaBolt bolt = new KafkaBolt();
         Properties props = new Properties();
@@ -198,6 +214,22 @@ public class KafkaBoltTest {
         return bolt;
     }
 
+    private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) {
+        Properties props = new Properties();
+        props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("bootstrap.servers", broker.getBrokerConnectionString());
+        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put("metadata.fetch.timeout.ms", 1000);
+        props.put("linger.ms", 0);
+        KafkaBolt bolt = new KafkaBolt(props);
+        bolt.prepare(config, null, new OutputCollector(collector));
+        bolt.setAsync(async);
+        bolt.setFireAndForget(fireAndForget);
+        return bolt;
+    }
+
     @Test
     public void executeWithoutKey() throws Exception {
         String message = "value-234";

Reply via email to