This is an automated email from the ASF dual-hosted git repository. jeongyoon pushed a commit to branch 717-TGE in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 82816b13d22bcd3aff759fc93ae8c4eec9c99405 Author: Jeongyoon Eo <[email protected]> AuthorDate: Mon Mar 5 13:21:41 2018 +0900 use id instead of PipeImpl instance for pipeToDstTasksMap --- .../nemo/runtime/executor/TaskGroupExecutor.java | 22 +++++++++++++--------- .../runtime/executor/datatransfer/PipeImpl.java | 10 ++++++++++ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java index 85d58ee..75c743b 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java @@ -68,7 +68,7 @@ public final class TaskGroupExecutor { private final Map<String, List<Task>> srcIteratorIdToTasksMap; private final Map<String, List<Task>> iteratorIdToTasksMap; private final LinkedBlockingQueue<Pair<String, DataUtil.IteratorWithNumBytes>> iteratorQueue; - private volatile Map<PipeImpl, List<Task>> pipeToDstTasksMap; + private volatile Map<String, List<Task>> pipeToDstTasksMap; private final Set<Transform> preparedTransforms; private final Set<String> finishedTaskIds; private final AtomicInteger completedFutures; @@ -360,7 +360,7 @@ public final class TaskGroupExecutor { tasks.forEach(task -> { final List<Task> dstTasks = taskGroupDag.getChildren(task.getId()); PipeImpl pipe = taskToOutputPipeMap.get(task); - pipeToDstTasksMap.putIfAbsent(pipe, dstTasks); + pipeToDstTasksMap.putIfAbsent(pipe.getId(), dstTasks); LOG.info("{} pipeToDstTasksMap: [{}'s OutputPipe, {}]", taskGroupId, getPhysicalTaskId(task.getId()), dstTasks); })); @@ -368,21 +368,21 @@ public final class TaskGroupExecutor { tasks.forEach(task -> { final List<Task> dstTasks = taskGroupDag.getChildren(task.getId()); PipeImpl pipe = taskToOutputPipeMap.get(task); - pipeToDstTasksMap.putIfAbsent(pipe, dstTasks); + pipeToDstTasksMap.putIfAbsent(pipe.getId(), dstTasks); LOG.info("{} pipeToDstTasksMap: [{}'s OutputPipe, {}]", taskGroupId, getPhysicalTaskId(task.getId()), dstTasks); })); } private void updatePipeToDstTasksMap() { - Map<PipeImpl, List<Task>> currentMap = pipeToDstTasksMap; - Map<PipeImpl, List<Task>> updatedMap = new HashMap<>(); + Map<String, List<Task>> currentMap = pipeToDstTasksMap; + Map<String, List<Task>> updatedMap = new HashMap<>(); currentMap.values().forEach(tasks -> tasks.forEach(task -> { final List<Task> dstTasks = taskGroupDag.getChildren(task.getId()); PipeImpl pipe = taskToOutputPipeMap.get(task); - updatedMap.putIfAbsent(pipe, dstTasks); + updatedMap.putIfAbsent(pipe.getId(), dstTasks); LOG.info("{} pipeToDstTasksMap: [{}, {}]", taskGroupId, getPhysicalTaskId(task.getId()), dstTasks); }) @@ -525,7 +525,7 @@ public final class TaskGroupExecutor { final String iteratorId = idToIteratorPair.left(); final DataUtil.IteratorWithNumBytes iterator = idToIteratorPair.right(); List<Task> dstTasks = iteratorIdToTasksMap.get(iteratorId); - idToIteratorPair.right().forEachRemaining(element -> { + iterator.forEachRemaining(element -> { for (final Task task : dstTasks) { List data = Collections.singletonList(element); runTask(task, data); @@ -556,10 +556,14 @@ public final class TaskGroupExecutor { // Intra-TaskGroup data comes from pipes of this TaskGroup's Tasks. initializePipeToDstTasksMap(); while (!finishedAllTasks()) { - pipeToDstTasksMap.forEach((pipe, dstTasks) -> { + pipeToDstTasksMap.forEach((pipeId, dstTasks) -> { + PipeImpl pipe = taskToOutputPipeMap.values().stream() + .filter(p -> p.getId() == pipeId) + .findFirst().get(); + // Get the task that has this pipe as its output pipe Task pipeOwnerTask = taskToOutputPipeMap.entrySet().stream() - .filter(entry -> entry.getValue().equals(pipe)) + .filter(entry -> entry.getValue().getId() == pipeId) .findAny().get().getKey(); LOG.info("{} pipeOwnerTask {}", taskGroupId, getPhysicalTaskId(pipeOwnerTask.getId())); diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java index a104322..fbcc662 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** * Pipe implementation that requires synchronization. @@ -31,6 +32,10 @@ import java.util.List; */ public final class PipeImpl<O> implements Pipe<O> { private static final Logger LOG = LoggerFactory.getLogger(PipeImpl.class.getName()); + private static final String PIPEID_PREFIX = "PIPE_"; + private static final AtomicInteger PIPEID_GENERATOR = new AtomicInteger(0); + + private final String id; private final ArrayDeque<O> outputQueue; private RuntimeEdge sideInputRuntimeEdge; private List<String> sideInputReceivers; @@ -39,6 +44,7 @@ public final class PipeImpl<O> implements Pipe<O> { * Constructor of a new Pipe. */ public PipeImpl() { + this.id = PIPEID_PREFIX + PIPEID_GENERATOR.getAndIncrement(); this.outputQueue = new ArrayDeque<>(); this.sideInputRuntimeEdge = null; this.sideInputReceivers = new ArrayList<>(); @@ -86,4 +92,8 @@ public final class PipeImpl<O> implements Pipe<O> { public boolean hasSideInputFor(final String physicalTaskId) { return sideInputReceivers.contains(physicalTaskId); } + + public String getId() { + return id; + } } -- To stop receiving notification emails like this one, please contact [email protected].
