[
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319319#comment-15319319
]
ASF GitHub Bot commented on APEXMALHAR-2106:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/309#discussion_r66146382
--- Diff:
library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Module;
+
+/**
+ * Module that works around the platform limitation that a StreamMerger can
+ * combine only two streams at a time, by cascading StreamMerger operators so
+ * that two or more streams can be merged into a single output stream.
+ *
+ * Usage:
+ *
+ * dag.addOperator("Stream_1", op1);
+ * dag.addOperator("Stream_2", op2);
+ * dag.addOperator("Stream_3", op3);
+ *
+ * MultipleStreamMerger merger = new MultipleStreamMerger();
+ * merger.merge(op1.out)
+ * .merge(op2.out)
+ * .merge(op3.out)
+ * .mergeStreams(dag, conf);
+ *
+ * dag.addModule("Merger", merger);
+ *
+ * @param <K> type of the tuples emitted by the source ports being merged
+ */
+public class MultipleStreamMerger<K> implements Module
+{
+  /**
+   * A named connection from an upstream output port to a downstream input port;
+   * mergeStreams() registers each one with the DAG via dag.addStream(name, sourcePort, destPort).
+   */
+  public class Stream
+  {
+    DefaultInputPort destPort;
+    DefaultOutputPort sourcePort;
+    String name;
+
+    /**
+     * @param streamName name under which the stream is registered in the DAG
+     * @param from port whose tuples feed the stream
+     * @param to port that receives the stream's tuples
+     */
+    public Stream(String streamName, DefaultOutputPort from, DefaultInputPort to)
+    {
+      name = streamName;
+      sourcePort = from;
+      destPort = to;
+    }
+  }
+
+  /**
+   * Pairs a StreamMerger operator with the name under which mergeStreams() adds it to the DAG
+   * via dag.addOperator(name, merger).
+   */
+  public class NamedMerger
+  {
+    StreamMerger<K> merger;
+    String name;
+
+    /**
+     * @param operatorName DAG-unique name for the operator
+     * @param operator the merger instance to add
+     */
+    public NamedMerger(String operatorName, StreamMerger<K> operator)
+    {
+      name = operatorName;
+      merger = operator;
+    }
+  }
+
+ // NOTE(review): not read or written anywhere in the visible portion of this
+ // file -- confirm it is used further down, otherwise remove it.
+ private int streamCount = 0;
+
+ // Upstream output ports registered via merge(), kept in registration order;
+ // consumed by constructMergeTree() to build the tree of StreamMergers.
+ ArrayList<DefaultOutputPort<K>> streamsToMerge = new ArrayList<>();
+
+ // The module's single merged output; constructMergeTree() binds it to the
+ // final merger's `out` port via streamOutput.set(finalMerger.out).
+ public transient ProxyOutputPort<K> streamOutput = new
ProxyOutputPort<>();
+
+ /**
+ * Used to define all the sources to be merged into a single stream.
+ *
+ * @param sourcePort - The output port from the upstream operator that
provides data
+ * @return The updated MultipleStreamMerger object that tracks which
streams should be unified.
+ */
+ public MultipleStreamMerger<K> merge(DefaultOutputPort<K> sourcePort)
+ {
+ streamsToMerge.add(sourcePort);
+ return this;
+ }
+
+  /**
+   * Adds the operators and streams needed to merge every port registered via
+   * {@link #merge(DefaultOutputPort)} into {@code dag}.
+   *
+   * <p>A StreamMerger combines exactly two inputs, so merging more than two streams requires a
+   * binary tree of cascading StreamMerger operators. For example, with four streams:
+   *
+   * <pre>
+   *   Stream 1 --+
+   *              +-- StreamMerger --+
+   *   Stream 2 --+                  |
+   *                                 +-- StreamMerger (final) --- Out
+   *   Stream 3 --+                  |
+   *              +-- StreamMerger --+
+   *   Stream 4 --+
+   * </pre>
+   *
+   * <p>Every stream added here is marked {@link DAG.Locality#CONTAINER_LOCAL} (not thread-local),
+   * so each stream's endpoints are deployed in the same container.
+   *
+   * @param dag the DAG to which the merger operators and their connecting streams are added
+   * @param conf not used by this method
+   * @throws IllegalArgumentException if fewer than two streams were registered via
+   *     {@link #merge(DefaultOutputPort)}
+   */
+  public void mergeStreams(DAG dag, Configuration conf)
+  {
+    if (streamsToMerge.size() < 2) {
+      throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " +
+          "merging with `.merge()`.");
+    }
+
+    ArrayList<Stream> streamsToAddToDag = new ArrayList<>();
+    ArrayList<NamedMerger> operatorsToAdd = new ArrayList<>();
+
+    // Compute the full set of operators and streams before touching the DAG.
+    constructMergeTree(streamsToAddToDag, operatorsToAdd);
+
+    // Register all operators first, then the streams that connect their ports.
+    for (NamedMerger m : operatorsToAdd) {
+      dag.addOperator(m.name, m.merger);
+    }
+
+    for (Stream s : streamsToAddToDag) {
+      dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
+    }
+  }
+
+ /**
+ * Given a set of streams to be merged (defined via {@link
#merge(DefaultOutputPort)}), compute the optimal
+ * structure of cascading mergers that need to be instantiated, added to
the dag, and linked together.
+ * @param streamsToAddToDag - (output) A list that is populated with
streams that should be added to the DAG
+ * @param operatorsToAdd - (output) A list that is populated with
operators to be added to the DAG
+ */
+ public void constructMergeTree(
+ ArrayList<Stream> streamsToAddToDag,
+ ArrayList<NamedMerger> operatorsToAdd)
+ {
+ if (streamsToMerge.size() < 2) {
+ throw new IllegalArgumentException("Not enough streams to merge.
Ensure `.merge` was called for each stream " +
+ "to be added.");
+ }
+
+ // Define the final merger in the sequence and connect its output to
the module's output
+ StreamMerger<K> finalMerger = new StreamMerger<>();
+ operatorsToAdd.add(new NamedMerger("Merger_Final", finalMerger));
+ streamOutput.set(finalMerger.out);
+
+ ArrayList<ArrayList<StreamMerger<K>>> mergers = new ArrayList<>();
+
+ /**
+ * First, calculate the number of tiers we need to merge all streams
given that each merger can only merge two
+ * streams at a time.
+ */
+ int numTiers = (int)Math.ceil(Math.log(streamsToMerge.size()) /
Math.log(2));
+
+ // Handle the simple case where we only have a single tier (only two
streams to merge)
+ if (numTiers == 1) {
+ assert (streamsToMerge.size() == 2);
+ streamsToAddToDag.add(new Stream("FinalMerge_Stream_0",
streamsToMerge.get(0), finalMerger.data1));
+ streamsToAddToDag.add(new Stream("FinalMerge_Stream_1",
streamsToMerge.get(1), finalMerger.data2));
+
+ // We don't need to add any operators since we've already added the
final merger
+ } else {
+ Iterator<DefaultOutputPort<K>> streams = streamsToMerge.iterator();
+
+ // When assigning streams, we will switch between ports 1 and 2 as
we use successive mergers.
+ boolean usePort1;
+
+ // For each tier, create the mergers in that tier, and connect the
relevant streams
+ for (int i = 0; i < numTiers - 1; i++) {
+ int streamIdx = 0;
+ usePort1 = true;
+
+ int numMergers = (int)Math.ceil(streamsToMerge.size() /
Math.pow(2, i + 1));
+
+ ArrayList<StreamMerger<K>> mergersTierI = new
ArrayList<>(numMergers);
+
+ // For each merger in the tier, assign the appropriate streams to
that merger
+ for (int mergerIdx = 0; mergerIdx < numMergers; mergerIdx++) {
+ StreamMerger<K> merger = new StreamMerger<>();
+ operatorsToAdd.add(new NamedMerger("Merger_Tier_" + i + "_#_" +
mergerIdx, merger));
+
+ // Each operator has two ports so add a simple inner loop
+ for (int port = 0; port < 2; port++) {
--- End diff --
Do we need a loop here? Since each merger has exactly two ports, could we unroll it into two explicit port assignments?
> Support merging multiple streams with StreamMerger
> ---------------------------------------------------
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Ilya Ganelin
> Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other stream-combination
> operations), Apex must support merging data from multiple sources. The
> StreamMerger operator should be extended to merge an arbitrary number of
> streams, rather than only the two it can handle in the current implementation.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)