Repository: storm Updated Branches: refs/heads/master 1bdc922e2 -> 5e5e83cab
Emit to _spoutConfig.outputStreamId

Even though KafkaSpout.declareOutputFields declares its stream using
outputStreamId (if present), the message gets emitted to a stream matching
the Kafka topic it was read from. Looks like it may have been a merge
conflict between the fix for STORM-1210 and STORM-1379.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/09d9ea10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/09d9ea10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/09d9ea10

Branch: refs/heads/master
Commit: 09d9ea101b4dbaf49335907aaaf3167d29140c59
Parents: 84641f6
Author: aichow <aic...@users.noreply.github.com>
Authored: Wed Sep 21 14:51:50 2016 -0500
Committer: GitHub <nore...@github.com>
Committed: Wed Sep 21 14:51:50 2016 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/kafka/PartitionManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/09d9ea10/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index c51f36e..793d227 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -164,7 +164,7 @@ public class PartitionManager {
             if ((tups != null) && tups.iterator().hasNext()) {
                 if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
                     for (List<Object> tup : tups) {
-                        collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset()));
+                        collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset()));
                     }
                 } else {
                     for (List<Object> tup : tups) {