[
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319288#comment-15319288
]
ASF GitHub Bot commented on APEXMALHAR-2106:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/309#discussion_r66144530
--- Diff:
library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
+
+/**
+ * Module that adds functionality to bypass the platform limitations of
combining more than two streams at a time with
+ * Stream Merger.
+ *
+ * Usage:
+ *
+ * dag.addOperator("Stream_1", op1);
+ * dag.addOperator("Stream_2", op2);
+ * dag.addOperator("Stream_3", op3);
+ *
+ * MultipleStreamMerger merger = new MultipleStreamMerger();
+ * merger.merge(op1.out)
+ * .merge(op2.out)
+ * .merge(op3.out)
+ * .insertInto(dag, conf);
+ *
+ * dag.addModule("Merger", merger);
+ *
+ * @param <K>
+ */
+public class MultipleStreamMerger<K> implements Module
+{
+ public class Stream
+ {
+ DefaultInputPort destPort;
+ DefaultOutputPort sourcePort;
+ String name;
+
+ public Stream(String name, DefaultOutputPort sourcePort,
DefaultInputPort destPort)
+ {
+ this.destPort = destPort;
+ this.sourcePort = sourcePort;
+ this.name = name;
+ }
+ }
+
+ public class NamedMerger
+ {
+ StreamMerger<K> merger;
+ String name;
+
+ public NamedMerger(String name, StreamMerger<K> merger)
+ {
+ this.merger = merger;
+ this.name = name;
+ }
+ }
+
+ private int streamCount = 0;
+
+ ArrayList<DefaultOutputPort<K>> streamsToMerge = new ArrayList<>();
+
+ public transient ProxyOutputPort<K> streamOutput = new
ProxyOutputPort<>();
+
+ /**
+ * Used to define all the sources to be merged into a single stream.
+ *
+ * @param sourcePort - The output port from the upstream operator that
provides data
+ * @return The updated MultipleStreamMerger object that tracks which
streams should be unified.
+ */
+ public MultipleStreamMerger<K> merge(DefaultOutputPort<K> sourcePort)
+ {
+ streamsToMerge.add(sourcePort);
+ return this;
+ }
+
+ /**
+ * To merge more than two streams at a time, we construct a binary tree
of thread-local StreamMerger operators
+ * E.g.
+ *
+ * Tier 0 Tier 1 Tier 2
+ *
+ * Stream 1 ->
+ * StreamMerger_1 ->
+ * Stream 2 ->
+ * StreamMerger_Final -> Out
+ * Stream 3 ->
+ * StreamMerger_2 ->
+ * Stream 4 ->
+ *
+ * This function updates the provided DAG with the relevant streams.
+ */
+ public void mergeStreams(DAG dag, Configuration conf)
+ {
+ if (streamsToMerge.size() < 2) {
+ throw new IllegalArgumentException("Not enough streams to merge, at
least two streams must be selected for " +
+ "merging with `.merge()`.");
+ }
+
+ ArrayList<Stream> streamsToAddToDag = new ArrayList<>();
+ ArrayList<NamedMerger> operatorsToAdd = new ArrayList<>();
+
+ // Determine operators and streams to add to the DAG
+ constructMergeTree(streamsToAddToDag, operatorsToAdd);
+
+ for (NamedMerger m : operatorsToAdd) {
+ dag.addOperator(m.name, m.merger);
+ }
+
+ for (Stream s : streamsToAddToDag) {
+ dag.addStream(s.name, s.sourcePort,
s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
+ }
+ }
+
+ /**
+ * Given a set of streams to be merged (defined via {@link
#merge(DefaultOutputPort)}), compute the optimal
+ * structure of cascading mergers that need to be instantiated, added to
the dag, and linked together.
+ * @param streamsToAddToDag - (output) A list that is populated with
streams that should be added to the DAG
+ * @param operatorsToAdd - (output) A list that is populated with
operators to be added to the DAG
+ */
+ public void constructMergeTree(
+ ArrayList<Stream> streamsToAddToDag,
+ ArrayList<NamedMerger> operatorsToAdd)
+ {
+ if (streamsToMerge.size() < 2) {
+ throw new IllegalArgumentException("Not enough streams to merge.
Ensure `.merge` was called for each stream " +
+ "to be added.");
+ }
+
+ // Define the final merger in the sequence and connect its output to
the module's output
+ StreamMerger<K> finalMerger = new StreamMerger<>();
+ operatorsToAdd.add(new NamedMerger("Merger_Final", finalMerger));
+ streamOutput.set(finalMerger.out);
+
+ ArrayList<ArrayList<StreamMerger<K>>> mergers = new ArrayList<>();
+
+ /**
+ * First, calculate the number of tiers we need to merge all streams
given that each merger can only merge two
+ * streams at a time.
+ */
+ int numTiers = (int)Math.ceil(Math.log(streamsToMerge.size()) /
Math.log(2));
+
+ // Handle the simple case where we only have a single tier (only two
streams to merge)
+ if (numTiers == 1) {
+ assert (streamsToMerge.size() == 2);
+ streamsToAddToDag.add(new Stream("FinalMerge_Stream_0",
streamsToMerge.get(0), finalMerger.data1));
+ streamsToAddToDag.add(new Stream("FinalMerge_Stream_1",
streamsToMerge.get(1), finalMerger.data2));
+
+ // We don't need to add any operators since we've already added the
final merger
+ } else {
+ Iterator<DefaultOutputPort<K>> streams = streamsToMerge.iterator();
+
+ // When assigning streams, we will switch between ports 1 and 2 as
we use successive mergers.
+ boolean usePort1;
+
+ // For each tier, create the mergers in that tier, and connect the
relevant streams
+ for (int i = 0; i < numTiers - 1; i++) {
+ int streamIdx = 0;
+ usePort1 = true;
+
+ int numMergers = (int)Math.ceil(streamsToMerge.size() /
Math.pow(2, i + 1));
+
+ ArrayList<StreamMerger<K>> mergersTierI = new
ArrayList<>(numMergers);
+
+ // For each merger in the tier, assign the appropriate streams to
that merger
+ for (int mergerIdx = 0; mergerIdx < numMergers; mergerIdx++) {
+ StreamMerger<K> merger = new StreamMerger<>();
+ operatorsToAdd.add(new NamedMerger("Merger_Tier_" + i + "_#_" +
mergerIdx, merger));
+
+ // Each operator has two ports so add a simple inner loop
+ for (int port = 0; port < 2; port++) {
+ /**
+ * Assign streams. At the first tier, we assign the streams
from "streamsToMerger". On successive tiers,
+ * we assign streams from the previous tier.
+ */
+ DefaultInputPort<K> mergerInputPort = usePort1 ? merger.data1
: merger.data2;
+ DefaultOutputPort<K> nextStream = new DefaultOutputPort<>();
+ String name = "";
+
+ usePort1 = !usePort1;
+
+ // Process first tier
+ if (i == 0) {
+ if (streams.hasNext()) {
+ nextStream = streams.next();
+ name = "Stream_" + streamIdx + " -> Tier_" + i +
"_Merger_" + mergerIdx;
+ streamsToAddToDag.add(new Stream(name, nextStream,
mergerInputPort));
+ }
+ } else {
+ // Process subsequent tiers
+ ArrayList<StreamMerger<K>> previousTier = mergers.get(i - 1);
+ if (streamIdx < previousTier.size()) {
+ nextStream = previousTier.get(streamIdx).out;
+ name = "Tier" + (i - 1) + "_Stream_" + streamIdx + " ->
Tier_" + i;
+ streamsToAddToDag.add(new Stream(name, nextStream,
mergerInputPort));
+ }
+ }
+
+ streamIdx++;
+ }
+
+ mergersTierI.add(merger);
+ } // End tier loop
+
+ mergers.add(mergersTierI);
+ } // End cross-tier loop
+
+ // We've now added streams connecting the input streams to cascading
mergers. Lastly, we need to connect the final
+ // tier to the output stream merger.
+
+ ArrayList<StreamMerger<K>> finalTier = mergers.get(mergers.size() -
1);
+
+ // If we're here, we're guaranteed to have had more than 2 streams
to merge, so the final tier must contain
+ // two merge operators
+ assert (finalTier.size() == 2);
+
+ streamsToAddToDag.add(new Stream("FinalMerge_0",
finalTier.get(0).out, finalMerger.data1));
+ streamsToAddToDag.add(new Stream("FinalMerge_1",
finalTier.get(1).out, finalMerger.data2));
+ }
+ }
+
+ /**
+ * Given the streams to merge have been selected with {@link
#merge(DefaultOutputPort)}, create a subDAG and add it
+ * to an existing DAG.
+ *
+ * To merge more than two streams at a time, we construct a tiered
hierarchy of thread-local StreamMerger operators
+ * E.g.
+ *
+ * Stream 0 ->
+ * StreamMerger_1 ->
+ * Stream 1 ->
+ * StreamMerger_Final -> Out
+ * Stream 2 ->
+ * StreamMerger_2 ->
+ * Stream 3 ->
+ *
+ * @param dag
+ * @param conf Note that we don't use the populateDAG function because
that is only used to flatten the
+ * module when what we really need to do is to define the
connections to the parent DAG.
+ * The populateDAG does not contain operators outside of the
module and thus cannot connect
+ * to the external operators.
+ */
+ public void insertInto(DAG dag, Configuration conf)
--- End diff --
Is this needed? Can you directly call mergeStreams() from populateDag()?
> Support merging multiple streams with StreamMerger
> ---------------------------------------------------
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Ilya Ganelin
> Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream
> combination operations), Apex must support merging data from multiple
> sources. The StreamMerger operator can be improved to merge multiple streams,
> rather than just the two streams it can handle in the present implementation.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)