[
https://issues.apache.org/jira/browse/STORM-393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14097905#comment-14097905
]
Parth Brahmbhatt commented on STORM-393:
----------------------------------------
Users should be able to do this even right now. Instead of using the default
Schemes(StringScheme, RawScheme, StringKeyValueScheme etc.), users can specify
their own scheme implementation which can output both the message and the
topic.
Example:
Create your own implementation of Scheme
public class StringWithTopicScheme implements Scheme {
public static final String MESSAGE_KEY = "message";
public static final String TOPIC_KEY = "topic";
private String topicName;
public StringWithTopicScheme(String topicName) {
this.topicName = topicName;
}
public List<Object> deserialize(byte[] bytes) {
return new Values(StringScheme.deserializeString(bytes),
this.topicName);
}
public Fields getOutputFields() {
return new Fields(MESSAGE_KEY,TOPIC_KEY);
}
}
//provide your implementation as scheme to spoutConfig
String topicName = "topicName";
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/"+topicName,
UUID.randomUUID().toString());
spoutConf.scheme = new StringWithTopicScheme(topicName);
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Do you want this to be added as kafka distribution?
If you are talking about dynamically figuring out the topic then there is no
easy way to do so given the current Scheme and MultiScheme interface.
> Add topic to KafkaSpout output
> ------------------------------
>
> Key: STORM-393
> URL: https://issues.apache.org/jira/browse/STORM-393
> Project: Apache Storm (Incubating)
> Issue Type: Improvement
> Affects Versions: 0.9.2-incubating
> Reporter: Alexey Raga
> Labels: features
>
> It would be beneficial to have topic as a tuple value emitted from KafkaSpout.
> Not only it is useful if STORM-392 is implemented, but also in case when we
> have more than one KafkaSpout in a system
--
This message was sent by Atlassian JIRA
(v6.2#6252)