Repository: storm
Updated Branches:
  refs/heads/master 1bdc922e2 -> 5e5e83cab


Emit to _spoutConfig.outputStreamId

Even though KafkaSpout.declareOutputFields declaresStream using outputStreamId 
(if present), the message gets emitted to a stream matching the Kafka topic it 
was read from. Looks like it may have been a merge conflict between the fix for 
STORM-1210 and STORM-1379.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/09d9ea10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/09d9ea10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/09d9ea10

Branch: refs/heads/master
Commit: 09d9ea101b4dbaf49335907aaaf3167d29140c59
Parents: 84641f6
Author: aichow <aic...@users.noreply.github.com>
Authored: Wed Sep 21 14:51:50 2016 -0500
Committer: GitHub <nore...@github.com>
Committed: Wed Sep 21 14:51:50 2016 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/kafka/PartitionManager.java           | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/09d9ea10/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index c51f36e..793d227 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -164,7 +164,7 @@ public class PartitionManager {
             if ((tups != null) && tups.iterator().hasNext()) {
                if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
                     for (List<Object> tup : tups) {
-                        collector.emit(_spoutConfig.topic, tup, new 
KafkaMessageId(_partition, toEmit.offset()));
+                        collector.emit(_spoutConfig.outputStreamId, tup, new 
KafkaMessageId(_partition, toEmit.offset()));
                     }
                 } else {
                     for (List<Object> tup : tups) {

Reply via email to