Repository: storm
Updated Branches:
  refs/heads/1.x-branch 43653ac8d -> 8cb227445


Emit to _spoutConfig.outputStreamId

Even though KafkaSpout.declareOutputFields declares the stream using
outputStreamId (if present), each message is emitted to a stream named after
the Kafka topic it was read from. This may have been caused by a merge conflict
between the fixes for STORM-1210 and STORM-1379.
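
The mismatch described above can be illustrated with a minimal, Storm-free
sketch. RecordingCollector, emitBeforeFix, emitAfterFix, and the sample values
"events" and "parsed" are hypothetical names invented for illustration; they are
not part of storm-kafka. The sketch only models which stream id is passed to
emit when outputStreamId is set.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamIdSketch {
    /** Hypothetical stand-in for a collector: records the stream id of every emit. */
    static final class RecordingCollector {
        final List<String> streams = new ArrayList<>();

        void emit(String streamId, List<Object> tuple) {
            streams.add(streamId);
        }
    }

    /** Behavior before the fix: the topic name is passed as the stream id. */
    static void emitBeforeFix(RecordingCollector c, String topic,
                              String outputStreamId, List<Object> tup) {
        c.emit(topic, tup);
    }

    /** Behavior after the fix: the configured outputStreamId is passed instead. */
    static void emitAfterFix(RecordingCollector c, String topic,
                             String outputStreamId, List<Object> tup) {
        c.emit(outputStreamId, tup);
    }

    public static void main(String[] args) {
        String topic = "events";          // illustrative topic name (assumption)
        String outputStreamId = "parsed"; // illustrative; also the declared stream
        List<Object> tup = new ArrayList<>();
        tup.add("payload");

        RecordingCollector before = new RecordingCollector();
        emitBeforeFix(before, topic, outputStreamId, tup);

        RecordingCollector after = new RecordingCollector();
        emitAfterFix(after, topic, outputStreamId, tup);

        // Compare the emitted stream id against the declared one.
        System.out.println("before fix matches declared stream: "
                + before.streams.get(0).equals(outputStreamId));
        System.out.println("after fix matches declared stream: "
                + after.streams.get(0).equals(outputStreamId));
    }
}
```

In this model, only the post-fix emit uses the same stream id that the spout
declares, so a consumer subscribed to the declared stream would see the tuple
only after the change.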

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/98cd74e0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/98cd74e0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/98cd74e0

Branch: refs/heads/1.x-branch
Commit: 98cd74e06d9ccacc624bb3db84f817d4b0545a9b
Parents: 43653ac
Author: aichow <aic...@users.noreply.github.com>
Authored: Wed Sep 21 14:51:50 2016 -0500
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Fri Sep 23 10:49:19 2016 +0900

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/kafka/PartitionManager.java           | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/98cd74e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index e04b4f2..79e7c3d 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -164,7 +164,7 @@ public class PartitionManager {
             if ((tups != null) && tups.iterator().hasNext()) {
                 if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
                     for (List<Object> tup : tups) {
-                        collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset()));
+                        collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset()));
                     }
                 } else {
                     for (List<Object> tup : tups) {
