Repository: samza
Updated Branches:
  refs/heads/master f8cce6e15 -> 7b0a65b14
SAMZA-1292: Merge operator can be no-op when there are no streams to merge

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jacob Maes <[email protected]>

Closes #320 from prateekm/small-fixes

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7b0a65b1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7b0a65b1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7b0a65b1

Branch: refs/heads/master
Commit: 7b0a65b140dbd5e02a783b53e4808b1d8d27e00e
Parents: f8cce6e
Author: Prateek Maheshwari <[email protected]>
Authored: Mon Oct 9 14:48:05 2017 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Mon Oct 9 14:48:05 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/operators/MessageStreamImpl.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/7b0a65b1/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 8460ada..dc91d19 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -129,10 +129,10 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   @Override
   public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
+    if (otherStreams.isEmpty()) return this;
     StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(this.graph.getNextOpId());
     this.operatorSpec.registerNextOperatorSpec(opSpec);
-    otherStreams.forEach(other ->
-        ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec));
+    otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec));
     return new MessageStreamImpl<>(this.graph, opSpec);
   }
