[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15319304#comment-15319304
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/309#discussion_r66145536
  
    --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
    @@ -0,0 +1,277 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.stream;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +
    +import org.apache.hadoop.conf.Configuration;
    +
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.api.Module;
    +
    +/**
    + * Module that adds functionality to bypass the platform limitations of combining more than two streams at a time with
    + * Stream Merger.
    + *
    + * Usage:
    + *
    + * dag.addOperator("Stream_1", op1);
    + * dag.addOperator("Stream_2", op2);
    + * dag.addOperator("Stream_3", op3);
    + *
    + * MultipleStreamMerger merger = new MultipleStreamMerger();
    + * merger.merge(op1.out)
    + * .merge(op2.out)
    + * .merge(op3.out)
    + * .insertInto(dag, conf);
    + *
    + * dag.addModule("Merger", merger);
    + *
    + * @param <K>
    + */
    +public class MultipleStreamMerger<K> implements Module
    +{
    +  public class Stream
    +  {
    +    DefaultInputPort destPort;
    +    DefaultOutputPort sourcePort;
    +    String name;
    +
    +    public Stream(String name, DefaultOutputPort sourcePort, DefaultInputPort destPort)
    +    {
    +      this.destPort = destPort;
    +      this.sourcePort = sourcePort;
    +      this.name = name;
    +    }
    +  }
    +
    +  public class NamedMerger
    +  {
    +    StreamMerger<K> merger;
    +    String name;
    +
    +    public NamedMerger(String name, StreamMerger<K> merger)
    +    {
    +      this.merger = merger;
    +      this.name = name;
    +    }
    +  }
    +
    +  private int streamCount = 0;
    +
    +  ArrayList<DefaultOutputPort<K>> streamsToMerge = new ArrayList<>();
    +
    +  public transient ProxyOutputPort<K> streamOutput = new ProxyOutputPort<>();
    +
    +  /**
    +   * Used to define all the sources to be merged into a single stream.
    +   *
    +   * @param sourcePort - The output port from the upstream operator that provides data
    +   * @return The updated MultipleStreamMerger object that tracks which streams should be unified.
    +   */
    +  public MultipleStreamMerger<K> merge(DefaultOutputPort<K> sourcePort)
    +  {
    +    streamsToMerge.add(sourcePort);
    +    return this;
    +  }
    +
    +  /**
    +   * To merge more than two streams at a time, we construct a binary tree of thread-local StreamMerger operators
    +   * E.g.
    +   *
    +   * Tier 0          Tier 1              Tier 2
    +   *
    +   * Stream 1 ->
    +   *                 StreamMerger_1 ->
    +   * Stream 2 ->
    +   *                                     StreamMerger_Final -> Out
    +   * Stream 3 ->
    +   *                 StreamMerger_2 ->
    +   * Stream 4 ->
    +   *
    +   * This function updates the provided DAG with the relevant streams.
    +   */
    +  public void mergeStreams(DAG dag, Configuration conf)
    +  {
    +    if (streamsToMerge.size() < 2) {
    +      throw new IllegalArgumentException("Not enough streams to merge, at least two streams must be selected for " +
    +          "merging with `.merge()`.");
    +    }
    +
    +    ArrayList<Stream> streamsToAddToDag = new ArrayList<>();
    +    ArrayList<NamedMerger> operatorsToAdd = new ArrayList<>();
    +
    +    // Determine operators and streams to add to the DAG
    +    constructMergeTree(streamsToAddToDag, operatorsToAdd);
    +
    +    for (NamedMerger m : operatorsToAdd) {
    +      dag.addOperator(m.name, m.merger);
    +    }
    +
    +    for (Stream s : streamsToAddToDag) {
    +      dag.addStream(s.name, s.sourcePort, s.destPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
    --- End diff --
    
    Making all streams CONTAINER_LOCAL will force every operator upstream of the
    merger, as well as every operator downstream of it, into the same container.
    Only the streams inside the merger (the ones between two tiers of mergers)
    need to be CONTAINER_LOCAL. Even THREAD_LOCAL should work for those.
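
    A rough sketch of the distinction, written as a standalone model rather
    than against the Apex API: it pairs sources tier by tier and records which
    planned streams run merger-to-merger. The class, method, and port names
    (MergeTreeLocalitySketch, plan, in1/in2) are hypothetical and do not come
    from the pull request; only the internal streams would get a locality
    constraint.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Standalone model of the merge tree. No Apex classes are used; every name
 * in this file is hypothetical and exists only to show which streams are
 * internal to the module.
 */
class MergeTreeLocalitySketch
{
  /** One planned stream and whether it connects two mergers. */
  static final class PlannedStream
  {
    final String source;
    final String dest;
    final boolean internal;

    PlannedStream(String source, String dest, boolean internal)
    {
      this.source = source;
      this.dest = dest;
      this.internal = internal;
    }

    /** Only merger-to-merger streams are pinned; the rest keep the default. */
    String locality()
    {
      return internal ? "CONTAINER_LOCAL" : "default";
    }
  }

  /** Something that feeds the next tier: an external input or a merger output. */
  private static final class Source
  {
    final String name;
    final boolean fromMerger;

    Source(String name, boolean fromMerger)
    {
      this.name = name;
      this.fromMerger = fromMerger;
    }
  }

  /** Plans the streams needed to merge inputCount external inputs pairwise. */
  static List<PlannedStream> plan(int inputCount)
  {
    if (inputCount < 2) {
      throw new IllegalArgumentException("At least two streams are required.");
    }
    List<Source> current = new ArrayList<>();
    for (int i = 1; i <= inputCount; i++) {
      current.add(new Source("Stream_" + i, false));
    }
    List<PlannedStream> streams = new ArrayList<>();
    int mergerId = 0;
    while (current.size() > 1) {
      List<Source> next = new ArrayList<>();
      for (int i = 0; i + 1 < current.size(); i += 2) {
        String merger = "StreamMerger_" + (++mergerId);
        Source a = current.get(i);
        Source b = current.get(i + 1);
        // A stream is internal only when its source is another merger.
        streams.add(new PlannedStream(a.name, merger + ".in1", a.fromMerger));
        streams.add(new PlannedStream(b.name, merger + ".in2", b.fromMerger));
        next.add(new Source(merger, true));
      }
      if (current.size() % 2 == 1) {
        // An unpaired source is carried over to the next tier unchanged.
        next.add(current.get(current.size() - 1));
      }
      current = next;
    }
    return streams;
  }

  public static void main(String[] args)
  {
    for (PlannedStream s : plan(4)) {
      System.out.println(s.source + " -> " + s.dest + " [" + s.locality() + "]");
    }
  }
}
```

    In the real module, the same internal/external distinction would decide
    which dag.addStream(...) results receive setLocality(...), so the
    upstream operators stay free to be placed in other containers.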


> Support merging multiple streams with StreamMerger 
> ---------------------------------------------------
>
>                 Key: APEXMALHAR-2106
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>            Reporter: Ilya Ganelin
>            Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream 
> combination operations), Apex must support merging data from multiple 
> sources. The StreamMerger operator can be improved to merge multiple streams, 
> rather than just the two streams it can handle in the present implementation. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
