sanha closed pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java 
b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 75a015bd1..f0ae953de 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -196,6 +196,7 @@ public static void launchDAG(final DAG dag) {
       LOG.warn("Interrupted: " + e);
       // clean up state...
       Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
     }
     LOG.info("DAG execution done");
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java 
b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
index bfe8c0655..df5809f05 100644
--- a/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
+++ b/common/src/main/java/edu/snu/nemo/common/ContextImpl.java
@@ -25,16 +25,18 @@
  */
 public final class ContextImpl implements Transform.Context {
   private final Map sideInputs;
-  private final Map<String, String> additionalTagOutputs;
+  private final Map<String, String> tagToAdditionalChildren;
   private String data;
 
   /**
    * Constructor of Context Implementation.
-   * @param sideInputs side inputs.
+   * @param sideInputs              side inputs.
+   * @param tagToAdditionalChildren tag id to additional vertices id map.
    */
-  public ContextImpl(final Map sideInputs, final Map additionalTagOutputs) {
+  public ContextImpl(final Map sideInputs,
+                     final Map<String, String> tagToAdditionalChildren) {
     this.sideInputs = sideInputs;
-    this.additionalTagOutputs = additionalTagOutputs;
+    this.tagToAdditionalChildren = tagToAdditionalChildren;
     this.data = null;
   }
 
@@ -44,8 +46,8 @@ public Map getSideInputs() {
   }
 
   @Override
-  public Map<String, String> getAdditionalTagOutputs() {
-    return this.additionalTagOutputs;
+  public Map<String, String> getTagToAdditionalChildren() {
+    return this.tagToAdditionalChildren;
   }
 
   @Override
diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
index af4e41cce..5a2ad41dc 100644
--- 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
@@ -41,7 +41,7 @@
    * Constructor for dynamic optimization vertex.
    */
   public MetricCollectionBarrierVertex() {
-    this.metricData = null;
+    this.metricData = new HashMap<>();
     this.blockIds = new ArrayList<>();
     this.dagSnapshot = null;
   }
diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
index 871d08b68..47fa6c81f 100644
--- 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
@@ -61,7 +61,7 @@ default Object getTag() {
      * @return sideInputs.
      */
     Map getSideInputs();
-    Map<String, String> getAdditionalTagOutputs();
+    Map<String, String> getTagToAdditionalChildren();
 
     /**
      * Put serialized data to send to the executor.
diff --git a/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java 
b/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
index aaf71517a..149cdd13a 100644
--- a/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
+++ b/common/src/test/java/edu/snu/nemo/common/ContextImplTest.java
@@ -20,8 +20,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -33,7 +32,7 @@
 public class ContextImplTest {
   private Transform.Context context;
   private final Map sideInputs = new HashMap();
-  private final Map taggedOutputs = new HashMap();
+  private final Map<String, String> taggedOutputs = new HashMap();
 
   @Before
   public void setUp() {
@@ -44,7 +43,7 @@ public void setUp() {
   @Test
   public void testContextImpl() {
     assertEquals(this.sideInputs, this.context.getSideInputs());
-    assertEquals(this.taggedOutputs, this.context.getAdditionalTagOutputs());
+    assertEquals(this.taggedOutputs, 
this.context.getTagToAdditionalChildren());
 
     final String sampleText = "test_text";
 
diff --git 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamKeyExtractor.java
 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamKeyExtractor.java
index f82fd83ee..8e1bf271b 100644
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamKeyExtractor.java
@@ -26,7 +26,9 @@
   @Override
   public Object extractKey(final Object element) {
     if (element instanceof KV) {
-      return ((KV) element).getKey();
+      // Handle null keys, since Beam allows KV with null keys.
+      final Object key = ((KV) element).getKey();
+      return key == null ? 0 : key;
     } else {
       return element;
     }
diff --git 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index 35370b74e..38b07a3a5 100644
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -42,10 +42,8 @@
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * Visits every node in the beam dag to translate the BEAM program to the IR.
@@ -57,7 +55,9 @@
   // loopVertexStack keeps track of where the beam program is: whether it is 
inside a composite transform or it is not.
   private final Stack<LoopVertex> loopVertexStack;
   private final Map<PValue, Pair<BeamEncoderFactory, BeamDecoderFactory>> 
pValueToCoder;
+  private final Map<IRVertex, Pair<BeamEncoderFactory, BeamDecoderFactory>> 
sideInputCoder;
   private final Map<PValue, TupleTag> pValueToTag;
+  private final Map<IRVertex, Set<PValue>> additionalInputs;
 
   /**
    * Constructor of the BEAM Visitor.
@@ -71,7 +71,9 @@ public NemoPipelineVisitor(final DAGBuilder<IRVertex, IREdge> 
builder, final Pip
     this.options = options;
     this.loopVertexStack = new Stack<>();
     this.pValueToCoder = new HashMap<>();
+    this.sideInputCoder = new HashMap<>();
     this.pValueToTag = new HashMap<>();
+    this.additionalInputs = new HashMap<>();
   }
 
   @Override
@@ -97,14 +99,16 @@ public void visitPrimitiveTransform(final 
TransformHierarchy.Node beamNode) {
 //    Print if needed for development
 //    LOG.info("visitp " + beamNode.getTransform());
     final IRVertex irVertex =
-        convertToVertex(beamNode, builder, pValueToVertex, pValueToCoder, 
pValueToTag, options, loopVertexStack);
+        convertToVertex(beamNode, builder, pValueToVertex, sideInputCoder, 
pValueToTag, additionalInputs,
+            options, loopVertexStack);
     beamNode.getOutputs().values().stream().filter(v -> v instanceof 
PCollection).map(v -> (PCollection) v)
         .forEach(output -> pValueToCoder.put(output,
             Pair.of(new BeamEncoderFactory(output.getCoder()), new 
BeamDecoderFactory(output.getCoder()))));
 
     beamNode.getOutputs().values().forEach(output -> 
pValueToVertex.put(output, irVertex));
-
+    final Set<PValue> additionalInputsForThisVertex = 
additionalInputs.getOrDefault(irVertex, new HashSet<>());
     beamNode.getInputs().values().stream().filter(pValueToVertex::containsKey)
+        .filter(pValue -> !additionalInputsForThisVertex.contains(pValue))
         .forEach(pValue -> {
           final boolean isAdditionalOutput = pValueToTag.containsKey(pValue);
           final IRVertex src = pValueToVertex.get(pValue);
@@ -124,23 +128,25 @@ public void visitPrimitiveTransform(final 
TransformHierarchy.Node beamNode) {
   /**
    * Convert Beam node to IR vertex.
    *
-   * @param beamNode        input beam node.
-   * @param builder         the DAG builder to add the vertex to.
-   * @param pValueToVertex  PValue to Vertex map.
-   * @param pValueToCoder   PValue to EncoderFactory and DecoderFactory map.
-   * @param pValueToTag     PValue to Tag map.
-   * @param options         pipeline options.
-   * @param loopVertexStack Stack to get the current loop vertex that the 
operator vertex will be assigned to.
-   * @param <I>             input type.
-   * @param <O>             output type.
+   * @param beamNode         input beam node.
+   * @param builder          the DAG builder to add the vertex to.
+   * @param pValueToVertex   PValue to Vertex map.
+   * @param sideInputCoder   Side input EncoderFactory and DecoderFactory map.
+   * @param pValueToTag      PValue to Tag map.
+   * @param additionalInputs additional inputs.
+   * @param options          pipeline options.
+   * @param loopVertexStack  Stack to get the current loop vertex that the 
operator vertex will be assigned to.
+   * @param <I>              input type.
+   * @param <O>              output type.
    * @return newly created vertex.
    */
   private static <I, O> IRVertex
   convertToVertex(final TransformHierarchy.Node beamNode,
                   final DAGBuilder<IRVertex, IREdge> builder,
                   final Map<PValue, IRVertex> pValueToVertex,
-                  final Map<PValue, Pair<BeamEncoderFactory, 
BeamDecoderFactory>> pValueToCoder,
+                  final Map<IRVertex, Pair<BeamEncoderFactory, 
BeamDecoderFactory>> sideInputCoder,
                   final Map<PValue, TupleTag> pValueToTag,
+                  final Map<IRVertex, Set<PValue>> additionalInputs,
                   final PipelineOptions options,
                   final Stack<LoopVertex> loopVertexStack) {
     final PTransform beamTransform = beamNode.getTransform();
@@ -166,7 +172,7 @@ public void visitPrimitiveTransform(final 
TransformHierarchy.Node beamNode) {
           .orElseThrow(() -> new RuntimeException("No inputs provided to " + 
beamNode.getFullName())).getCoder();
       beamNode.getOutputs().values().stream()
           .forEach(output ->
-              pValueToCoder.put(output, 
getCoderPairForView(view.getView().getViewFn(), beamInputCoder)));
+              sideInputCoder.put(irVertex, 
getCoderPairForView(view.getView().getViewFn(), beamInputCoder)));
     } else if (beamTransform instanceof Window) {
       final Window<I> window = (Window<I>) beamTransform;
       final WindowTransform transform = new 
WindowTransform(window.getWindowFn());
@@ -181,19 +187,22 @@ public void visitPrimitiveTransform(final 
TransformHierarchy.Node beamNode) {
       final ParDo.SingleOutput<I, O> parDo = (ParDo.SingleOutput<I, O>) 
beamTransform;
       final DoTransform transform = new DoTransform(parDo.getFn(), options);
       irVertex = new OperatorVertex(transform);
+      additionalInputs.put(irVertex, 
parDo.getAdditionalInputs().values().stream().collect(Collectors.toSet()));
       builder.addVertex(irVertex, loopVertexStack);
-      connectSideInputs(builder, parDo.getSideInputs(), pValueToVertex, 
pValueToCoder, irVertex);
+      connectSideInputs(builder, parDo.getSideInputs(), pValueToVertex, 
sideInputCoder, irVertex);
     } else if (beamTransform instanceof ParDo.MultiOutput) {
       final ParDo.MultiOutput<I, O> parDo = (ParDo.MultiOutput<I, O>) 
beamTransform;
       final DoTransform transform = new DoTransform(parDo.getFn(), options);
       irVertex = new OperatorVertex(transform);
+      additionalInputs.put(irVertex, 
parDo.getAdditionalInputs().values().stream().collect(Collectors.toSet()));
       if (parDo.getAdditionalOutputTags().size() > 0) {
+        // Store PValue to additional tag id mapping.
         beamNode.getOutputs().entrySet().stream()
             .filter(kv -> !kv.getKey().equals(parDo.getMainOutputTag()))
             .forEach(kv -> pValueToTag.put(kv.getValue(), kv.getKey()));
       }
       builder.addVertex(irVertex, loopVertexStack);
-      connectSideInputs(builder, parDo.getSideInputs(), pValueToVertex, 
pValueToCoder, irVertex);
+      connectSideInputs(builder, parDo.getSideInputs(), pValueToVertex, 
sideInputCoder, irVertex);
     } else if (beamTransform instanceof Flatten.PCollections) {
       irVertex = new OperatorVertex(new FlattenTransform());
       builder.addVertex(irVertex, loopVertexStack);
@@ -209,19 +218,20 @@ public void visitPrimitiveTransform(final 
TransformHierarchy.Node beamNode) {
    * @param builder        the DAG builder to add the vertex to.
    * @param sideInputs     side inputs.
    * @param pValueToVertex PValue to Vertex map.
-   * @param pValueToCoder  PValue to Encoder/Decoder factory map.
+   * @param coderMap       Side input to Encoder/Decoder factory map.
    * @param irVertex       wrapper for a user operation in the IR. (Where the 
side input is headed to)
    */
   private static void connectSideInputs(final DAGBuilder<IRVertex, IREdge> 
builder,
                                         final List<PCollectionView<?>> 
sideInputs,
                                         final Map<PValue, IRVertex> 
pValueToVertex,
-                                        final Map<PValue, 
Pair<BeamEncoderFactory, BeamDecoderFactory>> pValueToCoder,
+                                        final Map<IRVertex, 
Pair<BeamEncoderFactory, BeamDecoderFactory>> coderMap,
                                         final IRVertex irVertex) {
     sideInputs.stream().filter(pValueToVertex::containsKey)
         .forEach(pValue -> {
           final IRVertex src = pValueToVertex.get(pValue);
-          final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, 
irVertex), src, irVertex, true);
-          final Pair<BeamEncoderFactory, BeamDecoderFactory> coder = 
pValueToCoder.get(pValue);
+          final IREdge edge = new IREdge(getEdgeCommunicationPattern(src, 
irVertex),
+              src, irVertex, true);
+          final Pair<BeamEncoderFactory, BeamDecoderFactory> coder = 
coderMap.get(src);
           edge.setProperty(EncoderProperty.of(coder.left()));
           edge.setProperty(DecoderProperty.of(coder.right()));
           edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
diff --git 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index 534751542..059de8137 100644
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -17,12 +17,14 @@
 
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
-import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 
+import javax.annotation.Nullable;
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.List;
 
 /**
  * CreateView transform implementation.
@@ -32,8 +34,8 @@
 public final class CreateViewTransform<I, O> implements Transform<I, O> {
   private final PCollectionView pCollectionView;
   private OutputCollector<O> outputCollector;
-  private List<WindowedValue<I>> windowed;
-  private final ViewFn<Iterable<WindowedValue<I>>, O> viewFn;
+  private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
+  private final MultiView<Object> multiView;
 
   /**
    * Constructor of CreateViewTransform.
@@ -41,8 +43,8 @@
    */
   public CreateViewTransform(final PCollectionView<O> pCollectionView) {
     this.pCollectionView = pCollectionView;
-    this.windowed = new ArrayList<>();
     this.viewFn = this.pCollectionView.getViewFn();
+    this.multiView = new MultiView<>();
   }
 
   @Override
@@ -52,8 +54,11 @@ public void prepare(final Context context, final 
OutputCollector<O> oc) {
 
   @Override
   public void onData(final I element) {
-    WindowedValue<I> data = WindowedValue.valueInGlobalWindow(element);
-    windowed.add(data);
+    // Since CreateViewTransform takes KV(Void, value), this is okay
+    if (element instanceof KV) {
+      final KV<?, ?> kv = (KV<?, ?>) element;
+      multiView.getDataList().add(kv.getValue());
+    }
   }
 
   /**
@@ -67,8 +72,8 @@ public PCollectionView getTag() {
 
   @Override
   public void close() {
-    O output = viewFn.apply(windowed);
-    outputCollector.emit(output);
+    final Object view = viewFn.apply(multiView);
+    outputCollector.emit((O) view);
   }
 
   @Override
@@ -77,4 +82,29 @@ public String toString() {
     sb.append("CreateViewTransform:" + pCollectionView);
     return sb.toString();
   }
+
+  /**
+   * Represents {@code PrimitiveViewT} supplied to the {@link ViewFn}.
+   * @param <T> primitive view type
+   */
+  public final class MultiView<T> implements 
Materializations.MultimapView<Void, T>, Serializable {
+    private final ArrayList<T> dataList;
+
+    /**
+     * Constructor.
+     */
+    MultiView() {
+      // Create a placeholder for side input data. CreateViewTransform#onData 
stores data to this list.
+      dataList = new ArrayList<>();
+    }
+
+    @Override
+    public Iterable<T> get(@Nullable final Void aVoid) {
+      return dataList;
+    }
+
+    public ArrayList<T> getDataList() {
+      return dataList;
+    }
+  }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
index 9d7636793..f0233804c 100644
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
@@ -18,8 +18,10 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -33,6 +35,7 @@
 import org.joda.time.Instant;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -73,7 +76,7 @@ public void prepare(final Context context, final 
OutputCollector<O> oc) {
     this.startBundleContext = new StartBundleContext(doFn, serializedOptions);
     this.finishBundleContext = new FinishBundleContext(doFn, outputCollector, 
serializedOptions);
     this.processContext = new ProcessContext(doFn, outputCollector,
-        context.getSideInputs(), context.getAdditionalTagOutputs(), 
serializedOptions);
+        context.getSideInputs(), context.getTagToAdditionalChildren(), 
serializedOptions);
     this.invoker = DoFnInvokers.invokerFor(doFn);
     invoker.invokeSetup();
     invoker.invokeStartBundle(startBundleContext);
@@ -193,7 +196,7 @@ public void output(final O output, final Instant instant, 
final BoundedWindow bo
     private I input;
     private final OutputCollector<O> outputCollector;
     private final Map sideInputs;
-    private final Map additionalOutputs;
+    private final Map<String, String> additionalOutputs;
     private final ObjectMapper mapper;
     private final PipelineOptions options;
 
@@ -203,13 +206,13 @@ public void output(final O output, final Instant instant, 
final BoundedWindow bo
      * @param fn                Dofn.
      * @param outputCollector   OutputCollector.
      * @param sideInputs        Map for SideInputs.
-     * @param additionalOutputs     Map for TaggedOutputs.
+     * @param additionalOutputs Map for TaggedOutputs.
      * @param serializedOptions Options, serialized.
      */
     ProcessContext(final DoFn<I, O> fn,
                    final OutputCollector<O> outputCollector,
                    final Map sideInputs,
-                   final Map additionalOutputs,
+                   final Map<String, String> additionalOutputs,
                    final String serializedOptions) {
       fn.super();
       this.outputCollector = outputCollector;
@@ -249,7 +252,7 @@ public Instant timestamp() {
 
     @Override
     public PaneInfo pane() {
-      throw new UnsupportedOperationException("pane() in ProcessContext under 
DoTransform");
+      return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
     }
 
     @Override
@@ -269,12 +272,18 @@ public void output(final O output) {
 
     @Override
     public void outputWithTimestamp(final O output, final Instant timestamp) {
-      throw new UnsupportedOperationException("outputWithTimestamp() in 
ProcessContext under DoTransform");
+      outputCollector.emit(output);
     }
 
     @Override
     public <T> void output(final TupleTag<T> tupleTag, final T t) {
-      outputCollector.emit((String) additionalOutputs.get(tupleTag.getId()), 
t);
+      final Object dstVertexId = additionalOutputs.get(tupleTag.getId());
+
+      if (dstVertexId == null) {
+        outputCollector.emit((O) t);
+      } else {
+        outputCollector.emit(additionalOutputs.get(tupleTag.getId()), t);
+      }
     }
 
     @Override
@@ -284,12 +293,18 @@ public void outputWithTimestamp(final O output, final 
Instant timestamp) {
 
     @Override
     public BoundedWindow window() {
-      return new BoundedWindow() {
-        @Override
-        public Instant maxTimestamp() {
-          return GlobalWindow.INSTANCE.maxTimestamp();
-        }
-      };
+      // Unbounded windows are not supported for now.
+      return GlobalWindow.INSTANCE;
+    }
+
+    @Override
+    public PaneInfo paneInfo(final DoFn<I, O> doFn) {
+      return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return options;
     }
 
     @Override
@@ -315,10 +330,35 @@ public Instant maxTimestamp() {
     }
 
     @Override
-    public RestrictionTracker<?> restrictionTracker() {
+    public I element(final DoFn<I, O> doFn) {
+      return this.input;
+    }
+
+    @Override
+    public Instant timestamp(final DoFn<I, O> doFn) {
+      return Instant.now();
+    }
+
+    @Override
+    public RestrictionTracker<?, ?> restrictionTracker() {
       throw new UnsupportedOperationException("restrictionTracker() in 
ProcessContext under DoTransform");
     }
 
+    @Override
+    public TimeDomain timeDomain(final DoFn<I, O> doFn) {
+      throw new UnsupportedOperationException("timeDomain() in ProcessContext 
under DoTransform");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<O> outputReceiver(final DoFn<I, O> doFn) {
+      return new OutputReceiver<>((OutputCollectorImpl) outputCollector);
+    }
+
+    @Override
+    public DoFn.MultiOutputReceiver taggedOutputReceiver(final DoFn<I, O> 
doFn) {
+      return new MultiOutputReceiver((OutputCollectorImpl) outputCollector, 
additionalOutputs);
+    }
+
     @Override
     public State state(final String stateId) {
       throw new UnsupportedOperationException("state() in ProcessContext under 
DoTransform");
@@ -336,4 +376,61 @@ public Timer timer(final String timerId) {
   public DoFn getDoFn() {
     return doFn;
   }
+
+  /**
+   * OutputReceiver class.
+   * @param <O> output type
+   */
+  static final class OutputReceiver<O> implements DoFn.OutputReceiver<O> {
+    private final List<O> dataElements;
+
+    OutputReceiver(final OutputCollectorImpl<O> outputCollector) {
+      this.dataElements = outputCollector.getMainTagOutputQueue();
+    }
+
+    OutputReceiver(final OutputCollectorImpl outputCollector,
+                   final TupleTag<O> tupleTag,
+                   final Map<String, String> tagToVertex) {
+      final Object dstVertexId = tagToVertex.get(tupleTag.getId());
+      if (dstVertexId == null) {
+        this.dataElements = outputCollector.getMainTagOutputQueue();
+      } else {
+        this.dataElements = (List<O>) 
outputCollector.getAdditionalTagOutputQueue((String) dstVertexId);
+      }
+    }
+
+    @Override
+    public void output(final O output) {
+      dataElements.add(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(final O output, final Instant timestamp) {
+      dataElements.add(output);
+    }
+  }
+
+  /**
+   * MultiOutputReceiver class.
+   */
+  static final class MultiOutputReceiver implements DoFn.MultiOutputReceiver {
+    private final OutputCollectorImpl outputCollector;
+    private final Map<String, String> tagToVertex;
+
+    /**
+     * Constructor.
+     * @param outputCollector outputCollector
+     * @param tagToVertex     tag to vertex map
+     */
+    MultiOutputReceiver(final OutputCollectorImpl outputCollector,
+                               final Map<String, String> tagToVertex) {
+      this.outputCollector = outputCollector;
+      this.tagToVertex = tagToVertex;
+    }
+
+    @Override
+    public <T> DoFn.OutputReceiver<T> get(final TupleTag<T> tag) {
+      return new OutputReceiver<>(this.outputCollector, tag, tagToVertex);
+    }
+  }
 }
diff --git 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index 27872c229..67f146a55 100644
--- 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -38,7 +38,7 @@ public void testALSDAG() throws Exception {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileALSDAG();
 
     assertEquals(producedDAG.getTopologicalSort(), 
producedDAG.getTopologicalSort());
-    assertEquals(32, producedDAG.getVertices().size());
+    assertEquals(38, producedDAG.getVertices().size());
 
 //    producedDAG.getTopologicalSort().forEach(v -> 
System.out.println(v.getId()));
     final IRVertex vertex4 = producedDAG.getTopologicalSort().get(6);
@@ -46,14 +46,14 @@ public void testALSDAG() throws Exception {
     assertEquals(1, producedDAG.getIncomingEdgesOf(vertex4.getId()).size());
     assertEquals(4, producedDAG.getOutgoingEdgesOf(vertex4).size());
 
-    final IRVertex vertex12 = producedDAG.getTopologicalSort().get(10);
-    assertEquals(1, producedDAG.getIncomingEdgesOf(vertex12).size());
-    assertEquals(1, producedDAG.getIncomingEdgesOf(vertex12.getId()).size());
-    assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex12).size());
-
     final IRVertex vertex13 = producedDAG.getTopologicalSort().get(11);
-    assertEquals(2, producedDAG.getIncomingEdgesOf(vertex13).size());
-    assertEquals(2, producedDAG.getIncomingEdgesOf(vertex13.getId()).size());
+    assertEquals(1, producedDAG.getIncomingEdgesOf(vertex13).size());
+    assertEquals(1, producedDAG.getIncomingEdgesOf(vertex13.getId()).size());
     assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex13).size());
+
+    final IRVertex vertex14 = producedDAG.getTopologicalSort().get(12);
+    assertEquals(2, producedDAG.getIncomingEdgesOf(vertex14).size());
+    assertEquals(2, producedDAG.getIncomingEdgesOf(vertex14.getId()).size());
+    assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex14).size());
   }
 }
diff --git 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index 8553263e6..0cb3a2670 100644
--- 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -38,21 +38,21 @@ public void testMLRDAG() throws Exception {
     final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileMLRDAG();
 
     assertEquals(producedDAG.getTopologicalSort(), 
producedDAG.getTopologicalSort());
-    assertEquals(33, producedDAG.getVertices().size());
+    assertEquals(36, producedDAG.getVertices().size());
 
     final IRVertex vertex3 = producedDAG.getTopologicalSort().get(0);
     assertEquals(0, producedDAG.getIncomingEdgesOf(vertex3).size());
     assertEquals(0, producedDAG.getIncomingEdgesOf(vertex3.getId()).size());
     assertEquals(3, producedDAG.getOutgoingEdgesOf(vertex3).size());
 
-    final IRVertex vertex12 = producedDAG.getTopologicalSort().get(10);
-    assertEquals(1, producedDAG.getIncomingEdgesOf(vertex12).size());
-    assertEquals(1, producedDAG.getIncomingEdgesOf(vertex12.getId()).size());
-    assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex12).size());
+    final IRVertex vertex13 = producedDAG.getTopologicalSort().get(11);
+    assertEquals(1, producedDAG.getIncomingEdgesOf(vertex13).size());
+    assertEquals(1, producedDAG.getIncomingEdgesOf(vertex13.getId()).size());
+    assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex13).size());
 
-    final IRVertex vertex17 = producedDAG.getTopologicalSort().get(15);
-    assertEquals(2, producedDAG.getIncomingEdgesOf(vertex17).size());
-    assertEquals(2, producedDAG.getIncomingEdgesOf(vertex17.getId()).size());
-    assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex17).size());
+    final IRVertex vertex19 = producedDAG.getTopologicalSort().get(17);
+    assertEquals(2, producedDAG.getIncomingEdgesOf(vertex19).size());
+    assertEquals(2, producedDAG.getIncomingEdgesOf(vertex19.getId()).size());
+    assertEquals(1, producedDAG.getOutgoingEdgesOf(vertex19).size());
   }
 }
diff --git 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
index 4d4649d23..e035241a9 100644
--- 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
+++ 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
@@ -74,16 +74,16 @@ public void testTransientResourcePass() throws Exception {
       assertEquals(DataFlowProperty.Value.Pull, 
irEdge.getPropertyValue(DataFlowProperty.class).get());
     });
 
-    final IRVertex vertex12 = processedDAG.getTopologicalSort().get(10);
-    assertEquals(ResourcePriorityProperty.TRANSIENT, 
vertex12.getPropertyValue(ResourcePriorityProperty.class).get());
-    processedDAG.getIncomingEdgesOf(vertex12).forEach(irEdge -> {
+    final IRVertex vertex13 = processedDAG.getTopologicalSort().get(11);
+    assertEquals(ResourcePriorityProperty.TRANSIENT, 
vertex13.getPropertyValue(ResourcePriorityProperty.class).get());
+    processedDAG.getIncomingEdgesOf(vertex13).forEach(irEdge -> {
       assertEquals(DataStoreProperty.Value.LocalFileStore, 
irEdge.getPropertyValue(DataStoreProperty.class).get());
       assertEquals(DataFlowProperty.Value.Pull, 
irEdge.getPropertyValue(DataFlowProperty.class).get());
     });
 
-    final IRVertex vertex14 = processedDAG.getTopologicalSort().get(12);
-    assertEquals(ResourcePriorityProperty.RESERVED, 
vertex14.getPropertyValue(ResourcePriorityProperty.class).get());
-    processedDAG.getIncomingEdgesOf(vertex14).forEach(irEdge -> {
+    final IRVertex vertex15 = processedDAG.getTopologicalSort().get(13);
+    assertEquals(ResourcePriorityProperty.RESERVED, 
vertex15.getPropertyValue(ResourcePriorityProperty.class).get());
+    processedDAG.getIncomingEdgesOf(vertex15).forEach(irEdge -> {
       assertEquals(DataStoreProperty.Value.LocalFileStore, 
irEdge.getPropertyValue(DataStoreProperty.class).get());
       assertEquals(DataFlowProperty.Value.Push, 
irEdge.getPropertyValue(DataFlowProperty.class).get());
     });
diff --git 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index 54f7933fd..07210ee07 100644
--- 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -60,16 +60,16 @@ public void setUp() throws Exception {
     final LoopVertex alsLoop = alsLoopOpt.get();
 
     final IRVertex vertex7 = groupedDAG.getTopologicalSort().get(3);
-    final IRVertex vertex13 = alsLoop.getDAG().getTopologicalSort().get(3);
+    final IRVertex vertex14 = alsLoop.getDAG().getTopologicalSort().get(4);
 
-    final Set<IREdge> oldDAGIncomingEdges = 
alsLoop.getDagIncomingEdges().get(vertex13);
+    final Set<IREdge> oldDAGIncomingEdges = 
alsLoop.getDagIncomingEdges().get(vertex14);
     final List<IREdge> newDAGIncomingEdge = 
groupedDAG.getIncomingEdgesOf(vertex7);
 
-    alsLoop.getDagIncomingEdges().remove(vertex13);
+    alsLoop.getDagIncomingEdges().remove(vertex14);
     alsLoop.getDagIncomingEdges().putIfAbsent(vertex7, new HashSet<>());
     
newDAGIncomingEdge.forEach(alsLoop.getDagIncomingEdges().get(vertex7)::add);
 
-    alsLoop.getNonIterativeIncomingEdges().remove(vertex13);
+    alsLoop.getNonIterativeIncomingEdges().remove(vertex14);
     alsLoop.getNonIterativeIncomingEdges().putIfAbsent(vertex7, new 
HashSet<>());
     
newDAGIncomingEdge.forEach(alsLoop.getNonIterativeIncomingEdges().get(vertex7)::add);
 
diff --git 
a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/GenericSourceSink.java 
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/GenericSourceSink.java
index 82451caaf..1ed7a9c9a 100644
--- 
a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/GenericSourceSink.java
+++ 
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/GenericSourceSink.java
@@ -94,7 +94,12 @@ public static PDone write(final PCollection<String> 
dataToWrite,
       dataToWrite.apply(ParDo.of(new HDFSWrite(path)));
       return PDone.in(dataToWrite.getPipeline());
     } else {
-      return dataToWrite.apply(TextIO.write().to(path));
+      // (Only relevant to local file writes) withWindowedWrites() is required 
for local file writes.
+      // Without it, the FileResultCoder#encode, which assumes WindowedValue, 
will not be able
+      // to properly handle the FileResult (Beam's file metadata information), 
and hang the job.
+      // The root cause is that the Nemo runtime currently only supports batch 
applications, and
+      // does not use the Beam's WindowedValue by default.
+      return dataToWrite.apply(TextIO.write().to(path).withWindowedWrites());
     }
   }
 
diff --git 
a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
 
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
index e97b33b46..8438052ee 100644
--- 
a/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
+++ 
b/examples/beam/src/main/java/edu/snu/nemo/examples/beam/PartitionWordsByLength.java
@@ -66,9 +66,10 @@ public static void main(final String[] args) {
             .into(TypeDescriptors.strings())
             .via(line -> Arrays.asList(line.split(" "))))
         .apply(ParDo.of(new DoFn<String, String>() {
+          // processElement with Beam OutputReceiver.
           @ProcessElement
           public void processElement(final ProcessContext c) {
-            String word = c.element();
+            final String word = c.element();
             if (word.length() < 6) {
               c.output(shortWordsTag, KV.of(word.length(), word));
             } else if (word.length() < 11) {
@@ -89,12 +90,12 @@ public void processElement(final ProcessContext c) {
         .apply(GroupByKey.create())
         .apply(MapElements.via(new FormatLines()));
     PCollection<String> veryLongWords = results.get(veryLongWordsTag);
-    PCollection<String> veryveryLongWords = results.get(veryVeryLongWordsTag);
+    PCollection<String> veryVeryLongWords = results.get(veryVeryLongWordsTag);
 
     GenericSourceSink.write(shortWords, outputFilePath + "_short");
     GenericSourceSink.write(longWords, outputFilePath + "_long");
     GenericSourceSink.write(veryLongWords, outputFilePath + "_very_long");
-    GenericSourceSink.write(veryveryLongWords, outputFilePath + 
"_very_very_long");
+    GenericSourceSink.write(veryVeryLongWords, outputFilePath + 
"_very_very_long");
     p.run();
   }
 
diff --git 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
index 1ad85aa9a..5a6a47ae6 100644
--- 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -72,13 +72,14 @@ public void testDefault() throws Exception {
         .build());
   }
 
-  @Test (timeout = TIMEOUT)
-  public void testTransientResourceWithPoison() throws Exception {
-    JobLauncher.main(builder
-        .addResourceJson(poisonedResource)
-        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + 
"_transient_poisoned")
-        .addMaxTaskAttempt(Integer.MAX_VALUE)
-        
.addOptimizationPolicy(TransientResourcePolicyParallelismTen.class.getCanonicalName())
-        .build());
-  }
+  // TODO #137: Retry parent task(s) upon task INPUT_READ_FAILURE
+  // @Test (timeout = TIMEOUT)
+  // public void testTransientResourceWithPoison() throws Exception {
+  //   JobLauncher.main(builder
+  //       .addResourceJson(poisonedResource)
+  //       .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + 
"_transient_poisoned")
+  //       .addMaxTaskAttempt(Integer.MAX_VALUE)
+  //       
.addOptimizationPolicy(TransientResourcePolicyParallelismTen.class.getCanonicalName())
+  //       .build());
+  // }
 }
diff --git 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java
 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java
index bcaf01ffe..fd4a34afa 100644
--- 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java
@@ -64,20 +64,20 @@ public void tearDown() throws Exception {
   }
 
   @Test (timeout = TIMEOUT)
-  public void test() throws Exception {
+  public void testLargeShuffle() throws Exception {
     JobLauncher.main(builder
         .addResourceJson(executorResourceFileName)
-        .addJobId(PartitionWordsByLengthITCase.class.getSimpleName())
-        
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+        .addJobId(PartitionWordsByLengthITCase.class.getSimpleName() + 
"_largeshuffle")
+        
.addOptimizationPolicy(LargeShufflePolicyParallelismFive.class.getCanonicalName())
         .build());
   }
 
   @Test (timeout = TIMEOUT)
-  public void testSailfish() throws Exception {
+  public void test() throws Exception {
     JobLauncher.main(builder
         .addResourceJson(executorResourceFileName)
-        .addJobId(PartitionWordsByLengthITCase.class.getSimpleName() + 
"_sailfish")
-        
.addOptimizationPolicy(LargeShufflePolicyParallelismFive.class.getCanonicalName())
+        .addJobId(PartitionWordsByLengthITCase.class.getSimpleName())
+        
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
 }
diff --git a/pom.xml b/pom.xml
index 2929ec2c7..dc750ceb3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@ limitations under the License.
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <beam.version>2.0.0</beam.version>
+        <beam.version>2.5.0</beam.version>
         <spark.version>2.2.0</spark.version>
         <scala.version>2.11.8</scala.version>
         <kryo.version>4.0.1</kryo.version>
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index e6433fd74..0796b0117 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -25,15 +25,19 @@
  * @param <O> output type.
  */
 public final class OutputCollectorImpl<O> implements OutputCollector<O> {
+  private final Set<String> mainTagOutputChildren;
   // Use ArrayList (not Queue) to allow 'null' values
   private final ArrayList<O> mainTagElements;
   private final Map<String, ArrayList<Object>> additionalTagElementsMap;
 
   /**
    * Constructor of a new OutputCollectorImpl with tagged outputs.
-   * @param taggedChildren tagged children
+   * @param mainChildren   main children vertices
+   * @param taggedChildren additional children vertices
    */
-  public OutputCollectorImpl(final List<String> taggedChildren) {
+  public OutputCollectorImpl(final Set<String> mainChildren,
+                             final List<String> taggedChildren) {
+    this.mainTagOutputChildren = mainChildren;
     this.mainTagElements = new ArrayList<>(1);
     this.additionalTagElementsMap = new HashMap<>();
     taggedChildren.forEach(child -> this.additionalTagElementsMap.put(child, 
new ArrayList<>(1)));
@@ -46,12 +50,16 @@ public void emit(final O output) {
 
   @Override
   public <T> void emit(final String dstVertexId, final T output) {
-    if (this.additionalTagElementsMap.get(dstVertexId) == null) {
+    if (this.mainTagOutputChildren.contains(dstVertexId)) {
       // This dstVertexId is for the main tag
       emit((O) output);
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-      this.additionalTagElementsMap.get(dstVertexId).add(output);
+      final List<Object> dataElements = 
this.additionalTagElementsMap.get(dstVertexId);
+      if (dataElements == null) {
+        throw new IllegalArgumentException("Wrong destination vertex id 
passed!");
+      }
+      dataElements.add(output);
     }
   }
 
@@ -60,12 +68,15 @@ public void emit(final O output) {
   }
 
   public Iterable<Object> iterateTag(final String tag) {
-    if (this.additionalTagElementsMap.get(tag) == null) {
+    if (this.mainTagOutputChildren.contains(tag)) {
       // This dstVertexId is for the main tag
       return (Iterable<Object>) iterateMain();
     } else {
-      // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-      return this.additionalTagElementsMap.get(tag);
+      final List<Object> dataElements = this.additionalTagElementsMap.get(tag);
+      if (dataElements == null) {
+        throw new IllegalArgumentException("Wrong destination vertex id 
passed!");
+      }
+      return dataElements;
     }
   }
 
@@ -74,12 +85,32 @@ public void clearMain() {
   }
 
   public void clearTag(final String tag) {
-    if (this.additionalTagElementsMap.get(tag) == null) {
+    if (this.mainTagOutputChildren.contains(tag)) {
       // This dstVertexId is for the main tag
       clearMain();
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-      this.additionalTagElementsMap.get(tag).clear();
+      final List<Object> dataElements = this.additionalTagElementsMap.get(tag);
+      if (dataElements == null) {
+        throw new IllegalArgumentException("Wrong destination vertex id 
passed!");
+      }
+      dataElements.clear();
+    }
+  }
+
+  public List<O> getMainTagOutputQueue() {
+    return mainTagElements;
+  }
+
+  public List<Object> getAdditionalTagOutputQueue(final String dstVertexId) {
+    if (this.mainTagOutputChildren.contains(dstVertexId)) {
+      return (List<Object>) this.mainTagElements;
+    } else {
+      final List<Object> dataElements = 
this.additionalTagElementsMap.get(dstVertexId);
+      if (dataElements == null) {
+        throw new IllegalArgumentException("Wrong destination vertex id 
passed!");
+      }
+      return dataElements;
     }
   }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 807e5bb84..7d1901daa 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -168,8 +168,12 @@ public TaskExecutor(final Task task,
       // Additional output children task writes
       final Map<String, OutputWriter> additionalChildrenTaskWriters = 
getAdditionalChildrenTaskWriters(
           taskIndex, irVertex, task.getTaskOutgoingEdges(), 
dataTransferFactory, additionalOutputMap);
+      // Find all main vertices and additional vertices
       final List<String> additionalOutputVertices = new 
ArrayList<>(additionalOutputMap.values());
-      final OutputCollectorImpl oci = new 
OutputCollectorImpl(additionalOutputVertices);
+      final Set<String> mainChildren =
+          getMainOutputVertices(irVertex, irVertexDag, 
task.getTaskOutgoingEdges(), additionalOutputVertices);
+      final OutputCollectorImpl oci = new OutputCollectorImpl(mainChildren, 
additionalOutputVertices);
+
       // intra-vertex writes
       final VertexHarness vertexHarness = new VertexHarness(irVertex, oci, 
children,
           isToSideInputs, isToAdditionalTagOutputs, mainChildrenTaskWriters, 
additionalChildrenTaskWriters,
@@ -225,7 +229,7 @@ private void processElementRecursively(final VertexHarness 
vertexHarness, final
     outputCollector.clearMain();
 
     // Recursively process all of the additional output elements.
-    vertexHarness.getContext().getAdditionalTagOutputs().values().forEach(tag 
-> {
+    
vertexHarness.getContext().getTagToAdditionalChildren().values().forEach(tag -> 
{
       outputCollector.iterateTag(tag).forEach(
           element -> handleAdditionalOutputElement(vertexHarness, element, 
tag)); // Recursion
       outputCollector.clearTag(tag);
@@ -465,6 +469,29 @@ private boolean handleDataFetchers(final List<DataFetcher> 
fetchers) {
         .collect(Collectors.toList());
   }
 
+  private Set<String> getMainOutputVertices(final IRVertex irVertex,
+                                             final DAG<IRVertex, 
RuntimeEdge<IRVertex>> irVertexDag,
+                                             final List<StageEdge> 
outEdgesToChildrenTasks,
+                                             final List<String> 
additionalOutputVertices) {
+    // all intra-task children vertices id
+    final List<String> outputVertices = 
irVertexDag.getOutgoingEdgesOf(irVertex).stream()
+        .filter(edge -> edge.getSrc().getId().equals(irVertex.getId()))
+        .map(edge -> edge.getDst().getId())
+        .collect(Collectors.toList());
+
+    // all inter-task children vertices id
+    outputVertices
+        .addAll(outEdgesToChildrenTasks.stream()
+            .filter(edge -> 
edge.getSrcIRVertex().getId().equals(irVertex.getId()))
+            .map(edge -> edge.getDstIRVertex().getId())
+            .collect(Collectors.toList()));
+
+    // return vertices that are not marked as additional tagged outputs
+    return new HashSet<>(outputVertices.stream()
+        .filter(vertexId -> !additionalOutputVertices.contains(vertexId))
+        .collect(Collectors.toList()));
+  }
+
   /**
    * Return inter-task OutputWriters, for single output or output associated 
with main tag.
    * @param taskIndex               current task index
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
index c79b53020..502325feb 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
@@ -54,7 +54,7 @@
     if (children.size() != isSideInputs.size() || children.size() != 
isAdditionalTagOutputs.size()) {
       throw new IllegalStateException(irVertex.toString());
     }
-    final Map<String, String> taggedOutputMap = 
context.getAdditionalTagOutputs();
+    final Map<String, String> taggedOutputMap = 
context.getTagToAdditionalChildren();
     final List<VertexHarness> sides = new ArrayList<>();
     final List<VertexHarness> nonSides = new ArrayList<>();
     final Map<String, VertexHarness> tagged = new HashMap<>();
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index e2fea6cd2..e810989bc 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -505,7 +505,7 @@ public void close() {
     @Override
     public void prepare(final Context context, OutputCollector<Integer> 
outputCollector) {
       this.outputCollector = outputCollector;
-      this.tagToVertex = context.getAdditionalTagOutputs();
+      this.tagToVertex = context.getTagToAdditionalChildren();
     }
 
     @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to