This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 251440a  [NEMO-235] Refactor TaskExecutor's data processing logic 
(#128)
251440a is described below

commit 251440ad65e932ace9fd16b6d74ea8358d1638b0
Author: Taegeon Um <[email protected]>
AuthorDate: Fri Oct 26 08:57:19 2018 +0900

    [NEMO-235] Refactor TaskExecutor's data processing logic (#128)
    
    JIRA: [NEMO-235: Refactor TaskExecutor's data processing 
logic](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-235)
    
    **Major changes:**
    - Refactor `TaskExecutor` by delegating data transfer logic to 
`OutputCollector`
    
    **Minor changes to note:**
    -
    
    **Tests for the changes:**
    -
    
    **Other comments:**
    -
    
    Closes #128
---
 .../nemo/common/ir/vertex/transform/Transform.java |   2 -
 .../beam/transform/AbstractDoFnTransform.java      |   3 +-
 .../beam/transform/DefaultOutputManager.java       |   8 +-
 .../compiletime/reshaping/SkewReshapingPass.java   |   6 +-
 .../frontend/beam/transform/DoFnTransformTest.java |   1 -
 .../runtime/executor/TransformContextImpl.java     |  12 +-
 .../datatransfer/DynOptDataOutputCollector.java    |  83 ++++++
 .../executor/datatransfer/OutputCollectorImpl.java | 132 +++------
 .../nemo/runtime/executor/task/TaskExecutor.java   | 327 +++++++--------------
 .../nemo/runtime/executor/task/VertexHarness.java  |  85 ++----
 .../runtime/executor/TransformContextImplTest.java |   6 +-
 .../runtime/executor/task/TaskExecutorTest.java    |  21 +-
 12 files changed, 273 insertions(+), 413 deletions(-)

diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
index 87db84f..b59344a 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
@@ -17,7 +17,6 @@ package org.apache.nemo.common.ir.vertex.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import java.io.Serializable;
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -54,7 +53,6 @@ public interface Transform<I, O> extends Serializable {
      * @return the broadcast variable.
      */
     Object getBroadcastVariable(Serializable id);
-    Map<String, String> getTagToAdditionalChildren();
 
     /**
      * Put serialized data to send to the executor.
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 8679c73..444dfb4 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -119,8 +119,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, 
OutputT> implements
     this.outputCollector = oc;
 
     // create output manager
-    outputManager = new DefaultOutputManager<>(
-      outputCollector, context, mainOutputTag);
+    outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
 
     // create side input reader
     if (!sideInputs.isEmpty()) {
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
index 4174c6c..457a128 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
@@ -19,9 +19,6 @@ import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-
-import java.util.Map;
 
 /**
  * Default output emitter that uses outputCollector.
@@ -30,14 +27,11 @@ import java.util.Map;
 public final class DefaultOutputManager<OutputT> implements 
DoFnRunners.OutputManager {
   private final TupleTag<OutputT> mainOutputTag;
   private final OutputCollector<WindowedValue<OutputT>> outputCollector;
-  private final Map<String, String> additionalOutputs;
 
   DefaultOutputManager(final OutputCollector<WindowedValue<OutputT>> 
outputCollector,
-                       final Transform.Context context,
                        final TupleTag<OutputT> mainOutputTag) {
     this.outputCollector = outputCollector;
     this.mainOutputTag = mainOutputTag;
-    this.additionalOutputs = context.getTagToAdditionalChildren();
   }
 
   @Override
@@ -45,7 +39,7 @@ public final class DefaultOutputManager<OutputT> implements 
DoFnRunners.OutputMa
     if (tag.equals(mainOutputTag)) {
       outputCollector.emit((WindowedValue<OutputT>) output);
     } else {
-      outputCollector.emit(additionalOutputs.get(tag.getId()), output);
+      outputCollector.emit(tag.getId(), output);
     }
   }
 }
diff --git 
a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
 
b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index e4743b5..eea5867 100644
--- 
a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ 
b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -53,7 +53,7 @@ import java.util.function.BiFunction;
 @Requires(CommunicationPatternProperty.class)
 public final class SkewReshapingPass extends ReshapingPass {
   private static final Logger LOG = 
LoggerFactory.getLogger(SkewReshapingPass.class.getName());
-
+  private static final String ADDITIONAL_OUTPUT_TAG = "DynOptData";
   /**
    * Default constructor.
    */
@@ -154,7 +154,7 @@ public final class SkewReshapingPass extends ReshapingPass {
         (dynOptData, outputCollector)-> {
           dynOptData.forEach((k, v) -> {
             final Pair<Object, Object> pairData = Pair.of(k, v);
-            outputCollector.emit(abv.getId(), pairData);
+            outputCollector.emit(ADDITIONAL_OUTPUT_TAG, pairData);
           });
           return dynOptData;
         };
@@ -180,7 +180,7 @@ public final class SkewReshapingPass extends ReshapingPass {
     
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
     newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
-    newEdge.setProperty(AdditionalOutputTagProperty.of("DynOptData"));
+    newEdge.setProperty(AdditionalOutputTagProperty.of(ADDITIONAL_OUTPUT_TAG));
 
     // Dynamic optimization handles statistics on key-value data by default.
     // We need to get coders for encoding/decoding the keys to send data to
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index 9a65e7a..db04f1e 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -115,7 +115,6 @@ public final class DoFnTransformTest {
 
     // mock context
     final Transform.Context context = mock(Transform.Context.class);
-    when(context.getTagToAdditionalChildren()).thenReturn(tagsMap);
 
     final OutputCollector<WindowedValue<String>> oc = new 
TestOutputCollector<>();
     doFnTransform.prepare(context, oc);
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
index 682cf3c..bd22271 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
@@ -19,7 +19,6 @@ import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
 
 import java.io.Serializable;
-import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -27,18 +26,14 @@ import java.util.Optional;
  */
 public final class TransformContextImpl implements Transform.Context {
   private final BroadcastManagerWorker broadcastManagerWorker;
-  private final Map<String, String> tagToAdditionalChildren;
   private String data;
 
   /**
    * Constructor of Context Implementation.
    * @param broadcastManagerWorker for broadcast variables.
-   * @param tagToAdditionalChildren tag id to additional vertices id map.
    */
-  public TransformContextImpl(final BroadcastManagerWorker 
broadcastManagerWorker,
-                              final Map<String, String> 
tagToAdditionalChildren) {
+  public TransformContextImpl(final BroadcastManagerWorker 
broadcastManagerWorker) {
     this.broadcastManagerWorker = broadcastManagerWorker;
-    this.tagToAdditionalChildren = tagToAdditionalChildren;
     this.data = null;
   }
 
@@ -48,11 +43,6 @@ public final class TransformContextImpl implements 
Transform.Context {
   }
 
   @Override
-  public Map<String, String> getTagToAdditionalChildren() {
-    return this.tagToAdditionalChildren;
-  }
-
-  @Override
   public void setSerializedData(final String serializedData) {
     this.data = serializedData;
   }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
new file mode 100644
index 0000000..c2a013a
--- /dev/null
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
+import org.apache.nemo.runtime.executor.task.TaskExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * OutputCollector for dynamic optimization data.
+ *
+ * @param <O> output type.
+ */
+public final class DynOptDataOutputCollector<O> implements OutputCollector<O> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynOptDataOutputCollector.class.getName());
+  private static final String NULL_KEY = "NULL";
+
+  private final IRVertex irVertex;
+  private final PersistentConnectionToMasterMap connectionToMasterMap;
+  private final TaskExecutor taskExecutor;
+
+  public DynOptDataOutputCollector(final IRVertex irVertex,
+                                   final PersistentConnectionToMasterMap 
connectionToMasterMap,
+                                   final TaskExecutor taskExecutor) {
+    this.irVertex = irVertex;
+    this.connectionToMasterMap = connectionToMasterMap;
+    this.taskExecutor = taskExecutor;
+  }
+
+  @Override
+  public void emit(final O output) {
+    final Map<Object, Long> aggregatedDynOptData = (Map<Object, Long>) output;
+    final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new 
ArrayList<>();
+    aggregatedDynOptData.forEach((key, size) ->
+      partitionSizeEntries.add(
+        ControlMessage.PartitionSizeEntry.newBuilder()
+          .setKey(key == null ? NULL_KEY : String.valueOf(key))
+          .setSize(size)
+          .build())
+    );
+
+    
connectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+      .send(ControlMessage.Message.newBuilder()
+        .setId(RuntimeIdManager.generateMessageId())
+        .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+        .setType(ControlMessage.MessageType.DataSizeMetric)
+        .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
+          .addAllPartitionSize(partitionSizeEntries)
+        )
+        .build());
+
+    // set the id of this vertex to mark the corresponding stage as put on hold
+    taskExecutor.setIRVertexPutOnHold(irVertex);
+  }
+
+  @Override
+  public <T> void emit(final String dstVertexId, final T output) {
+    throw new IllegalStateException("Dynamic optimization does not emit tagged 
data");
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index b489cbb..aa510e9 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -15,8 +15,9 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,110 +25,75 @@ import java.util.*;
 
 /**
  * OutputCollector implementation.
+ * This emits four types of outputs
+ * 1) internal main outputs: this output becomes the input of internal 
Transforms
+ * 2) internal additional outputs: this additional output becomes the input of 
internal Transforms
+ * 3) external main outputs: this external output is emitted to OutputWriter
+ * 4) external additional outputs: this external output is emitted to 
OutputWriter
  *
  * @param <O> output type.
  */
 public final class OutputCollectorImpl<O> implements OutputCollector<O> {
   private static final Logger LOG = 
LoggerFactory.getLogger(OutputCollectorImpl.class.getName());
-  private final Set<String> mainTagOutputChildren;
-  // Use ArrayList (not Queue) to allow 'null' values
-  private final ArrayList<O> mainTagElements;
-  // Key: Pair of tag and destination vertex id
-  // Value: data elements which will be input to the tagged destination vertex
-  private final Map<Pair<String, String>, ArrayList<Object>> 
additionalTaggedChildToElementsMap;
+
+  private final IRVertex irVertex;
+  private final List<OperatorVertex> internalMainOutputs;
+  private final Map<String, List<OperatorVertex>> internalAdditionalOutputs;
+  private final List<OutputWriter> externalMainOutputs;
+  private final Map<String, List<OutputWriter>> externalAdditionalOutputs;
 
   /**
-   * Constructor of a new OutputCollectorImpl with tagged outputs.
-   * @param mainChildren   main children vertices
-   * @param tagToChildren additional children vertices
+   * Constructor of the output collector.
+   * @param irVertex the ir vertex that emits the output
+   * @param internalMainOutputs internal main outputs
+   * @param internalAdditionalOutputs internal additional outputs
+   * @param externalMainOutputs external main outputs
+   * @param externalAdditionalOutputs external additional outputs
    */
-  public OutputCollectorImpl(final Set<String> mainChildren,
-                             final Map<String, String> tagToChildren) {
-    this.mainTagOutputChildren = mainChildren;
-    this.mainTagElements = new ArrayList<>(1);
-    this.additionalTaggedChildToElementsMap = new HashMap<>();
-    tagToChildren.forEach((tag, child) ->
-      this.additionalTaggedChildToElementsMap.put(Pair.of(tag, child), new 
ArrayList<>(1)));
-  }
-
-  @Override
-  public void emit(final O output) {
-    mainTagElements.add(output);
+  public OutputCollectorImpl(final IRVertex irVertex,
+                             final List<OperatorVertex> internalMainOutputs,
+                             final Map<String, List<OperatorVertex>> 
internalAdditionalOutputs,
+                             final List<OutputWriter> externalMainOutputs,
+                             final Map<String, List<OutputWriter>> 
externalAdditionalOutputs) {
+    this.irVertex = irVertex;
+    this.internalMainOutputs = internalMainOutputs;
+    this.internalAdditionalOutputs = internalAdditionalOutputs;
+    this.externalMainOutputs = externalMainOutputs;
+    this.externalAdditionalOutputs = externalAdditionalOutputs;
   }
 
-  @Override
-  public <T> void emit(final String dstVertexId, final T output) {
-    if (this.mainTagOutputChildren.contains(dstVertexId)) {
-      // This dstVertexId is for the main tag
-      emit((O) output);
-    } else {
-      // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-      final List<Object> dataElements = 
getAdditionalTaggedDataFromDstVertexId(dstVertexId);
-      dataElements.add(output);
-    }
+  private void emit(final OperatorVertex vertex, final O output) {
+    vertex.getTransform().onData(output);
   }
 
-  public Iterable<O> iterateMain() {
-    return mainTagElements;
+  private void emit(final OutputWriter writer, final O output) {
+    writer.write(output);
   }
 
-  public Iterable<Object> iterateTag(final String tag) {
-    if (this.mainTagOutputChildren.contains(tag)) {
-      // This dstVertexId is for the main tag
-      return (Iterable<Object>) iterateMain();
-    } else {
-      return getAdditionalTaggedDataFromTag(tag);
+  @Override
+  public void emit(final O output) {
+    for (final OperatorVertex internalVertex : internalMainOutputs) {
+      emit(internalVertex, output);
     }
-  }
-
-  public void clearMain() {
-    mainTagElements.clear();
-  }
 
-  public void clearTag(final String tag) {
-    if (this.mainTagOutputChildren.contains(tag)) {
-      // This dstVertexId is for the main tag
-      clearMain();
-    } else {
-      // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-      final List<Object> dataElements = getAdditionalTaggedDataFromTag(tag);
-      dataElements.clear();
+    for (final OutputWriter externalWriter : externalMainOutputs) {
+      emit(externalWriter, output);
     }
   }
 
-  public List<O> getMainTagOutputQueue() {
-    return mainTagElements;
-  }
-
-  public List<Object> getAdditionalTagOutputQueue(final String dstVertexId) {
-    if (this.mainTagOutputChildren.contains(dstVertexId)) {
-      return (List<Object>) this.mainTagElements;
-    } else {
-      return getAdditionalTaggedDataFromDstVertexId(dstVertexId);
-    }
-  }
+  @Override
+  public <T> void emit(final String dstVertexId, final T output) {
 
-  private List<Object> getAdditionalTaggedDataFromDstVertexId(final String 
dstVertexId) {
-    final Pair<String, String> tagAndChild =
-      this.additionalTaggedChildToElementsMap.keySet().stream()
-        .filter(key -> key.right().equals(dstVertexId))
-        .findAny().orElseThrow(() -> new RuntimeException("Wrong destination 
vertex id passed!"));
-    final List<Object> dataElements = 
this.additionalTaggedChildToElementsMap.get(tagAndChild);
-    if (dataElements == null) {
-      throw new IllegalArgumentException("Wrong destination vertex id 
passed!");
+    if (internalAdditionalOutputs.containsKey(dstVertexId)) {
+      for (final OperatorVertex internalVertex : 
internalAdditionalOutputs.get(dstVertexId)) {
+        emit(internalVertex, (O) output);
+      }
     }
-    return dataElements;
-  }
 
-  private List<Object> getAdditionalTaggedDataFromTag(final String tag) {
-    final Pair<String, String> tagAndChild =
-      this.additionalTaggedChildToElementsMap.keySet().stream()
-        .filter(key -> key.left().equals(tag))
-        .findAny().orElseThrow(() -> new RuntimeException("Wrong tag " + tag + 
" passed!"));
-    final List<Object> dataElements = 
this.additionalTaggedChildToElementsMap.get(tagAndChild);
-    if (dataElements == null) {
-      throw new IllegalArgumentException("Wrong tag " + tag + " passed!");
+    if (externalAdditionalOutputs.containsKey(dstVertexId)) {
+      for (final OutputWriter externalWriter : 
externalAdditionalOutputs.get(dstVertexId)) {
+        emit(externalWriter, (O) output);
+      }
     }
-    return dataElements;
   }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 9035cbe..4cf789e 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -18,6 +18,7 @@ package org.apache.nemo.runtime.executor.task;
 import com.google.common.collect.Lists;
 import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import 
org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
 import 
org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
@@ -44,6 +45,8 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nemo.runtime.executor.datatransfer.DynOptDataOutputCollector;
+import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +60,6 @@ import javax.annotation.concurrent.NotThreadSafe;
 public final class TaskExecutor {
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class.getName());
   private static final int NONE_FINISHED = -1;
-  private static final String NULL_KEY = "NULL";
 
   // Essential information
   private boolean isExecuted;
@@ -149,38 +151,39 @@ public final class TaskExecutor {
     final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
     final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
     reverseTopologicallySorted.forEach(irVertex -> {
-      final List<VertexHarness> children = getChildrenHarnesses(irVertex, 
irVertexDag, vertexIdToHarness);
       final Optional<Readable> sourceReader = getSourceVertexReader(irVertex, 
task.getIrVertexIdToReadable());
       if (sourceReader.isPresent() != irVertex instanceof SourceVertex) {
         throw new IllegalStateException(irVertex.toString());
       }
 
-      // Prepare data WRITE
-      // Child-task writes
-      final Map<String, String> additionalOutputMap =
-          getAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), 
irVertexDag);
-
-      final List<Boolean> isToAdditionalTagOutputs = children.stream()
-          .map(harness -> harness.getIRVertex().getId())
-          .map(additionalOutputMap::containsValue)
-          .collect(Collectors.toList());
-
-      // Handle writes
-      // Main output children task writes
-      final List<OutputWriter> mainChildrenTaskWriters = 
getMainChildrenTaskWriters(
-        irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, 
additionalOutputMap);
-      final Map<String, OutputWriter> additionalChildrenTaskWriters = 
getAdditionalChildrenTaskWriters(
-        irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, 
additionalOutputMap);
-      // Intra-task writes
-      final List<String> additionalOutputVertices = new 
ArrayList<>(additionalOutputMap.values());
-      final Set<String> mainChildren =
-          getMainOutputVertices(irVertex, irVertexDag, 
task.getTaskOutgoingEdges(), additionalOutputVertices);
-      final OutputCollectorImpl oci = new OutputCollectorImpl(mainChildren, 
additionalOutputMap);
+      // Additional outputs
+      final Map<String, List<OperatorVertex>> internalAdditionalOutputMap =
+        getInternalAdditionalOutputMap(irVertex, irVertexDag);
+      final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
+        getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), 
dataTransferFactory);
+
+      // Main outputs
+      final List<OperatorVertex> internalMainOutputs = 
getInternalMainOutputs(irVertex, irVertexDag);
+      final List<OutputWriter> externalMainOutputs =
+        getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), 
dataTransferFactory);
+
+      final OutputCollector outputCollector;
+
+      if (irVertex instanceof OperatorVertex
+        && ((OperatorVertex) irVertex).getTransform() instanceof 
AggregateMetricTransform) {
+        outputCollector = new DynOptDataOutputCollector(
+          irVertex, persistentConnectionToMasterMap, this);
+      } else {
+        outputCollector = new OutputCollectorImpl(
+          irVertex, internalMainOutputs, internalAdditionalOutputMap,
+          externalMainOutputs, externalAdditionalOutputMap);
+      }
 
       // Create VERTEX HARNESS
       final VertexHarness vertexHarness = new VertexHarness(
-        irVertex, oci, children, isToAdditionalTagOutputs, 
mainChildrenTaskWriters, additionalChildrenTaskWriters,
-        new TransformContextImpl(broadcastManagerWorker, additionalOutputMap));
+        irVertex, outputCollector, new 
TransformContextImpl(broadcastManagerWorker),
+        externalMainOutputs, externalAdditionalOutputMap);
+
       prepareTransform(vertexHarness);
       vertexIdToHarness.put(irVertex.getId(), vertexHarness);
 
@@ -190,6 +193,7 @@ public final class TaskExecutor {
         // Source vertex read
         nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, 
sourceReader.get(), vertexHarness));
       }
+
       // Parent-task read (broadcasts)
       final List<StageEdge> inEdgesForThisVertex = task.getTaskIncomingEdges()
         .stream()
@@ -229,15 +233,16 @@ public final class TaskExecutor {
   }
 
   /**
-   * Recursively process a data element down the DAG dependency.
+   * Process a data element down the DAG dependency.
    *
    * @param vertexHarness VertexHarness of a vertex to execute.
    * @param dataElement   input data element to process.
    */
-  private void processElementRecursively(final VertexHarness vertexHarness, 
final Object dataElement) {
+  private void processElement(final VertexHarness vertexHarness, final Object 
dataElement) {
     final IRVertex irVertex = vertexHarness.getIRVertex();
-    final OutputCollectorImpl outputCollector = 
vertexHarness.getOutputCollector();
+    final OutputCollector outputCollector = vertexHarness.getOutputCollector();
 
+    // TODO #XXX: optimize processElement (do not check instanceof for each 
data element)
     if (irVertex instanceof SourceVertex) {
       outputCollector.emit(dataElement);
     } else if (irVertex instanceof OperatorVertex) {
@@ -246,19 +251,6 @@ public final class TaskExecutor {
     } else {
       throw new UnsupportedOperationException("This type of IRVertex is not 
supported");
     }
-
-    // Given a single input element, a vertex can produce many output elements.
-    // Here, we recursively process all of the main output elements.
-    outputCollector.iterateMain().forEach(element ->
-      handleMainOutputElement(vertexHarness, element)); // Recursion
-    outputCollector.clearMain();
-
-    // Recursively process all of the additional output elements.
-    vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
-      outputCollector.iterateTag(tag).forEach(element ->
-        handleAdditionalOutputElement(vertexHarness, element, tag)); // 
Recursion
-      outputCollector.clearTag(tag);
-    });
   }
 
   /**
@@ -315,85 +307,9 @@ public final class TaskExecutor {
     }
   }
 
-  /**
-   * Send aggregated statistics for dynamic optimization to master.
-   * @param dynOptData the statistics to send.
-   */
-  public void sendDynOptData(final Object dynOptData) {
-    Map<Object, Long> aggregatedDynOptData = (Map<Object, Long>) dynOptData;
-    final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new 
ArrayList<>();
-    aggregatedDynOptData.forEach((key, size) ->
-      partitionSizeEntries.add(
-        ControlMessage.PartitionSizeEntry.newBuilder()
-          .setKey(key == null ? NULL_KEY : String.valueOf(key))
-          .setSize(size)
-          .build())
-    );
-
-    
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-      .send(ControlMessage.Message.newBuilder()
-        .setId(RuntimeIdManager.generateMessageId())
-        .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-        .setType(ControlMessage.MessageType.DataSizeMetric)
-        .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
-          .addAllPartitionSize(partitionSizeEntries)
-        )
-        .build());
-  }
-
   private void finalizeVertex(final VertexHarness vertexHarness) {
     closeTransform(vertexHarness);
-
-    final OutputCollectorImpl outputCollector = 
vertexHarness.getOutputCollector();
-    final IRVertex v = vertexHarness.getIRVertex();
-    if (v instanceof OperatorVertex
-      && ((OperatorVertex) v).getTransform() instanceof 
AggregateMetricTransform) {
-      // send aggregated dynamic optimization data to master
-      final Object aggregatedDynOptData = 
outputCollector.iterateMain().iterator().next();
-      sendDynOptData(aggregatedDynOptData);
-      // set the id of this vertex to mark the corresponding stage as put on 
hold
-      setIRVertexPutOnHold(v);
-    } else {
-      // handle main outputs
-      outputCollector.iterateMain().forEach(element -> {
-        handleMainOutputElement(vertexHarness, element);
-      }); // Recursion
-      outputCollector.clearMain();
-
-      // handle intra-task additional tagged outputs
-      vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
-        outputCollector.iterateTag(tag).forEach(
-          element -> handleAdditionalOutputElement(vertexHarness, element, 
tag)); // Recursion
-        outputCollector.clearTag(tag);
-      });
-
-      // handle inter-task additional tagged outputs
-      vertexHarness.getTagToAdditionalChildrenId().keySet().forEach(tag -> {
-        outputCollector.iterateTag(tag).forEach(
-          element -> handleAdditionalOutputElement(vertexHarness, element, 
tag)); // Recursion
-        outputCollector.clearTag(tag);
-      });
-
-      finalizeOutputWriters(vertexHarness);
-    }
-  }
-
-  private void handleMainOutputElement(final VertexHarness harness, final 
Object element) {
-    // writes to children tasks
-    harness.getWritersToMainChildrenTasks().forEach(outputWriter -> 
outputWriter.write(element));
-    // process elements in the next vertices within a task
-    harness.getMainTagChildren().forEach(child -> 
processElementRecursively(child, element));
-  }
-
-  private void handleAdditionalOutputElement(final VertexHarness harness, 
final Object element, final String tag) {
-    // writes to additional children tasks
-    harness.getWritersToAdditionalChildrenTasks().entrySet().stream()
-      .filter(kv -> kv.getKey().equals(tag))
-      .forEach(kv -> kv.getValue().write(element));
-    // process elements in the next vertices within a task
-    harness.getAdditionalTagOutputChildren().entrySet().stream()
-      .filter(kv -> kv.getKey().equals(tag))
-      .forEach(kv -> processElementRecursively(kv.getValue(), element));
+    finalizeOutputWriters(vertexHarness);
   }
 
   /**
@@ -429,7 +345,7 @@ public final class TaskExecutor {
         }
 
         // Successfully fetched an element
-        processElementRecursively(dataFetcher.getChild(), element);
+        processElement(dataFetcher.getChild(), element);
       }
 
       // Remove the finished fetcher from the list
@@ -441,76 +357,54 @@ public final class TaskExecutor {
   }
 
   ////////////////////////////////////////////// Helper methods for setting up 
initial data structures
-
-  private Map<String, String> getAdditionalOutputMap(final IRVertex irVertex,
-                                                     final List<StageEdge> 
outEdgesToChildrenTasks,
-                                                     final DAG<IRVertex, 
RuntimeEdge<IRVertex>> irVertexDag) {
-    final Map<String, String> additionalOutputMap = new HashMap<>();
-
-    // Add all intra-task additional tags to additional output map.
-    irVertexDag.getOutgoingEdgesOf(irVertex.getId())
-      .stream()
-      .filter(edge -> 
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
-      .map(edge ->
-        
Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), 
edge.getDst().getId()))
-      .forEach(pair -> additionalOutputMap.put(pair.left(), pair.right()));
-
+  private Map<String, List<OutputWriter>> getExternalAdditionalOutputMap(
+    final IRVertex irVertex,
+    final List<StageEdge> outEdgesToChildrenTasks,
+    final DataTransferFactory dataTransferFactory) {
     // Add all inter-task additional tags to additional output map.
+    final Map<String, List<OutputWriter>> map = new HashMap<>();
+
     outEdgesToChildrenTasks
       .stream()
       .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
       .filter(edge -> 
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
       .map(edge ->
-        
Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), 
edge.getDstIRVertex().getId()))
-      .forEach(pair -> additionalOutputMap.put(pair.left(), pair.right()));
+        Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(),
+          dataTransferFactory.createWriter(taskId, edge.getDstIRVertex(), 
edge)))
+      .forEach(pair -> {
+        map.putIfAbsent(pair.left(), new ArrayList<>());
+        map.get(pair.left()).add(pair.right());
+      });
 
-    return additionalOutputMap;
+    return map;
   }
 
-  private Optional<Readable> getSourceVertexReader(final IRVertex irVertex,
-                                                   final Map<String, Readable> 
irVertexIdToReadable) {
-    if (irVertex instanceof SourceVertex) {
-      final Readable readable = irVertexIdToReadable.get(irVertex.getId());
-      if (readable == null) {
-        throw new IllegalStateException(irVertex.toString());
-      }
-      return Optional.of(readable);
-    } else {
-      return Optional.empty();
-    }
-  }
+  private Map<String, List<OperatorVertex>> getInternalAdditionalOutputMap(
+    final IRVertex irVertex,
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+    // Add all intra-task additional tags to additional output map.
+    final Map<String, List<OperatorVertex>> map = new HashMap<>();
 
-  private List<InputReader> getParentTaskReaders(final int taskIndex,
-                                                 final List<StageEdge> 
inEdgesFromParentTasks,
-                                                 final DataTransferFactory 
dataTransferFactory) {
-    return inEdgesFromParentTasks
+    irVertexDag.getOutgoingEdgesOf(irVertex.getId())
       .stream()
-      .map(inEdgeForThisVertex -> dataTransferFactory
-        .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), 
inEdgeForThisVertex))
-      .collect(Collectors.toList());
+      .filter(edge -> 
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
+      .map(edge ->
+        
Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), 
(OperatorVertex) edge.getDst()))
+      .forEach(pair -> {
+        map.putIfAbsent(pair.left(), new ArrayList<>());
+        map.get(pair.left()).add(pair.right());
+      });
+
+    return map;
   }
 
-  private Set<String> getMainOutputVertices(final IRVertex irVertex,
-                                            final DAG<IRVertex, 
RuntimeEdge<IRVertex>> irVertexDag,
-                                            final List<StageEdge> 
outEdgesToChildrenTasks,
-                                            final List<String> 
additionalOutputVertices) {
-    // all intra-task children vertices id
-    final List<String> outputVertices = 
irVertexDag.getOutgoingEdgesOf(irVertex).stream()
-      .filter(edge -> edge.getSrc().getId().equals(irVertex.getId()))
-      .map(edge -> edge.getDst().getId())
+  private List<OperatorVertex> getInternalMainOutputs(final IRVertex irVertex,
+                                                     final DAG<IRVertex, 
RuntimeEdge<IRVertex>> irVertexDag) {
+    return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
+      .stream()
+      .filter(edge -> 
!edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
+      .map(edge -> (OperatorVertex) edge.getDst())
       .collect(Collectors.toList());
-
-    // all inter-task children vertices id
-    outputVertices
-      .addAll(outEdgesToChildrenTasks.stream()
-        .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
-        .map(edge -> edge.getDstIRVertex().getId())
-        .collect(Collectors.toList()));
-
-    // return vertices that are not marked as additional tagged outputs
-    return new HashSet<>(outputVertices.stream()
-      .filter(vertexId -> !additionalOutputVertices.contains(vertexId))
-      .collect(Collectors.toList()));
   }
 
   /**
@@ -519,66 +413,42 @@ public final class TaskExecutor {
    * @param irVertex                source irVertex
    * @param outEdgesToChildrenTasks outgoing edges to child tasks
    * @param dataTransferFactory     dataTransferFactory
-   * @param taggedOutputs           tag to vertex id map
    * @return OutputWriters for main children tasks
    */
-  private List<OutputWriter> getMainChildrenTaskWriters(final IRVertex 
irVertex,
-                                                        final List<StageEdge> 
outEdgesToChildrenTasks,
-                                                        final 
DataTransferFactory dataTransferFactory,
-                                                        final Map<String, 
String> taggedOutputs) {
+  private List<OutputWriter> getExternalMainOutputs(final IRVertex irVertex,
+                                                   final List<StageEdge> 
outEdgesToChildrenTasks,
+                                                   final DataTransferFactory 
dataTransferFactory) {
     return outEdgesToChildrenTasks
       .stream()
-      .filter(outEdge -> 
outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
-      .filter(outEdge -> 
!taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
+      .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
+      .filter(edge -> 
!edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
       .map(outEdgeForThisVertex -> dataTransferFactory
         .createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), 
outEdgeForThisVertex))
       .collect(Collectors.toList());
   }
 
-  /**
-   * Return inter-task OutputWriters associated with additional output tags.
-   *
-   * @param irVertex                source irVertex
-   * @param outEdgesToChildrenTasks outgoing edges to child tasks
-   * @param dataTransferFactory     dataTransferFactory
-   * @param taggedOutputs           tag to vertex id map
-   * @return additional tag to OutputWriters map.
-   */
-  private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final 
IRVertex irVertex,
-                                                                     final 
List<StageEdge> outEdgesToChildrenTasks,
-                                                                     final 
DataTransferFactory dataTransferFactory,
-                                                                     final 
Map<String, String> taggedOutputs) {
-    final Map<String, OutputWriter> additionalChildrenTaskWriters = new 
HashMap<>();
 
-    outEdgesToChildrenTasks
-        .stream()
-        .filter(outEdge -> 
outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
-        .filter(outEdge -> 
taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
-        .forEach(outEdgeForThisVertex -> {
-          final String tag = taggedOutputs.entrySet().stream()
-            .filter(e -> 
e.getValue().equals(outEdgeForThisVertex.getDstIRVertex().getId()))
-            .findAny().orElseThrow(() -> new RuntimeException("Unexpected 
error while finding tag"))
-            .getKey();
-          additionalChildrenTaskWriters.put(tag,
-              dataTransferFactory.createWriter(taskId, 
outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex));
-        });
-
-    return additionalChildrenTaskWriters;
+  private Optional<Readable> getSourceVertexReader(final IRVertex irVertex,
+                                                   final Map<String, Readable> 
irVertexIdToReadable) {
+    if (irVertex instanceof SourceVertex) {
+      final Readable readable = irVertexIdToReadable.get(irVertex.getId());
+      if (readable == null) {
+        throw new IllegalStateException(irVertex.toString());
+      }
+      return Optional.of(readable);
+    } else {
+      return Optional.empty();
+    }
   }
 
-  private List<VertexHarness> getChildrenHarnesses(final IRVertex irVertex,
-                                                   final DAG<IRVertex, 
RuntimeEdge<IRVertex>> irVertexDag,
-                                                   final Map<String, 
VertexHarness> vertexIdToHarness) {
-    final List<VertexHarness> childrenHandlers = 
irVertexDag.getChildren(irVertex.getId())
+  private List<InputReader> getParentTaskReaders(final int taskIndex,
+                                                 final List<StageEdge> 
inEdgesFromParentTasks,
+                                                 final DataTransferFactory 
dataTransferFactory) {
+    return inEdgesFromParentTasks
       .stream()
-      .map(IRVertex::getId)
-      .map(vertexIdToHarness::get)
+      .map(inEdgeForThisVertex -> dataTransferFactory
+        .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), 
inEdgeForThisVertex))
       .collect(Collectors.toList());
-    if (childrenHandlers.stream().anyMatch(harness -> harness == null)) {
-      // Sanity check: there shouldn't be a null harness.
-      throw new IllegalStateException(childrenHandlers.toString());
-    }
-    return childrenHandlers;
   }
 
   ////////////////////////////////////////////// Transform-specific helper 
methods
@@ -612,7 +482,7 @@ public final class TaskExecutor {
 
   ////////////////////////////////////////////// Misc
 
-  private void setIRVertexPutOnHold(final IRVertex irVertex) {
+  public void setIRVertexPutOnHold(final IRVertex irVertex) {
     idOfVertexPutOnHold = irVertex.getId();
   }
 
@@ -634,17 +504,20 @@ public final class TaskExecutor {
     });
 
     // finalize OutputWriters for additional tagged children
-    
vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter
 -> {
-      outputWriter.close();
-
-      final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
-      writtenBytes.ifPresent(writtenBytesList::add);
+    
vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriters
 -> {
+      outputWriters.forEach(outputWriter -> {
+        outputWriter.close();
+        final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
+        writtenBytes.ifPresent(writtenBytesList::add);
+      });
     });
 
     long totalWrittenBytes = 0;
     for (final Long writtenBytes : writtenBytesList) {
       totalWrittenBytes += writtenBytes;
     }
+
+    // TODO #236: Decouple metric collection and sending logic
     metricMessageSender.send("TaskMetric", taskId,
       "writtenBytes", SerializationUtils.serialize(totalWrittenBytes));
   }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
index 39cb88d..a6a73d4 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
@@ -15,17 +15,14 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
 import org.apache.nemo.runtime.executor.datatransfer.OutputWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * Captures the relationship between a non-source IRVertex's outputCollector and its mainTagChildren vertices.
@@ -35,48 +32,20 @@ final class VertexHarness {
 
   // IRVertex and transform-specific information
   private final IRVertex irVertex;
-  private final OutputCollectorImpl outputCollector;
+  private final OutputCollector outputCollector;
   private final Transform.Context context;
-  private final List<VertexHarness> mainTagChildren;
-
-  // These lists can be empty
-  private final Map<String, VertexHarness> additionalTagOutputChildren;
-  private final Map<String, String> tagToAdditionalChildrenId;
-  private final List<OutputWriter> writersToMainChildrenTasks;
-  private final Map<String, OutputWriter> writersToAdditionalChildrenTasks;
+  private final List<OutputWriter> externalOutputWriter;
+  private final Map<String, List<OutputWriter>> externalAdditionalOutputWriter;
 
   VertexHarness(final IRVertex irVertex,
-                final OutputCollectorImpl outputCollector,
-                final List<VertexHarness> children,
-                final List<Boolean> isAdditionalTagOutputs,
-                final List<OutputWriter> writersToMainChildrenTasks,
-                final Map<String, OutputWriter> 
writersToAdditionalChildrenTasks,
-                final Transform.Context context) {
+                final OutputCollector outputCollector,
+                final Transform.Context context,
+                final List<OutputWriter> externalOutputWriter,
+                final Map<String, List<OutputWriter>> 
externalAdditionalOutputWriter) {
     this.irVertex = irVertex;
     this.outputCollector = outputCollector;
-    if (children.size() != isAdditionalTagOutputs.size()) {
-      throw new IllegalStateException(irVertex.toString());
-    }
-    final Map<String, String> taggedOutputMap = 
context.getTagToAdditionalChildren();
-    final Map<String, VertexHarness> tagged = new HashMap<>();
-
-    // Classify input type for intra-task children
-    for (int i = 0; i < children.size(); i++) {
-      final VertexHarness child = children.get(i);
-      if (isAdditionalTagOutputs.get(i)) {
-        taggedOutputMap.entrySet().stream()
-          .filter(kv -> child.getIRVertex().getId().equals(kv.getValue()))
-          .forEach(kv -> tagged.put(kv.getKey(), child));
-      }
-    }
-
-    this.tagToAdditionalChildrenId = context.getTagToAdditionalChildren();
-    this.additionalTagOutputChildren = tagged;
-    final List<VertexHarness> mainTagChildrenTmp = new ArrayList<>(children);
-    mainTagChildrenTmp.removeAll(additionalTagOutputChildren.values());
-    this.mainTagChildren = mainTagChildrenTmp;
-    this.writersToMainChildrenTasks = writersToMainChildrenTasks;
-    this.writersToAdditionalChildrenTasks = writersToAdditionalChildrenTasks;
+    this.externalOutputWriter = externalOutputWriter;
+    this.externalAdditionalOutputWriter = externalAdditionalOutputWriter;
     this.context = context;
   }
 
@@ -88,45 +57,31 @@ final class VertexHarness {
   }
 
   /**
-   * @return OutputCollector of this irVertex.
+   * @return id of irVertex.
    */
-  OutputCollectorImpl getOutputCollector() {
-    return outputCollector;
+  String getId() {
+    return irVertex.getId();
   }
 
   /**
-   * @return mainTagChildren harnesses.
-   */
-  List<VertexHarness> getMainTagChildren() {
-    return mainTagChildren;
-  }
-
-  /**
-   * @return map of tagged output mainTagChildren. (empty if none exists)
-   */
-  public Map<String, VertexHarness> getAdditionalTagOutputChildren() {
-    return additionalTagOutputChildren;
-  }
-
-  /**
-   * @return map of tag to additional children id.
+   * @return OutputCollector of this irVertex.
    */
-  public Map<String, String> getTagToAdditionalChildrenId() {
-    return tagToAdditionalChildrenId;
+  OutputCollector getOutputCollector() {
+    return outputCollector;
   }
 
   /**
   * @return OutputWriters for main outputs of this irVertex. (empty if none exist)
    */
   List<OutputWriter> getWritersToMainChildrenTasks() {
-    return writersToMainChildrenTasks;
+    return externalOutputWriter;
   }
 
   /**
    * @return OutputWriters for additional tagged outputs of this irVertex. (empty if none exist)
    */
-  Map<String, OutputWriter> getWritersToAdditionalChildrenTasks() {
-    return writersToAdditionalChildrenTasks;
+  Map<String, List<OutputWriter>> getWritersToAdditionalChildrenTasks() {
+    return externalAdditionalOutputWriter;
   }
 
   /**
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
index 1261569..8ccb633 100644
--- 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
@@ -24,8 +24,6 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.*;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -39,19 +37,17 @@ import static org.mockito.Mockito.when;
 @PrepareForTest({BroadcastManagerWorker.class})
 public class TransformContextImplTest {
   private Transform.Context context;
-  private final Map<String, String> taggedOutputs = new HashMap<>();
 
   @Before
   public void setUp() {
     final BroadcastManagerWorker broadcastManagerWorker = 
mock(BroadcastManagerWorker.class);
     when(broadcastManagerWorker.get("a")).thenReturn("b");
-    this.context = new TransformContextImpl(broadcastManagerWorker, 
taggedOutputs);
+    this.context = new TransformContextImpl(broadcastManagerWorker);
   }
 
   @Test
   public void testContextImpl() {
     assertEquals("b", this.context.getBroadcastVariable("a"));
-    assertEquals(this.taggedOutputs, 
this.context.getTagToAdditionalChildren());
 
     final String sampleText = "test_text";
 
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 8e62947..9cd2b46 100644
--- 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -284,9 +284,13 @@ public final class TaskExecutorTest {
    * emit(element) and emit(dstVertexId, element) used together. emit(element) 
routes results to main output children,
    * and emit(dstVertexId, element) routes results to corresponding additional 
output children.
    */
-  @Test(timeout = 5000)
+  @Test(timeout=5000)
   public void testAdditionalOutputs() throws Exception {
-    final IRVertex routerVertex = new OperatorVertex(new RoutingTransform());
+    final String additionalTag1 = "bonus1";
+    final String additionalTag2 = "bonus2";
+
+    final IRVertex routerVertex = new OperatorVertex(
+      new RoutingTransform(Arrays.asList(additionalTag1, additionalTag2)));
     final IRVertex mainVertex= new OperatorVertex(new RelayTransform());
     final IRVertex bonusVertex1 = new OperatorVertex(new RelayTransform());
     final IRVertex bonusVertex2 = new OperatorVertex(new RelayTransform());
@@ -295,8 +299,8 @@ public final class TaskExecutorTest {
     final RuntimeEdge<IRVertex> edge2 = createEdge(routerVertex, bonusVertex1, 
"edge-2");
     final RuntimeEdge<IRVertex> edge3 = createEdge(routerVertex, bonusVertex2, 
"edge-3");
 
-    
edge2.getExecutionProperties().put(AdditionalOutputTagProperty.of("bonus1"));
-    
edge3.getExecutionProperties().put(AdditionalOutputTagProperty.of("bonus2"));
+    
edge2.getExecutionProperties().put(AdditionalOutputTagProperty.of(additionalTag1));
+    
edge3.getExecutionProperties().put(AdditionalOutputTagProperty.of(additionalTag2));
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new 
DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
         .addVertex(routerVertex)
@@ -514,12 +518,15 @@ public final class TaskExecutorTest {
    */
   private class RoutingTransform implements Transform<Integer, Integer> {
     private OutputCollector<Integer> outputCollector;
-    private Map<String, String> tagToVertex;
+    private final Collection<String> additionalTags;
+
+    public RoutingTransform(final Collection<String> additionalTags) {
+      this.additionalTags = additionalTags;
+    }
 
     @Override
     public void prepare(final Context context, OutputCollector<Integer> 
outputCollector) {
       this.outputCollector = outputCollector;
-      this.tagToVertex = context.getTagToAdditionalChildren();
     }
 
     @Override
@@ -530,7 +537,7 @@ public final class TaskExecutorTest {
         outputCollector.emit(i);
       } else {
         // route to all additional outputs. Invoked if user calls 
c.output(tupleTag, element)
-        tagToVertex.values().forEach(vertex -> outputCollector.emit(vertex, 
i));
+        additionalTags.forEach(tag -> outputCollector.emit(tag, i));
       }
     }
 

Reply via email to