[ 
https://issues.apache.org/jira/browse/STORM-393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14097905#comment-14097905
 ] 

Parth Brahmbhatt edited comment on STORM-393 at 8/14/14 11:29 PM:
------------------------------------------------------------------

Users should be able to do this even right now. Instead of using one of the default
schemes (StringScheme, RawScheme, StringKeyValueScheme, etc.), users can supply
their own Scheme implementation that outputs both the message and the topic.

Example:

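Create your own implementation of Scheme:

import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.kafka.StringScheme;

// Emits each Kafka message as a string together with a fixed topic name.
public class StringWithTopicScheme implements Scheme {

    public static final String MESSAGE_KEY = "message";
    public static final String TOPIC_KEY = "topic";

    // The topic is supplied once, when the scheme is constructed.
    private final String topicName;

    public StringWithTopicScheme(String topicName) {
        this.topicName = topicName;
    }

    @Override
    public List<Object> deserialize(byte[] bytes) {
        // Decode the payload with StringScheme's helper and append the topic name.
        return new Values(StringScheme.deserializeString(bytes), this.topicName);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(MESSAGE_KEY, TOPIC_KEY);
    }
}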

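// Provide your implementation as the scheme in SpoutConfig.
// `hosts` is your BrokerHosts instance, e.g. new ZkHosts("zkhost:2181").
String topicName = "topicName";
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName,
        UUID.randomUUID().toString());
// SpoutConfig.scheme expects a MultiScheme, so wrap the single Scheme.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringWithTopicScheme(topicName));
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);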
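To see what such a scheme emits without running a topology, here is a minimal self-contained sketch. The SimpleScheme interface is a hypothetical stand-in for Storm's Scheme (it returns plain lists instead of Values and Fields); only the shape of the output is meant to match the example above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class StringWithTopicSchemeDemo {

    // HYPOTHETICAL stand-in for Storm's Scheme interface (not the real API).
    interface SimpleScheme {
        List<Object> deserialize(byte[] bytes);
        List<String> getOutputFields();
    }

    // Same idea as StringWithTopicScheme: the topic is fixed at construction
    // time and appended to every deserialized message.
    static class StringWithTopicScheme implements SimpleScheme {
        static final String MESSAGE_KEY = "message";
        static final String TOPIC_KEY = "topic";
        private final String topicName;

        StringWithTopicScheme(String topicName) {
            this.topicName = topicName;
        }

        @Override
        public List<Object> deserialize(byte[] bytes) {
            String message = new String(bytes, StandardCharsets.UTF_8);
            return Arrays.<Object>asList(message, topicName);
        }

        @Override
        public List<String> getOutputFields() {
            return Arrays.asList(MESSAGE_KEY, TOPIC_KEY);
        }
    }

    public static void main(String[] args) {
        SimpleScheme scheme = new StringWithTopicScheme("topicName");
        List<Object> tuple = scheme.deserialize("hello".getBytes(StandardCharsets.UTF_8));
        // prints "[message, topic] -> [hello, topicName]"
        System.out.println(scheme.getOutputFields() + " -> " + tuple);
    }
}
```

Because each KafkaSpout gets its own scheme instance, two spouts reading different topics would each tag their tuples with their own topic name.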

Do you want this to be added as part of the storm-kafka distribution?

If you are talking about figuring out the topic dynamically, then there is no
easy way to do so given the current Scheme and MultiScheme interfaces.
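The limitation comes from the method signature: deserialize(byte[] bytes) receives only the message payload, so a scheme can report a topic only if it was given the topic up front. The sketch below contrasts that with a hypothetical topic-aware interface; TopicAwareScheme and its two-argument deserialize are illustrative assumptions, not part of Storm's API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class TopicAwareSchemeSketch {

    // HYPOTHETICAL interface (not in Storm): the caller passes the topic each
    // message came from, so a single instance can serve many topics.
    interface TopicAwareScheme {
        List<Object> deserialize(String topic, byte[] bytes);
    }

    static class StringWithDynamicTopicScheme implements TopicAwareScheme {
        @Override
        public List<Object> deserialize(String topic, byte[] bytes) {
            return Arrays.<Object>asList(new String(bytes, StandardCharsets.UTF_8), topic);
        }
    }

    public static void main(String[] args) {
        TopicAwareScheme scheme = new StringWithDynamicTopicScheme();
        // The same instance tags messages from different topics correctly.
        System.out.println(scheme.deserialize("orders", "a".getBytes(StandardCharsets.UTF_8)));
        System.out.println(scheme.deserialize("clicks", "b".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Supporting something like this would require the spout to pass topic information into the scheme, which the current interfaces do not do.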




> Add topic to KafkaSpout output
> ------------------------------
>
>                 Key: STORM-393
>                 URL: https://issues.apache.org/jira/browse/STORM-393
>             Project: Apache Storm (Incubating)
>          Issue Type: Improvement
>    Affects Versions: 0.9.2-incubating
>            Reporter: Alexey Raga
>              Labels: features
>
> It would be beneficial to have topic as a tuple value emitted from KafkaSpout.
> Not only would it be useful if STORM-392 is implemented, but also in cases where we
> have more than one KafkaSpout in a system.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
