[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15341485#comment-15341485 ]
ASF GitHub Bot commented on APEXMALHAR-2106:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/309#discussion_r67841160
--- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Queue;
+
+import org.eclipse.jetty.util.ArrayQueue;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * A helper class that adds functionality to bypass the platform limitations of combining more than two
+ * streams at a time with Stream Merger.
+ *
+ * Usage:
+ *
+ * dag.addOperator("Stream_1", op1);
+ * dag.addOperator("Stream_2", op2);
+ * dag.addOperator("Stream_3", op3);
+ *
+ * MultipleStreamMerger merger = new MultipleStreamMerger();
+ * DefaultOutputPort streamOut = merger.merge(op1.out)
+ * .merge(op2.out)
+ * .merge(op3.out)
+ * .mergeStreams(dag, conf);
+ *
+ * dag.addStream("merger-counter", streamOut, counterOp.counter);
+ *
+ * @param <K>
+ */
+public class MultipleStreamMerger<K>
+{
+ public class Stream
+ {
+ DefaultInputPort<K> destPort;
+ SourcedOutputPort sourcePort;
+ String name;
+
+ public Stream(String name, SourcedOutputPort sourcePort, DefaultInputPort<K> destPort)
+ {
+ this.destPort = destPort;
+ this.sourcePort = sourcePort;
+ this.name = name;
+ }
+ }
+
+ public class NamedMerger
+ {
+ StreamMerger<K> merger;
+ String name;
+
+ public NamedMerger(String name, StreamMerger<K> merger)
+ {
+ this.merger = merger;
+ this.name = name;
+ }
+ }
+
+ /**
+ * A simple class to allow us to track whether the port to be merged is internal (allowing it to be thread local)
+ * or external
+ */
+ public class SourcedOutputPort
+ {
+ boolean internal;
+ DefaultOutputPort<K> port;
+
+ public SourcedOutputPort(DefaultOutputPort<K> port)
+ {
+ this.internal = false;
+ this.port = port;
+ }
+
+ public SourcedOutputPort(boolean internal, DefaultOutputPort<K> port)
+ {
+ this.internal = internal;
+ this.port = port;
+ }
+ }
+
+ ArrayList<DefaultOutputPort<K>> streamsToMerge = new ArrayList<>();
+
+ private DefaultOutputPort<K> streamOutput = new DefaultOutputPort<>();
+
+ /**
+ * Used to define all the sources to be merged into a single stream.
+ *
+ * @param sourcePort - The output port from the upstream operator that provides data
+ * @return The updated MultipleStreamMerger object that tracks which streams should be unified.
+ */
+ public MultipleStreamMerger<K> merge(DefaultOutputPort<K> sourcePort)
+ {
+ streamsToMerge.add(sourcePort);
+ return this;
+ }
+
+
+ /**
+ * Given the streams to merge have been selected with {@link #merge(DefaultOutputPort)}, create a subDAG and add it
+ * to an existing DAG.
+ *
+ * To merge more than two streams at a time, we construct a tiered hierarchy of thread-local StreamMerger operators
+ * E.g.
+ *
+ *   Stream 0 ->
+ *                StreamMerger_1 ->
+ *   Stream 1 ->
+ *                                   StreamMerger_Final -> Out
+ *   Stream 2 ->
+ *                StreamMerger_2 ->
+ *   Stream 3 ->
+ * Note that we don't use the populateDAG function because that is only used to flatten the
--- End diff --
Need to change the docs, since it is no longer a module.
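The Javadoc under review describes merging more than two streams by building a tiered hierarchy of two-input StreamMerger operators. The plain-Java sketch below only computes that wiring plan (which stream or merger feeds which merger) and does not call any Apex API. The class name TieredMergePlan and the rule of carrying an odd stream up to the next tier unchanged are assumptions for illustration, since the diff is truncated before the actual mergeStreams implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: plans a tiered hierarchy of two-input mergers.
 * It only computes the wiring; it does not use the Apex DAG API.
 */
public class TieredMergePlan
{
  /** One two-input merger and the two upstream nodes feeding it. */
  public static final class Merger
  {
    final String name;
    final String left;
    final String right;

    Merger(String name, String left, String right)
    {
      this.name = name;
      this.left = left;
      this.right = right;
    }

    @Override
    public String toString()
    {
      return left + " + " + right + " -> " + name;
    }
  }

  /** Pairs up nodes tier by tier until a single output remains. */
  public static List<Merger> plan(List<String> sources)
  {
    if (sources.size() < 2) {
      throw new IllegalArgumentException("need at least two streams to merge");
    }
    List<Merger> mergers = new ArrayList<>();
    List<String> tier = new ArrayList<>(sources);
    int counter = 0;
    while (tier.size() > 1) {
      List<String> next = new ArrayList<>();
      for (int i = 0; i + 1 < tier.size(); i += 2) {
        String name = "StreamMerger_" + (++counter);
        mergers.add(new Merger(name, tier.get(i), tier.get(i + 1)));
        next.add(name);
      }
      if (tier.size() % 2 == 1) {
        // Assumption: an unpaired node moves up to the next tier unchanged.
        next.add(tier.get(tier.size() - 1));
      }
      tier = next;
    }
    return mergers;
  }

  public static void main(String[] args)
  {
    plan(List.of("Stream 0", "Stream 1", "Stream 2", "Stream 3"))
        .forEach(System.out::println);
  }
}
```

For the four streams in the Javadoc diagram this prints "Stream 0 + Stream 1 -> StreamMerger_1", "Stream 2 + Stream 3 -> StreamMerger_2" and "StreamMerger_1 + StreamMerger_2 -> StreamMerger_3", where the last merger plays the role of StreamMerger_Final. For N sources the plan always contains N - 1 mergers, because each two-input merger reduces the number of open streams by one.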
> Support merging multiple streams with StreamMerger
> ---------------------------------------------------
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Ilya Ganelin
> Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream
> combination operations), Apex must support merging data from multiple
> sources. The StreamMerger operator can be improved to merge multiple streams,
> rather than just the two streams it can handle in the present implementation.
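As a rough illustration of why cascading two-input mergers is enough to support a Flatten-style union, the following in-memory sketch models a merger as something that forwards every tuple from either of its inputs to one output, then wires three of them into a two-tier tree over four sources. TwoInputMerger and MergeTreeDemo are hypothetical names, and this is not the Apex StreamMerger operator: here tuples are pushed sequentially, so the output order is deterministic only in this model, whereas on the platform tuples from different inputs may interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory model of a tree of two-input stream mergers. */
public class MergeTreeDemo
{
  /** Forwards every tuple arriving on either input to the same output. */
  static final class TwoInputMerger<K>
  {
    final Consumer<K> data1;
    final Consumer<K> data2;

    TwoInputMerger(Consumer<K> out)
    {
      // Both inputs feed the same downstream consumer.
      this.data1 = out;
      this.data2 = out;
    }
  }

  /** Routes exactly four sources through a two-tier merge tree. */
  public static List<String> mergeFour(List<List<String>> sources)
  {
    if (sources.size() != 4) {
      throw new IllegalArgumentException("this sketch expects exactly four sources");
    }
    List<String> output = new ArrayList<>();
    TwoInputMerger<String> last = new TwoInputMerger<>(output::add);
    TwoInputMerger<String> first = new TwoInputMerger<>(last.data1);
    TwoInputMerger<String> second = new TwoInputMerger<>(last.data2);
    List<Consumer<String>> inputs = List.of(first.data1, first.data2, second.data1, second.data2);
    for (int i = 0; i < inputs.size(); i++) {
      sources.get(i).forEach(inputs.get(i));
    }
    return output;
  }

  public static void main(String[] args)
  {
    List<String> merged = mergeFour(List.of(
        List.of("a1", "a2"), List.of("b1"), List.of("c1", "c2"), List.of("d1")));
    System.out.println(merged); // prints [a1, a2, b1, c1, c2, d1]
  }
}
```

Every tuple from every source reaches the single output exactly once, which is the union semantics a Flatten transformation needs; only the number of tiers grows as more sources are added.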
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)