[ 
https://issues.apache.org/jira/browse/STORM-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14033597#comment-14033597
 ] 

Haralds Ulmanis edited comment on STORM-353 at 6/17/14 9:28 AM:
----------------------------------------------------------------

{noformat}
Ok, what about this:
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java 
b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index b9ea948..a36e846 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -55,14 +55,36 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private OutputCollector collector;
     private String topic;
 
+    private Map boltConfig;
+
+    public KafkaBolt(Map boltConfig) {
+        this.boltConfig = boltConfig;
+    }
+
+    public KafkaBolt() {
+    }
+
     @Override
     public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
-        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+        Map configMap;
+        if (boltConfig == null) {
+            configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+            this.topic = (String) stormConf.get(TOPIC);
+        } else {
+            if (boltConfig.containsKey(KAFKA_BROKER_PROPERTIES)) {
+                configMap = (Map) boltConfig.get(KAFKA_BROKER_PROPERTIES);
+            } else {
+                configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+            }
+            if (boltConfig.containsKey(TOPIC))
+                this.topic = (String) boltConfig.get(TOPIC);
+            else
+                this.topic = (String) stormConf.get(TOPIC);
+        }
         Properties properties = new Properties();
         properties.putAll(configMap);
         ProducerConfig config = new ProducerConfig(properties);
         producer = new Producer<K, V>(config);
-        this.topic = (String) stormConf.get(TOPIC);
         this.collector = collector;
     }
 
:) you can set  kafka config or topic or both. 
Also if you could turn this into a github pull request to 
apache/incubator-storm --> this is unknown for me.
{noformat}


was (Author: evilezh):
Ok, what about this:
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java 
b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index b9ea948..a36e846 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -55,14 +55,36 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     private OutputCollector collector;
     private String topic;
 
+    private Map boltConfig;
+
+    public KafkaBolt(Map boltConfig) {
+        this.boltConfig = boltConfig;
+    }
+
+    public KafkaBolt() {
+    }
+
     @Override
     public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
-        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+        Map configMap;
+        if (boltConfig == null) {
+            configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+            this.topic = (String) stormConf.get(TOPIC);
+        } else {
+            if (boltConfig.containsKey(KAFKA_BROKER_PROPERTIES)) {
+                configMap = (Map) boltConfig.get(KAFKA_BROKER_PROPERTIES);
+            } else {
+                configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+            }
+            if (boltConfig.containsKey(TOPIC))
+                this.topic = (String) boltConfig.get(TOPIC);
+            else
+                this.topic = (String) stormConf.get(TOPIC);
+        }
         Properties properties = new Properties();
         properties.putAll(configMap);
         ProducerConfig config = new ProducerConfig(properties);
         producer = new Producer<K, V>(config);
-        this.topic = (String) stormConf.get(TOPIC);
         this.collector = collector;
     }
 
:) you can set  kafka config or topic or both. 
Also if you could turn this into a github pull request to 
apache/incubator-storm --> this is unknown for me.


> Add configuration per kafka-bolt 
> ---------------------------------
>
>                 Key: STORM-353
>                 URL: https://issues.apache.org/jira/browse/STORM-353
>             Project: Apache Storm (Incubating)
>          Issue Type: Improvement
>            Reporter: Haralds Ulmanis
>            Priority: Trivial
>
> Currently kafka bolt configuration is passed through storm configuration - 
> which limits possibility to use more than one storm bolt/topic per topology.
> I can suggest something like this:
> diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java 
> b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
> index b9ea948..2a78f84 100644
> --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
> +++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
> @@ -55,14 +55,20 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
>      private OutputCollector collector;
>      private String topic;
>  
> +    private Map boltConfig;
> +
> +    public KafkaBolt(Map boltConfig) {
> +        this.boltConfig = boltConfig;
> +    }
> +
>      @Override
>      public void prepare(Map stormConf, TopologyContext context, 
> OutputCollector collector) {
> -        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
> +        Map configMap = (Map) boltConfig.get(KAFKA_BROKER_PROPERTIES);
>          Properties properties = new Properties();
>          properties.putAll(configMap);
>          ProducerConfig config = new ProducerConfig(properties);
>          producer = new Producer<K, V>(config);
> -        this.topic = (String) stormConf.get(TOPIC);
> +        this.topic = (String) boltConfig.get(TOPIC);
>          this.collector = collector;
>      }
>  
> After which you can initialize each bolt with own topic and even own zk 
> servers.
> Use case: 
> I'm using several streams for output and want each of stream to be published 
> in own topic. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to