[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319260#comment-15319260 ]
ASF GitHub Bot commented on APEXMALHAR-2106: -------------------------------------------- Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r66142732 --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java --- @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.stream; + +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Module; + +/** + * Module that adds functionality to bypass the platform limitations of combining more than two streams at a time with + * Stream Merger. + * + * Usage: + * + * dag.addOperator("Stream_1", op1); + * dag.addOperator("Stream_2", op2); + * dag.addOperator("Stream_3", op3); + * + * MultipleStreamMerger merger = new MultipleStreamMerger(); + * merger.merge(op1.out) + * .merge(op2.out) + * .merge(op3.out) + * .insertInto(dag, conf); + * + * dag.addModule("Merger", merger); + * + * @param <K> + */ +public class MultipleStreamMerger<K> implements Module +{ + public class Stream + { + DefaultInputPort destPort; + DefaultOutputPort sourcePort; + String name; + + public Stream(String name, DefaultOutputPort sourcePort, DefaultInputPort destPort) + { + this.destPort = destPort; + this.sourcePort = sourcePort; + this.name = name; + } + } + + public class NamedMerger + { + StreamMerger<K> merger; + String name; + + public NamedMerger(String name, StreamMerger<K> merger) + { + this.merger = merger; + this.name = name; + } + } + + private int streamCount = 0; + + ArrayList<DefaultOutputPort<K>> streamsToMerge = new ArrayList<>(); + + public transient ProxyOutputPort<K> streamOutput = new ProxyOutputPort<>(); + + /** + * Used to define all the sources to be merged into a single stream. + * + * @param sourcePort - The output port from the upstream operator that provides data + * @return The updated MultipleStreamMerger object that tracks which streams should be unified. + */ + public MultipleStreamMerger<K> merge(DefaultOutputPort<K> sourcePort) + { + streamsToMerge.add(sourcePort); + return this; + } + + /** + * To merge more than two streams at a time, we construct a binary tree of thread-local StreamMerger operators + * E.g. + * + * Tier 0 Tier 1 Tier 2 + * + * Stream 1 -> + * StreamMerger_1 -> + * Stream 2 -> + * StreamMerger_Final -> Out + * Stream 3 -> + * StreamMerger_2 -> + * Stream 4 -> + * + * This function updates the provided DAG with the relevant streams. + */ + public void mergeStreams(DAG dag, Configuration conf) + { + if (streamsToMerge.size() < 2) { + throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " + + "merging with `.merge()`."); + } + + ArrayList<Stream> streamsToAddToDag = new ArrayList<>(); + ArrayList<NamedMerger> operatorsToAdd = new ArrayList<>(); + + // Determine operators and streams to add to the DAG + constructMergeTree(streamsToAddToDag, operatorsToAdd); + + for (NamedMerger m : operatorsToAdd) { + dag.addOperator(m.name, m.merger); + } + + for (Stream s : streamsToAddToDag) { + dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL); + } + } + + /** + * Given a set of streams to be merged (defined via {@link #merge(DefaultOutputPort)}), compute the optimal + * structure of cascading mergers that need to be instantiated, added to the dag, and linked together. + * @param streamsToAddToDag - (output) A list that is populated with streams that should be added to the DAG + * @param operatorsToAdd - (output) A list that is populated with operators to be added to the DAG + */ + public void constructMergeTree( + ArrayList<Stream> streamsToAddToDag, + ArrayList<NamedMerger> operatorsToAdd) + { + if (streamsToMerge.size() < 2) { + throw new IllegalArgumentException("Not enough streams to merge. Ensure `.merge` was called for each stream " + + "to be added."); + } + + // Define the final merger in the sequence and connect its output to the module's output + StreamMerger<K> finalMerger = new StreamMerger<>(); + operatorsToAdd.add(new NamedMerger("Merger_Final", finalMerger)); + streamOutput.set(finalMerger.out); + + ArrayList<ArrayList<StreamMerger<K>>> mergers = new ArrayList<>(); + + /** + * First, calculate the number of tiers we need to merge all streams given that each merger can only merge two + * streams at a time. + */ + int numTiers = (int)Math.ceil(Math.log(streamsToMerge.size()) / Math.log(2)); + + // Handle the simple case where we only have a single tier (only two streams to merge) + if (numTiers == 1) { + assert (streamsToMerge.size() == 2); + streamsToAddToDag.add(new Stream("FinalMerge_Stream_0", streamsToMerge.get(0), finalMerger.data1)); + streamsToAddToDag.add(new Stream("FinalMerge_Stream_1", streamsToMerge.get(1), finalMerger.data2)); + + // We don't need to add any operators since we've already added the final merger + } else { + Iterator<DefaultOutputPort<K>> streams = streamsToMerge.iterator(); + + // When assigning streams, we will switch between ports 1 and 2 as we use successive mergers. + boolean usePort1; + + // For each tier, create the mergers in that tier, and connect the relevant streams + for (int i = 0; i < numTiers - 1; i++) { + int streamIdx = 0; + usePort1 = true; + + int numMergers = (int)Math.ceil(streamsToMerge.size() / Math.pow(2, i + 1)); --- End diff -- Should this be floor instead of ceil? The one odd stream can be directly fed to the final tier. > Support merging multiple streams with StreamMerger > --------------------------------------------------- > > Key: APEXMALHAR-2106 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106 > Project: Apache Apex Malhar > Issue Type: New Feature > Reporter: Ilya Ganelin > Assignee: Ilya Ganelin > > To properly implement the Flatten transformation (and other Stream > combination operations), Apex must support merging data from multiple > sources. The StreamMerger operator can be improved to merge multiple streams, > rather than just the two streams it can handle in the present implementation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)