Repository: samza
Updated Branches:
  refs/heads/master f8cce6e15 -> 7b0a65b14


SAMZA-1292: Merge operator can be no-op when there are no streams to merge

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jacob Maes <[email protected]>

Closes #320 from prateekm/small-fixes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7b0a65b1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7b0a65b1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7b0a65b1

Branch: refs/heads/master
Commit: 7b0a65b140dbd5e02a783b53e4808b1d8d27e00e
Parents: f8cce6e
Author: Prateek Maheshwari <[email protected]>
Authored: Mon Oct 9 14:48:05 2017 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Mon Oct 9 14:48:05 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/operators/MessageStreamImpl.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7b0a65b1/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 8460ada..dc91d19 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -129,10 +129,10 @@ public class MessageStreamImpl<M> implements 
MessageStream<M> {
 
   @Override
   public MessageStream<M> merge(Collection<? extends MessageStream<? extends 
M>> otherStreams) {
+    if (otherStreams.isEmpty()) return this;
     StreamOperatorSpec<M, M> opSpec = 
OperatorSpecs.createMergeOperatorSpec(this.graph.getNextOpId());
     this.operatorSpec.registerNextOperatorSpec(opSpec);
-    otherStreams.forEach(other ->
-        ((MessageStreamImpl<M>) 
other).getOperatorSpec().registerNextOperatorSpec(opSpec));
+    otherStreams.forEach(other -> ((MessageStreamImpl<M>) 
other).getOperatorSpec().registerNextOperatorSpec(opSpec));
     return new MessageStreamImpl<>(this.graph, opSpec);
   }
 

Reply via email to