This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 251440a [NEMO-235] Refactor TaskExecutor's data processing logic (#128)
251440a is described below
commit 251440ad65e932ace9fd16b6d74ea8358d1638b0
Author: Taegeon Um <[email protected]>
AuthorDate: Fri Oct 26 08:57:19 2018 +0900
[NEMO-235] Refactor TaskExecutor's data processing logic (#128)
JIRA: [NEMO-235: Refactor TaskExecutor's data processing logic](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-235)
**Major changes:**
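- Refactor `TaskExecutor` by delegating data transfer logic to `OutputCollector` implementations. Previously, outputs were buffered and then drained recursively. Now `OutputCollectorImpl` pushes each emitted element straight to intra-task child transforms (`Transform#onData`) and to inter-task `OutputWriter`s. The new `DynOptDataOutputCollector` sends aggregated dynamic-optimization statistics to the master.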
**Minor changes to note:**
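- Remove `Transform.Context#getTagToAdditionalChildren`. `DefaultOutputManager` now emits additional outputs by tag id.
- `SkewReshapingPass` uses a shared `ADDITIONAL_OUTPUT_TAG` constant for both the emitted tag and the edge's `AdditionalOutputTagProperty`.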
**Tests for the changes:**
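- Updated `DoFnTransformTest`, `TransformContextImplTest`, and `TaskExecutorTest` to match the refactored APIs.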
**Other comments:**
- None.
Closes #128
---
.../nemo/common/ir/vertex/transform/Transform.java | 2 -
.../beam/transform/AbstractDoFnTransform.java | 3 +-
.../beam/transform/DefaultOutputManager.java | 8 +-
.../compiletime/reshaping/SkewReshapingPass.java | 6 +-
.../frontend/beam/transform/DoFnTransformTest.java | 1 -
.../runtime/executor/TransformContextImpl.java | 12 +-
.../datatransfer/DynOptDataOutputCollector.java | 83 ++++++
.../executor/datatransfer/OutputCollectorImpl.java | 132 +++------
.../nemo/runtime/executor/task/TaskExecutor.java | 327 +++++++--------------
.../nemo/runtime/executor/task/VertexHarness.java | 85 ++----
.../runtime/executor/TransformContextImplTest.java | 6 +-
.../runtime/executor/task/TaskExecutorTest.java | 21 +-
12 files changed, 273 insertions(+), 413 deletions(-)
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
index 87db84f..b59344a 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
@@ -17,7 +17,6 @@ package org.apache.nemo.common.ir.vertex.transform;
import org.apache.nemo.common.ir.OutputCollector;
import java.io.Serializable;
-import java.util.Map;
import java.util.Optional;
/**
@@ -54,7 +53,6 @@ public interface Transform<I, O> extends Serializable {
* @return the broadcast variable.
*/
Object getBroadcastVariable(Serializable id);
- Map<String, String> getTagToAdditionalChildren();
/**
* Put serialized data to send to the executor.
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 8679c73..444dfb4 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -119,8 +119,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT,
OutputT> implements
this.outputCollector = oc;
// create output manager
- outputManager = new DefaultOutputManager<>(
- outputCollector, context, mainOutputTag);
+ outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
// create side input reader
if (!sideInputs.isEmpty()) {
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
index 4174c6c..457a128 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
@@ -19,9 +19,6 @@ import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-
-import java.util.Map;
/**
* Default output emitter that uses outputCollector.
@@ -30,14 +27,11 @@ import java.util.Map;
public final class DefaultOutputManager<OutputT> implements
DoFnRunners.OutputManager {
private final TupleTag<OutputT> mainOutputTag;
private final OutputCollector<WindowedValue<OutputT>> outputCollector;
- private final Map<String, String> additionalOutputs;
DefaultOutputManager(final OutputCollector<WindowedValue<OutputT>>
outputCollector,
- final Transform.Context context,
final TupleTag<OutputT> mainOutputTag) {
this.outputCollector = outputCollector;
this.mainOutputTag = mainOutputTag;
- this.additionalOutputs = context.getTagToAdditionalChildren();
}
@Override
@@ -45,7 +39,7 @@ public final class DefaultOutputManager<OutputT> implements
DoFnRunners.OutputMa
if (tag.equals(mainOutputTag)) {
outputCollector.emit((WindowedValue<OutputT>) output);
} else {
- outputCollector.emit(additionalOutputs.get(tag.getId()), output);
+ outputCollector.emit(tag.getId(), output);
}
}
}
diff --git
a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index e4743b5..eea5867 100644
---
a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++
b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -53,7 +53,7 @@ import java.util.function.BiFunction;
@Requires(CommunicationPatternProperty.class)
public final class SkewReshapingPass extends ReshapingPass {
private static final Logger LOG =
LoggerFactory.getLogger(SkewReshapingPass.class.getName());
-
+ private static final String ADDITIONAL_OUTPUT_TAG = "DynOptData";
/**
* Default constructor.
*/
@@ -154,7 +154,7 @@ public final class SkewReshapingPass extends ReshapingPass {
(dynOptData, outputCollector)-> {
dynOptData.forEach((k, v) -> {
final Pair<Object, Object> pairData = Pair.of(k, v);
- outputCollector.emit(abv.getId(), pairData);
+ outputCollector.emit(ADDITIONAL_OUTPUT_TAG, pairData);
});
return dynOptData;
};
@@ -180,7 +180,7 @@ public final class SkewReshapingPass extends ReshapingPass {
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
- newEdge.setProperty(AdditionalOutputTagProperty.of("DynOptData"));
+ newEdge.setProperty(AdditionalOutputTagProperty.of(ADDITIONAL_OUTPUT_TAG));
// Dynamic optimization handles statistics on key-value data by default.
// We need to get coders for encoding/decoding the keys to send data to
diff --git
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index 9a65e7a..db04f1e 100644
---
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -115,7 +115,6 @@ public final class DoFnTransformTest {
// mock context
final Transform.Context context = mock(Transform.Context.class);
- when(context.getTagToAdditionalChildren()).thenReturn(tagsMap);
final OutputCollector<WindowedValue<String>> oc = new
TestOutputCollector<>();
doFnTransform.prepare(context, oc);
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
index 682cf3c..bd22271 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/TransformContextImpl.java
@@ -19,7 +19,6 @@ import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
import java.io.Serializable;
-import java.util.Map;
import java.util.Optional;
/**
@@ -27,18 +26,14 @@ import java.util.Optional;
*/
public final class TransformContextImpl implements Transform.Context {
private final BroadcastManagerWorker broadcastManagerWorker;
- private final Map<String, String> tagToAdditionalChildren;
private String data;
/**
* Constructor of Context Implementation.
* @param broadcastManagerWorker for broadcast variables.
- * @param tagToAdditionalChildren tag id to additional vertices id map.
*/
- public TransformContextImpl(final BroadcastManagerWorker
broadcastManagerWorker,
- final Map<String, String>
tagToAdditionalChildren) {
+ public TransformContextImpl(final BroadcastManagerWorker
broadcastManagerWorker) {
this.broadcastManagerWorker = broadcastManagerWorker;
- this.tagToAdditionalChildren = tagToAdditionalChildren;
this.data = null;
}
@@ -48,11 +43,6 @@ public final class TransformContextImpl implements
Transform.Context {
}
@Override
- public Map<String, String> getTagToAdditionalChildren() {
- return this.tagToAdditionalChildren;
- }
-
- @Override
public void setSerializedData(final String serializedData) {
this.data = serializedData;
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
new file mode 100644
index 0000000..c2a013a
--- /dev/null
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
+import org.apache.nemo.runtime.executor.task.TaskExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * OutputCollector for dynamic optimization data.
+ *
+ * @param <O> output type.
+ */
+public final class DynOptDataOutputCollector<O> implements OutputCollector<O> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynOptDataOutputCollector.class.getName());
+ private static final String NULL_KEY = "NULL";
+
+ private final IRVertex irVertex;
+ private final PersistentConnectionToMasterMap connectionToMasterMap;
+ private final TaskExecutor taskExecutor;
+
+ public DynOptDataOutputCollector(final IRVertex irVertex,
+ final PersistentConnectionToMasterMap
connectionToMasterMap,
+ final TaskExecutor taskExecutor) {
+ this.irVertex = irVertex;
+ this.connectionToMasterMap = connectionToMasterMap;
+ this.taskExecutor = taskExecutor;
+ }
+
+ @Override
+ public void emit(final O output) {
+ final Map<Object, Long> aggregatedDynOptData = (Map<Object, Long>) output;
+ final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new
ArrayList<>();
+ aggregatedDynOptData.forEach((key, size) ->
+ partitionSizeEntries.add(
+ ControlMessage.PartitionSizeEntry.newBuilder()
+ .setKey(key == null ? NULL_KEY : String.valueOf(key))
+ .setSize(size)
+ .build())
+ );
+
+
connectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+ .send(ControlMessage.Message.newBuilder()
+ .setId(RuntimeIdManager.generateMessageId())
+ .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+ .setType(ControlMessage.MessageType.DataSizeMetric)
+ .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
+ .addAllPartitionSize(partitionSizeEntries)
+ )
+ .build());
+
+ // set the id of this vertex to mark the corresponding stage as put on hold
+ taskExecutor.setIRVertexPutOnHold(irVertex);
+ }
+
+ @Override
+ public <T> void emit(final String dstVertexId, final T output) {
+ throw new IllegalStateException("Dynamic optimization does not emit tagged
data");
+ }
+}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index b489cbb..aa510e9 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -15,8 +15,9 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import org.apache.nemo.common.Pair;
import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,110 +25,75 @@ import java.util.*;
/**
* OutputCollector implementation.
+ * This emits four types of outputs
+ * 1) internal main outputs: this output becomes the input of internal
Transforms
+ * 2) internal additional outputs: this additional output becomes the input of
internal Transforms
+ * 3) external main outputs: this external output is emitted to OutputWriter
+ * 4) external additional outputs: this external output is emitted to
OutputWriter
*
* @param <O> output type.
*/
public final class OutputCollectorImpl<O> implements OutputCollector<O> {
private static final Logger LOG =
LoggerFactory.getLogger(OutputCollectorImpl.class.getName());
- private final Set<String> mainTagOutputChildren;
- // Use ArrayList (not Queue) to allow 'null' values
- private final ArrayList<O> mainTagElements;
- // Key: Pair of tag and destination vertex id
- // Value: data elements which will be input to the tagged destination vertex
- private final Map<Pair<String, String>, ArrayList<Object>>
additionalTaggedChildToElementsMap;
+
+ private final IRVertex irVertex;
+ private final List<OperatorVertex> internalMainOutputs;
+ private final Map<String, List<OperatorVertex>> internalAdditionalOutputs;
+ private final List<OutputWriter> externalMainOutputs;
+ private final Map<String, List<OutputWriter>> externalAdditionalOutputs;
/**
- * Constructor of a new OutputCollectorImpl with tagged outputs.
- * @param mainChildren main children vertices
- * @param tagToChildren additional children vertices
+ * Constructor of the output collector.
+ * @param irVertex the ir vertex that emits the output
+ * @param internalMainOutputs internal main outputs
+ * @param internalAdditionalOutputs internal additional outputs
+ * @param externalMainOutputs external main outputs
+ * @param externalAdditionalOutputs external additional outputs
*/
- public OutputCollectorImpl(final Set<String> mainChildren,
- final Map<String, String> tagToChildren) {
- this.mainTagOutputChildren = mainChildren;
- this.mainTagElements = new ArrayList<>(1);
- this.additionalTaggedChildToElementsMap = new HashMap<>();
- tagToChildren.forEach((tag, child) ->
- this.additionalTaggedChildToElementsMap.put(Pair.of(tag, child), new
ArrayList<>(1)));
- }
-
- @Override
- public void emit(final O output) {
- mainTagElements.add(output);
+ public OutputCollectorImpl(final IRVertex irVertex,
+ final List<OperatorVertex> internalMainOutputs,
+ final Map<String, List<OperatorVertex>>
internalAdditionalOutputs,
+ final List<OutputWriter> externalMainOutputs,
+ final Map<String, List<OutputWriter>>
externalAdditionalOutputs) {
+ this.irVertex = irVertex;
+ this.internalMainOutputs = internalMainOutputs;
+ this.internalAdditionalOutputs = internalAdditionalOutputs;
+ this.externalMainOutputs = externalMainOutputs;
+ this.externalAdditionalOutputs = externalAdditionalOutputs;
}
- @Override
- public <T> void emit(final String dstVertexId, final T output) {
- if (this.mainTagOutputChildren.contains(dstVertexId)) {
- // This dstVertexId is for the main tag
- emit((O) output);
- } else {
- // Note that String#hashCode() can be cached, thus accessing additional
output queues can be fast.
- final List<Object> dataElements =
getAdditionalTaggedDataFromDstVertexId(dstVertexId);
- dataElements.add(output);
- }
+ private void emit(final OperatorVertex vertex, final O output) {
+ vertex.getTransform().onData(output);
}
- public Iterable<O> iterateMain() {
- return mainTagElements;
+ private void emit(final OutputWriter writer, final O output) {
+ writer.write(output);
}
- public Iterable<Object> iterateTag(final String tag) {
- if (this.mainTagOutputChildren.contains(tag)) {
- // This dstVertexId is for the main tag
- return (Iterable<Object>) iterateMain();
- } else {
- return getAdditionalTaggedDataFromTag(tag);
+ @Override
+ public void emit(final O output) {
+ for (final OperatorVertex internalVertex : internalMainOutputs) {
+ emit(internalVertex, output);
}
- }
-
- public void clearMain() {
- mainTagElements.clear();
- }
- public void clearTag(final String tag) {
- if (this.mainTagOutputChildren.contains(tag)) {
- // This dstVertexId is for the main tag
- clearMain();
- } else {
- // Note that String#hashCode() can be cached, thus accessing additional
output queues can be fast.
- final List<Object> dataElements = getAdditionalTaggedDataFromTag(tag);
- dataElements.clear();
+ for (final OutputWriter externalWriter : externalMainOutputs) {
+ emit(externalWriter, output);
}
}
- public List<O> getMainTagOutputQueue() {
- return mainTagElements;
- }
-
- public List<Object> getAdditionalTagOutputQueue(final String dstVertexId) {
- if (this.mainTagOutputChildren.contains(dstVertexId)) {
- return (List<Object>) this.mainTagElements;
- } else {
- return getAdditionalTaggedDataFromDstVertexId(dstVertexId);
- }
- }
+ @Override
+ public <T> void emit(final String dstVertexId, final T output) {
- private List<Object> getAdditionalTaggedDataFromDstVertexId(final String
dstVertexId) {
- final Pair<String, String> tagAndChild =
- this.additionalTaggedChildToElementsMap.keySet().stream()
- .filter(key -> key.right().equals(dstVertexId))
- .findAny().orElseThrow(() -> new RuntimeException("Wrong destination
vertex id passed!"));
- final List<Object> dataElements =
this.additionalTaggedChildToElementsMap.get(tagAndChild);
- if (dataElements == null) {
- throw new IllegalArgumentException("Wrong destination vertex id
passed!");
+ if (internalAdditionalOutputs.containsKey(dstVertexId)) {
+ for (final OperatorVertex internalVertex :
internalAdditionalOutputs.get(dstVertexId)) {
+ emit(internalVertex, (O) output);
+ }
}
- return dataElements;
- }
- private List<Object> getAdditionalTaggedDataFromTag(final String tag) {
- final Pair<String, String> tagAndChild =
- this.additionalTaggedChildToElementsMap.keySet().stream()
- .filter(key -> key.left().equals(tag))
- .findAny().orElseThrow(() -> new RuntimeException("Wrong tag " + tag +
" passed!"));
- final List<Object> dataElements =
this.additionalTaggedChildToElementsMap.get(tagAndChild);
- if (dataElements == null) {
- throw new IllegalArgumentException("Wrong tag " + tag + " passed!");
+ if (externalAdditionalOutputs.containsKey(dstVertexId)) {
+ for (final OutputWriter externalWriter :
externalAdditionalOutputs.get(dstVertexId)) {
+ emit(externalWriter, (O) output);
+ }
}
- return dataElements;
}
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 9035cbe..4cf789e 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -18,6 +18,7 @@ package org.apache.nemo.runtime.executor.task;
import com.google.common.collect.Lists;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.Readable;
import
org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
import
org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
@@ -44,6 +45,8 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.nemo.runtime.executor.datatransfer.DynOptDataOutputCollector;
+import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +60,6 @@ import javax.annotation.concurrent.NotThreadSafe;
public final class TaskExecutor {
private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutor.class.getName());
private static final int NONE_FINISHED = -1;
- private static final String NULL_KEY = "NULL";
// Essential information
private boolean isExecuted;
@@ -149,38 +151,39 @@ public final class TaskExecutor {
final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
reverseTopologicallySorted.forEach(irVertex -> {
- final List<VertexHarness> children = getChildrenHarnesses(irVertex,
irVertexDag, vertexIdToHarness);
final Optional<Readable> sourceReader = getSourceVertexReader(irVertex,
task.getIrVertexIdToReadable());
if (sourceReader.isPresent() != irVertex instanceof SourceVertex) {
throw new IllegalStateException(irVertex.toString());
}
- // Prepare data WRITE
- // Child-task writes
- final Map<String, String> additionalOutputMap =
- getAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(),
irVertexDag);
-
- final List<Boolean> isToAdditionalTagOutputs = children.stream()
- .map(harness -> harness.getIRVertex().getId())
- .map(additionalOutputMap::containsValue)
- .collect(Collectors.toList());
-
- // Handle writes
- // Main output children task writes
- final List<OutputWriter> mainChildrenTaskWriters =
getMainChildrenTaskWriters(
- irVertex, task.getTaskOutgoingEdges(), dataTransferFactory,
additionalOutputMap);
- final Map<String, OutputWriter> additionalChildrenTaskWriters =
getAdditionalChildrenTaskWriters(
- irVertex, task.getTaskOutgoingEdges(), dataTransferFactory,
additionalOutputMap);
- // Intra-task writes
- final List<String> additionalOutputVertices = new
ArrayList<>(additionalOutputMap.values());
- final Set<String> mainChildren =
- getMainOutputVertices(irVertex, irVertexDag,
task.getTaskOutgoingEdges(), additionalOutputVertices);
- final OutputCollectorImpl oci = new OutputCollectorImpl(mainChildren,
additionalOutputMap);
+ // Additional outputs
+ final Map<String, List<OperatorVertex>> internalAdditionalOutputMap =
+ getInternalAdditionalOutputMap(irVertex, irVertexDag);
+ final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
+ getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(),
dataTransferFactory);
+
+ // Main outputs
+ final List<OperatorVertex> internalMainOutputs =
getInternalMainOutputs(irVertex, irVertexDag);
+ final List<OutputWriter> externalMainOutputs =
+ getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(),
dataTransferFactory);
+
+ final OutputCollector outputCollector;
+
+ if (irVertex instanceof OperatorVertex
+ && ((OperatorVertex) irVertex).getTransform() instanceof
AggregateMetricTransform) {
+ outputCollector = new DynOptDataOutputCollector(
+ irVertex, persistentConnectionToMasterMap, this);
+ } else {
+ outputCollector = new OutputCollectorImpl(
+ irVertex, internalMainOutputs, internalAdditionalOutputMap,
+ externalMainOutputs, externalAdditionalOutputMap);
+ }
// Create VERTEX HARNESS
final VertexHarness vertexHarness = new VertexHarness(
- irVertex, oci, children, isToAdditionalTagOutputs,
mainChildrenTaskWriters, additionalChildrenTaskWriters,
- new TransformContextImpl(broadcastManagerWorker, additionalOutputMap));
+ irVertex, outputCollector, new
TransformContextImpl(broadcastManagerWorker),
+ externalMainOutputs, externalAdditionalOutputMap);
+
prepareTransform(vertexHarness);
vertexIdToHarness.put(irVertex.getId(), vertexHarness);
@@ -190,6 +193,7 @@ public final class TaskExecutor {
// Source vertex read
nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex,
sourceReader.get(), vertexHarness));
}
+
// Parent-task read (broadcasts)
final List<StageEdge> inEdgesForThisVertex = task.getTaskIncomingEdges()
.stream()
@@ -229,15 +233,16 @@ public final class TaskExecutor {
}
/**
- * Recursively process a data element down the DAG dependency.
+ * Process a data element down the DAG dependency.
*
* @param vertexHarness VertexHarness of a vertex to execute.
* @param dataElement input data element to process.
*/
- private void processElementRecursively(final VertexHarness vertexHarness,
final Object dataElement) {
+ private void processElement(final VertexHarness vertexHarness, final Object
dataElement) {
final IRVertex irVertex = vertexHarness.getIRVertex();
- final OutputCollectorImpl outputCollector =
vertexHarness.getOutputCollector();
+ final OutputCollector outputCollector = vertexHarness.getOutputCollector();
+ // TODO #XXX: optimize processElement (do not check instanceof for each
data element)
if (irVertex instanceof SourceVertex) {
outputCollector.emit(dataElement);
} else if (irVertex instanceof OperatorVertex) {
@@ -246,19 +251,6 @@ public final class TaskExecutor {
} else {
throw new UnsupportedOperationException("This type of IRVertex is not
supported");
}
-
- // Given a single input element, a vertex can produce many output elements.
- // Here, we recursively process all of the main output elements.
- outputCollector.iterateMain().forEach(element ->
- handleMainOutputElement(vertexHarness, element)); // Recursion
- outputCollector.clearMain();
-
- // Recursively process all of the additional output elements.
- vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
- outputCollector.iterateTag(tag).forEach(element ->
- handleAdditionalOutputElement(vertexHarness, element, tag)); //
Recursion
- outputCollector.clearTag(tag);
- });
}
/**
@@ -315,85 +307,9 @@ public final class TaskExecutor {
}
}
- /**
- * Send aggregated statistics for dynamic optimization to master.
- * @param dynOptData the statistics to send.
- */
- public void sendDynOptData(final Object dynOptData) {
- Map<Object, Long> aggregatedDynOptData = (Map<Object, Long>) dynOptData;
- final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new
ArrayList<>();
- aggregatedDynOptData.forEach((key, size) ->
- partitionSizeEntries.add(
- ControlMessage.PartitionSizeEntry.newBuilder()
- .setKey(key == null ? NULL_KEY : String.valueOf(key))
- .setSize(size)
- .build())
- );
-
-
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
- .send(ControlMessage.Message.newBuilder()
- .setId(RuntimeIdManager.generateMessageId())
- .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
- .setType(ControlMessage.MessageType.DataSizeMetric)
- .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
- .addAllPartitionSize(partitionSizeEntries)
- )
- .build());
- }
-
private void finalizeVertex(final VertexHarness vertexHarness) {
closeTransform(vertexHarness);
-
- final OutputCollectorImpl outputCollector =
vertexHarness.getOutputCollector();
- final IRVertex v = vertexHarness.getIRVertex();
- if (v instanceof OperatorVertex
- && ((OperatorVertex) v).getTransform() instanceof
AggregateMetricTransform) {
- // send aggregated dynamic optimization data to master
- final Object aggregatedDynOptData =
outputCollector.iterateMain().iterator().next();
- sendDynOptData(aggregatedDynOptData);
- // set the id of this vertex to mark the corresponding stage as put on
hold
- setIRVertexPutOnHold(v);
- } else {
- // handle main outputs
- outputCollector.iterateMain().forEach(element -> {
- handleMainOutputElement(vertexHarness, element);
- }); // Recursion
- outputCollector.clearMain();
-
- // handle intra-task additional tagged outputs
- vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
- outputCollector.iterateTag(tag).forEach(
- element -> handleAdditionalOutputElement(vertexHarness, element,
tag)); // Recursion
- outputCollector.clearTag(tag);
- });
-
- // handle inter-task additional tagged outputs
- vertexHarness.getTagToAdditionalChildrenId().keySet().forEach(tag -> {
- outputCollector.iterateTag(tag).forEach(
- element -> handleAdditionalOutputElement(vertexHarness, element,
tag)); // Recursion
- outputCollector.clearTag(tag);
- });
-
- finalizeOutputWriters(vertexHarness);
- }
- }
-
- private void handleMainOutputElement(final VertexHarness harness, final
Object element) {
- // writes to children tasks
- harness.getWritersToMainChildrenTasks().forEach(outputWriter ->
outputWriter.write(element));
- // process elements in the next vertices within a task
- harness.getMainTagChildren().forEach(child ->
processElementRecursively(child, element));
- }
-
- private void handleAdditionalOutputElement(final VertexHarness harness,
final Object element, final String tag) {
- // writes to additional children tasks
- harness.getWritersToAdditionalChildrenTasks().entrySet().stream()
- .filter(kv -> kv.getKey().equals(tag))
- .forEach(kv -> kv.getValue().write(element));
- // process elements in the next vertices within a task
- harness.getAdditionalTagOutputChildren().entrySet().stream()
- .filter(kv -> kv.getKey().equals(tag))
- .forEach(kv -> processElementRecursively(kv.getValue(), element));
+ finalizeOutputWriters(vertexHarness);
}
/**
@@ -429,7 +345,7 @@ public final class TaskExecutor {
}
// Successfully fetched an element
- processElementRecursively(dataFetcher.getChild(), element);
+ processElement(dataFetcher.getChild(), element);
}
// Remove the finished fetcher from the list
@@ -441,76 +357,54 @@ public final class TaskExecutor {
}
////////////////////////////////////////////// Helper methods for setting up
initial data structures
-
- private Map<String, String> getAdditionalOutputMap(final IRVertex irVertex,
- final List<StageEdge>
outEdgesToChildrenTasks,
- final DAG<IRVertex,
RuntimeEdge<IRVertex>> irVertexDag) {
- final Map<String, String> additionalOutputMap = new HashMap<>();
-
- // Add all intra-task additional tags to additional output map.
- irVertexDag.getOutgoingEdgesOf(irVertex.getId())
- .stream()
- .filter(edge ->
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
- .map(edge ->
-
Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(),
edge.getDst().getId()))
- .forEach(pair -> additionalOutputMap.put(pair.left(), pair.right()));
-
+ private Map<String, List<OutputWriter>> getExternalAdditionalOutputMap(
+ final IRVertex irVertex,
+ final List<StageEdge> outEdgesToChildrenTasks,
+ final DataTransferFactory dataTransferFactory) {
// Add all inter-task additional tags to additional output map.
+ final Map<String, List<OutputWriter>> map = new HashMap<>();
+
outEdgesToChildrenTasks
.stream()
.filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
.filter(edge ->
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
.map(edge ->
-
Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(),
edge.getDstIRVertex().getId()))
- .forEach(pair -> additionalOutputMap.put(pair.left(), pair.right()));
+ Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(),
+ dataTransferFactory.createWriter(taskId, edge.getDstIRVertex(),
edge)))
+ .forEach(pair -> {
+ map.putIfAbsent(pair.left(), new ArrayList<>());
+ map.get(pair.left()).add(pair.right());
+ });
- return additionalOutputMap;
+ return map;
}
- private Optional<Readable> getSourceVertexReader(final IRVertex irVertex,
- final Map<String, Readable>
irVertexIdToReadable) {
- if (irVertex instanceof SourceVertex) {
- final Readable readable = irVertexIdToReadable.get(irVertex.getId());
- if (readable == null) {
- throw new IllegalStateException(irVertex.toString());
- }
- return Optional.of(readable);
- } else {
- return Optional.empty();
- }
- }
+ private Map<String, List<OperatorVertex>> getInternalAdditionalOutputMap(
+ final IRVertex irVertex,
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+ // Add all intra-task additional tags to additional output map.
+ final Map<String, List<OperatorVertex>> map = new HashMap<>();
- private List<InputReader> getParentTaskReaders(final int taskIndex,
- final List<StageEdge>
inEdgesFromParentTasks,
- final DataTransferFactory
dataTransferFactory) {
- return inEdgesFromParentTasks
+ irVertexDag.getOutgoingEdgesOf(irVertex.getId())
.stream()
- .map(inEdgeForThisVertex -> dataTransferFactory
- .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(),
inEdgeForThisVertex))
- .collect(Collectors.toList());
+ .filter(edge ->
edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
+ .map(edge ->
+
Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(),
(OperatorVertex) edge.getDst()))
+ .forEach(pair -> {
+ map.putIfAbsent(pair.left(), new ArrayList<>());
+ map.get(pair.left()).add(pair.right());
+ });
+
+ return map;
}
- private Set<String> getMainOutputVertices(final IRVertex irVertex,
- final DAG<IRVertex,
RuntimeEdge<IRVertex>> irVertexDag,
- final List<StageEdge>
outEdgesToChildrenTasks,
- final List<String>
additionalOutputVertices) {
- // all intra-task children vertices id
- final List<String> outputVertices =
irVertexDag.getOutgoingEdgesOf(irVertex).stream()
- .filter(edge -> edge.getSrc().getId().equals(irVertex.getId()))
- .map(edge -> edge.getDst().getId())
+ private List<OperatorVertex> getInternalMainOutputs(final IRVertex irVertex,
+ final DAG<IRVertex,
RuntimeEdge<IRVertex>> irVertexDag) {
+ return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
+ .stream()
+ .filter(edge ->
!edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
+ .map(edge -> (OperatorVertex) edge.getDst())
.collect(Collectors.toList());
-
- // all inter-task children vertices id
- outputVertices
- .addAll(outEdgesToChildrenTasks.stream()
- .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
- .map(edge -> edge.getDstIRVertex().getId())
- .collect(Collectors.toList()));
-
- // return vertices that are not marked as additional tagged outputs
- return new HashSet<>(outputVertices.stream()
- .filter(vertexId -> !additionalOutputVertices.contains(vertexId))
- .collect(Collectors.toList()));
}
/**
@@ -519,66 +413,42 @@ public final class TaskExecutor {
* @param irVertex source irVertex
* @param outEdgesToChildrenTasks outgoing edges to child tasks
* @param dataTransferFactory dataTransferFactory
- * @param taggedOutputs tag to vertex id map
* @return OutputWriters for main children tasks
*/
- private List<OutputWriter> getMainChildrenTaskWriters(final IRVertex
irVertex,
- final List<StageEdge>
outEdgesToChildrenTasks,
- final
DataTransferFactory dataTransferFactory,
- final Map<String,
String> taggedOutputs) {
+ private List<OutputWriter> getExternalMainOutputs(final IRVertex irVertex,
+ final List<StageEdge>
outEdgesToChildrenTasks,
+ final DataTransferFactory
dataTransferFactory) {
return outEdgesToChildrenTasks
.stream()
- .filter(outEdge ->
outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
- .filter(outEdge ->
!taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
+ .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
+ .filter(edge ->
!edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
.map(outEdgeForThisVertex -> dataTransferFactory
.createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(),
outEdgeForThisVertex))
.collect(Collectors.toList());
}
- /**
- * Return inter-task OutputWriters associated with additional output tags.
- *
- * @param irVertex source irVertex
- * @param outEdgesToChildrenTasks outgoing edges to child tasks
- * @param dataTransferFactory dataTransferFactory
- * @param taggedOutputs tag to vertex id map
- * @return additional tag to OutputWriters map.
- */
- private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final
IRVertex irVertex,
- final
List<StageEdge> outEdgesToChildrenTasks,
- final
DataTransferFactory dataTransferFactory,
- final
Map<String, String> taggedOutputs) {
- final Map<String, OutputWriter> additionalChildrenTaskWriters = new
HashMap<>();
- outEdgesToChildrenTasks
- .stream()
- .filter(outEdge ->
outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
- .filter(outEdge ->
taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
- .forEach(outEdgeForThisVertex -> {
- final String tag = taggedOutputs.entrySet().stream()
- .filter(e ->
e.getValue().equals(outEdgeForThisVertex.getDstIRVertex().getId()))
- .findAny().orElseThrow(() -> new RuntimeException("Unexpected
error while finding tag"))
- .getKey();
- additionalChildrenTaskWriters.put(tag,
- dataTransferFactory.createWriter(taskId,
outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex));
- });
-
- return additionalChildrenTaskWriters;
+ private Optional<Readable> getSourceVertexReader(final IRVertex irVertex,
+ final Map<String, Readable>
irVertexIdToReadable) {
+ if (irVertex instanceof SourceVertex) {
+ final Readable readable = irVertexIdToReadable.get(irVertex.getId());
+ if (readable == null) {
+ throw new IllegalStateException(irVertex.toString());
+ }
+ return Optional.of(readable);
+ } else {
+ return Optional.empty();
+ }
}
- private List<VertexHarness> getChildrenHarnesses(final IRVertex irVertex,
- final DAG<IRVertex,
RuntimeEdge<IRVertex>> irVertexDag,
- final Map<String,
VertexHarness> vertexIdToHarness) {
- final List<VertexHarness> childrenHandlers =
irVertexDag.getChildren(irVertex.getId())
+ private List<InputReader> getParentTaskReaders(final int taskIndex,
+ final List<StageEdge>
inEdgesFromParentTasks,
+ final DataTransferFactory
dataTransferFactory) {
+ return inEdgesFromParentTasks
.stream()
- .map(IRVertex::getId)
- .map(vertexIdToHarness::get)
+ .map(inEdgeForThisVertex -> dataTransferFactory
+ .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(),
inEdgeForThisVertex))
.collect(Collectors.toList());
- if (childrenHandlers.stream().anyMatch(harness -> harness == null)) {
- // Sanity check: there shouldn't be a null harness.
- throw new IllegalStateException(childrenHandlers.toString());
- }
- return childrenHandlers;
}
////////////////////////////////////////////// Transform-specific helper
methods
@@ -612,7 +482,7 @@ public final class TaskExecutor {
////////////////////////////////////////////// Misc
- private void setIRVertexPutOnHold(final IRVertex irVertex) {
+ public void setIRVertexPutOnHold(final IRVertex irVertex) {
idOfVertexPutOnHold = irVertex.getId();
}
@@ -634,17 +504,20 @@ public final class TaskExecutor {
});
// finalize OutputWriters for additional tagged children
-
vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter
-> {
- outputWriter.close();
-
- final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
- writtenBytes.ifPresent(writtenBytesList::add);
+
vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriters
-> {
+ outputWriters.forEach(outputWriter -> {
+ outputWriter.close();
+ final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
+ writtenBytes.ifPresent(writtenBytesList::add);
+ });
});
long totalWrittenBytes = 0;
for (final Long writtenBytes : writtenBytesList) {
totalWrittenBytes += writtenBytes;
}
+
+ // TODO #236: Decouple metric collection and sending logic
metricMessageSender.send("TaskMetric", taskId,
"writtenBytes", SerializationUtils.serialize(totalWrittenBytes));
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
index 39cb88d..a6a73d4 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/VertexHarness.java
@@ -15,17 +15,14 @@
*/
package org.apache.nemo.runtime.executor.task;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
import org.apache.nemo.runtime.executor.datatransfer.OutputWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* Captures the relationship between a non-source IRVertex's outputCollector,
and mainTagChildren vertices.
@@ -35,48 +32,20 @@ final class VertexHarness {
// IRVertex and transform-specific information
private final IRVertex irVertex;
- private final OutputCollectorImpl outputCollector;
+ private final OutputCollector outputCollector;
private final Transform.Context context;
- private final List<VertexHarness> mainTagChildren;
-
- // These lists can be empty
- private final Map<String, VertexHarness> additionalTagOutputChildren;
- private final Map<String, String> tagToAdditionalChildrenId;
- private final List<OutputWriter> writersToMainChildrenTasks;
- private final Map<String, OutputWriter> writersToAdditionalChildrenTasks;
+ private final List<OutputWriter> externalOutputWriter;
+ private final Map<String, List<OutputWriter>> externalAdditionalOutputWriter;
VertexHarness(final IRVertex irVertex,
- final OutputCollectorImpl outputCollector,
- final List<VertexHarness> children,
- final List<Boolean> isAdditionalTagOutputs,
- final List<OutputWriter> writersToMainChildrenTasks,
- final Map<String, OutputWriter>
writersToAdditionalChildrenTasks,
- final Transform.Context context) {
+ final OutputCollector outputCollector,
+ final Transform.Context context,
+ final List<OutputWriter> externalOutputWriter,
+ final Map<String, List<OutputWriter>>
externalAdditionalOutputWriter) {
this.irVertex = irVertex;
this.outputCollector = outputCollector;
- if (children.size() != isAdditionalTagOutputs.size()) {
- throw new IllegalStateException(irVertex.toString());
- }
- final Map<String, String> taggedOutputMap =
context.getTagToAdditionalChildren();
- final Map<String, VertexHarness> tagged = new HashMap<>();
-
- // Classify input type for intra-task children
- for (int i = 0; i < children.size(); i++) {
- final VertexHarness child = children.get(i);
- if (isAdditionalTagOutputs.get(i)) {
- taggedOutputMap.entrySet().stream()
- .filter(kv -> child.getIRVertex().getId().equals(kv.getValue()))
- .forEach(kv -> tagged.put(kv.getKey(), child));
- }
- }
-
- this.tagToAdditionalChildrenId = context.getTagToAdditionalChildren();
- this.additionalTagOutputChildren = tagged;
- final List<VertexHarness> mainTagChildrenTmp = new ArrayList<>(children);
- mainTagChildrenTmp.removeAll(additionalTagOutputChildren.values());
- this.mainTagChildren = mainTagChildrenTmp;
- this.writersToMainChildrenTasks = writersToMainChildrenTasks;
- this.writersToAdditionalChildrenTasks = writersToAdditionalChildrenTasks;
+ this.externalOutputWriter = externalOutputWriter;
+ this.externalAdditionalOutputWriter = externalAdditionalOutputWriter;
this.context = context;
}
@@ -88,45 +57,31 @@ final class VertexHarness {
}
/**
- * @return OutputCollector of this irVertex.
+ * @return id of irVertex.
*/
- OutputCollectorImpl getOutputCollector() {
- return outputCollector;
+ String getId() {
+ return irVertex.getId();
}
/**
- * @return mainTagChildren harnesses.
- */
- List<VertexHarness> getMainTagChildren() {
- return mainTagChildren;
- }
-
- /**
- * @return map of tagged output mainTagChildren. (empty if none exists)
- */
- public Map<String, VertexHarness> getAdditionalTagOutputChildren() {
- return additionalTagOutputChildren;
- }
-
- /**
- * @return map of tag to additional children id.
+ * @return OutputCollector of this irVertex.
*/
- public Map<String, String> getTagToAdditionalChildrenId() {
- return tagToAdditionalChildrenId;
+ OutputCollector getOutputCollector() {
+ return outputCollector;
}
/**
* @return OutputWriters for main outputs of this irVertex. (empty if none
exists)
*/
List<OutputWriter> getWritersToMainChildrenTasks() {
- return writersToMainChildrenTasks;
+ return externalOutputWriter;
}
/**
* @return OutputWriters for additional tagged outputs of this irVertex.
(empty if none exists)
*/
- Map<String, OutputWriter> getWritersToAdditionalChildrenTasks() {
- return writersToAdditionalChildrenTasks;
+ Map<String, List<OutputWriter>> getWritersToAdditionalChildrenTasks() {
+ return externalAdditionalOutputWriter;
}
/**
diff --git
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
index 1261569..8ccb633 100644
---
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
+++
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/TransformContextImplTest.java
@@ -24,8 +24,6 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.*;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -39,19 +37,17 @@ import static org.mockito.Mockito.when;
@PrepareForTest({BroadcastManagerWorker.class})
public class TransformContextImplTest {
private Transform.Context context;
- private final Map<String, String> taggedOutputs = new HashMap<>();
@Before
public void setUp() {
final BroadcastManagerWorker broadcastManagerWorker =
mock(BroadcastManagerWorker.class);
when(broadcastManagerWorker.get("a")).thenReturn("b");
- this.context = new TransformContextImpl(broadcastManagerWorker,
taggedOutputs);
+ this.context = new TransformContextImpl(broadcastManagerWorker);
}
@Test
public void testContextImpl() {
assertEquals("b", this.context.getBroadcastVariable("a"));
- assertEquals(this.taggedOutputs,
this.context.getTagToAdditionalChildren());
final String sampleText = "test_text";
diff --git
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 8e62947..9cd2b46 100644
---
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -284,9 +284,13 @@ public final class TaskExecutorTest {
* emit(element) and emit(dstVertexId, element) used together. emit(element)
routes results to main output children,
* and emit(dstVertexId, element) routes results to corresponding additional
output children.
*/
- @Test(timeout = 5000)
+ @Test(timeout=5000)
public void testAdditionalOutputs() throws Exception {
- final IRVertex routerVertex = new OperatorVertex(new RoutingTransform());
+ final String additionalTag1 = "bonus1";
+ final String additionalTag2 = "bonus2";
+
+ final IRVertex routerVertex = new OperatorVertex(
+ new RoutingTransform(Arrays.asList(additionalTag1, additionalTag2)));
final IRVertex mainVertex= new OperatorVertex(new RelayTransform());
final IRVertex bonusVertex1 = new OperatorVertex(new RelayTransform());
final IRVertex bonusVertex2 = new OperatorVertex(new RelayTransform());
@@ -295,8 +299,8 @@ public final class TaskExecutorTest {
final RuntimeEdge<IRVertex> edge2 = createEdge(routerVertex, bonusVertex1,
"edge-2");
final RuntimeEdge<IRVertex> edge3 = createEdge(routerVertex, bonusVertex2,
"edge-3");
-
edge2.getExecutionProperties().put(AdditionalOutputTagProperty.of("bonus1"));
-
edge3.getExecutionProperties().put(AdditionalOutputTagProperty.of("bonus2"));
+
edge2.getExecutionProperties().put(AdditionalOutputTagProperty.of(additionalTag1));
+
edge3.getExecutionProperties().put(AdditionalOutputTagProperty.of(additionalTag2));
final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new
DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
.addVertex(routerVertex)
@@ -514,12 +518,15 @@ public final class TaskExecutorTest {
*/
private class RoutingTransform implements Transform<Integer, Integer> {
private OutputCollector<Integer> outputCollector;
- private Map<String, String> tagToVertex;
+ private final Collection<String> additionalTags;
+
+ public RoutingTransform(final Collection<String> additionalTags) {
+ this.additionalTags = additionalTags;
+ }
@Override
public void prepare(final Context context, OutputCollector<Integer>
outputCollector) {
this.outputCollector = outputCollector;
- this.tagToVertex = context.getTagToAdditionalChildren();
}
@Override
@@ -530,7 +537,7 @@ public final class TaskExecutorTest {
outputCollector.emit(i);
} else {
// route to all additional outputs. Invoked if user calls
c.output(tupleTag, element)
- tagToVertex.values().forEach(vertex -> outputCollector.emit(vertex,
i));
+ additionalTags.forEach(tag -> outputCollector.emit(tag, i));
}
}