Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/309#discussion_r66146655
  
    --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
    @@ -0,0 +1,277 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.stream;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Module;
    +
    +/**
    + * Module that adds functionality to bypass the platform limitations of combining more than two streams at a time with
    + * Stream Merger.
    + *
    + * Usage:
    + *
    + * dag.addOperator("Stream_1", op1);
    + * dag.addOperator("Stream_2", op2);
    + * dag.addOperator("Stream_3", op3);
    + *
    + * MultipleStreamMerger merger = new MultipleStreamMerger();
    + * merger.merge(op1.out)
    + * .merge(op2.out)
    + * .merge(op3.out)
    + * .insertInto(dag, conf);
    + *
    + * dag.addModule("Merger", merger);
    + *
    + * @param <K>
    + */
    +public class MultipleStreamMerger<K> implements Module
    +{
    +  public class Stream
    +  {
    +    DefaultInputPort destPort;
    +    DefaultOutputPort sourcePort;
    +    String name;
    +
    +    public Stream(String name, DefaultOutputPort sourcePort, DefaultInputPort destPort)
    +    {
    +      this.destPort = destPort;
    +      this.sourcePort = sourcePort;
    +      this.name = name;
    +    }
    +  }
    +
    +  public class NamedMerger
    +  {
    +    StreamMerger<K> merger;
    +    String name;
    +
    +    public NamedMerger(String name, StreamMerger<K> merger)
    +    {
    +      this.merger = merger;
    +      this.name = name;
    +    }
    +  }
    +
    +  private int streamCount = 0;
    +
    +  ArrayList<DefaultOutputPort<K>> streamsToMerge = new ArrayList<>();
    +
    +  public transient ProxyOutputPort<K> streamOutput = new ProxyOutputPort<>();
    +
    +  /**
    +   * Used to define all the sources to be merged into a single stream.
    +   *
    +   * @param sourcePort - The output port from the upstream operator that provides data
    +   * @return The updated MultipleStreamMerger object that tracks which streams should be unified.
    +   */
    +  public MultipleStreamMerger<K> merge(DefaultOutputPort<K> sourcePort)
    +  {
    +    streamsToMerge.add(sourcePort);
    +    return this;
    +  }
    +
    +  /**
    +   * To merge more than two streams at a time, we construct a binary tree of thread-local StreamMerger operators
    +   * E.g.
    +   *
    +   * Tier 0          Tier 1              Tier 2
    +   *
    +   * Stream 1 ->
    +   *                 StreamMerger_1 ->
    +   * Stream 2 ->
    +   *                                     StreamMerger_Final -> Out
    +   * Stream 3 ->
    +   *                 StreamMerger_2 ->
    +   * Stream 4 ->
    +   *
    +   * This function updates the provided DAG with the relevant streams.
    +   */
    +  public void mergeStreams(DAG dag, Configuration conf)
    +  {
    +    if (streamsToMerge.size() < 2) {
    +      throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " +
    +          "merging with `.merge()`.");
    +    }
    +
    +    ArrayList<Stream> streamsToAddToDag = new ArrayList<>();
    +    ArrayList<NamedMerger> operatorsToAdd = new ArrayList<>();
    +
    +    // Determine operators and streams to add to the DAG
    +    constructMergeTree(streamsToAddToDag, operatorsToAdd);
    +
    +    for (NamedMerger m : operatorsToAdd) {
    +      dag.addOperator(m.name, m.merger);
    +    }
    +
    +    for (Stream s : streamsToAddToDag) {
    +      dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
    +    }
    +  }
    +
    +  /**
    +   * Given a set of streams to be merged (defined via {@link #merge(DefaultOutputPort)}), compute the optimal
    +   * structure of cascading mergers that need to be instantiated, added to the dag, and linked together.
    +   * @param streamsToAddToDag - (output)  A list that is populated with streams that should be added to the  DAG
    +   * @param operatorsToAdd - (output) A list that is populated with operators to be added to the DAG
    +   */
    +  public void constructMergeTree(
    +      ArrayList<Stream> streamsToAddToDag,
    +      ArrayList<NamedMerger> operatorsToAdd)
    +  {
    +    if (streamsToMerge.size() < 2) {
    +      throw new IllegalArgumentException("Not enough streams to merge. Ensure `.merge` was called for each stream " +
    +          "to be added.");
    +    }
    +
    +    // Define the final merger in the sequence and connect its output to the module's output
    +    StreamMerger<K> finalMerger = new StreamMerger<>();
    +    operatorsToAdd.add(new NamedMerger("Merger_Final", finalMerger));
    +    streamOutput.set(finalMerger.out);
    +
    +    ArrayList<ArrayList<StreamMerger<K>>> mergers = new ArrayList<>();
    +
    +    /**
    +     * First, calculate the number of tiers we need to merge all streams given that each merger can only merge two
    +     * streams at a time.
    +     */
    +    int numTiers = (int)Math.ceil(Math.log(streamsToMerge.size()) / Math.log(2));
    +
    +    // Handle the simple case where we only have a single tier (only two streams to merge)
    +    if (numTiers == 1) {
    +      assert (streamsToMerge.size() == 2);
    +      streamsToAddToDag.add(new Stream("FinalMerge_Stream_0", streamsToMerge.get(0), finalMerger.data1));
    +      streamsToAddToDag.add(new Stream("FinalMerge_Stream_1", streamsToMerge.get(1), finalMerger.data2));
    +
    +      // We don't need to add any operators since we've already added the final merger
    +    } else {
    +      Iterator<DefaultOutputPort<K>> streams = streamsToMerge.iterator();
    +
    +      // When assigning streams, we will switch between ports 1 and 2 as we use successive mergers.
    +      boolean usePort1;
    +
    +      // For each tier, create the mergers in that tier, and connect the relevant streams
    +      for (int i = 0; i < numTiers - 1; i++) {
    --- End diff --
    
    A suggestion: could the overall logic in this loop be simplified? That would make the code easier to understand.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to