Repository: storm Updated Branches: refs/heads/master fec4b53fe -> 154e9ec55
each KafkaBolt has its own properties Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a062a408 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a062a408 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a062a408 Branch: refs/heads/master Commit: a062a40845a808566c8d727a654654a47e8d3eb3 Parents: 528958c Author: renkai <[email protected]> Authored: Wed Aug 26 17:55:16 2015 +0800 Committer: renkai <[email protected]> Committed: Wed Aug 26 17:55:16 2015 +0800 ---------------------------------------------------------------------- .../src/jvm/storm/kafka/bolt/KafkaBolt.java | 15 +++++++-- .../test/storm/kafka/bolt/KafkaBoltTest.java | 32 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a062a408/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java index 738b358..9a020a0 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java @@ -63,7 +63,8 @@ public class KafkaBolt<K, V> extends BaseRichBolt { private OutputCollector collector; private TupleToKafkaMapper<K,V> mapper; private KafkaTopicSelector topicSelector; - /** + private Properties boltSpecfiedProperties = new Properties(); + /** * With default setting for fireAndForget and async, the callback is called when the sending succeeds. * By setting fireAndForget true, the send will not wait at all for kafka to ack. * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set. 
@@ -72,6 +73,13 @@ public class KafkaBolt<K, V> extends BaseRichBolt { private boolean fireAndForget = false; private boolean async = true; + public KafkaBolt(Properties boltSpecfiedProperties) { + this.boltSpecfiedProperties = boltSpecfiedProperties; + } + + public KafkaBolt() { + } + public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) { this.mapper = mapper; return this; @@ -96,7 +104,10 @@ public class KafkaBolt<K, V> extends BaseRichBolt { Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES); Properties properties = new Properties(); - properties.putAll(configMap); + if(configMap!= null) + properties.putAll(configMap); + + properties.putAll(boltSpecfiedProperties); producer = new KafkaProducer<K, V>(properties); this.collector = collector; } http://git-wip-us.apache.org/repos/asf/storm/blob/a062a408/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java index 05d138b..673f8ff 100644 --- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java +++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java @@ -166,6 +166,22 @@ public class KafkaBoltTest { verifyMessage(keyString, messageString); } + /* test bolt specified properties */ + @Test + public void executeWithBoltSpecifiedProperties() { + boolean async = false; + boolean fireAndForget = false; + bolt = defaultSerializerBoltWithSpecifiedProperties(async, fireAndForget); + String keyString = "test-key"; + String messageString = "test-message"; + byte[] key = keyString.getBytes(); + byte[] message = messageString.getBytes(); + Tuple tuple = generateTestTuple(key, message); + bolt.execute(tuple); + verify(collector).ack(tuple); + verifyMessage(keyString, messageString); + } + private KafkaBolt generateStringSerializerBolt() { 
KafkaBolt bolt = new KafkaBolt(); Properties props = new Properties(); @@ -198,6 +214,22 @@ public class KafkaBoltTest { return bolt; } + private KafkaBolt defaultSerializerBoltWithSpecifiedProperties(boolean async, boolean fireAndForget) { + Properties props = new Properties(); + props.put("request.required.acks", "1"); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("bootstrap.servers", broker.getBrokerConnectionString()); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("metadata.fetch.timeout.ms", 1000); + props.put("linger.ms", 0); + KafkaBolt bolt = new KafkaBolt(props); + bolt.prepare(config, null, new OutputCollector(collector)); + bolt.setAsync(async); + bolt.setFireAndForget(fireAndForget); + return bolt; + } + @Test public void executeWithoutKey() throws Exception { String message = "value-234";
