This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 82816b13d22bcd3aff759fc93ae8c4eec9c99405
Author: Jeongyoon Eo <[email protected]>
AuthorDate: Mon Mar 5 13:21:41 2018 +0900

    use id instead of PipeImpl instance for pipeToDstTasksMap
---
 .../nemo/runtime/executor/TaskGroupExecutor.java   | 22 +++++++++++++---------
 .../runtime/executor/datatransfer/PipeImpl.java    | 10 ++++++++++
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 85d58ee..75c743b 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -68,7 +68,7 @@ public final class TaskGroupExecutor {
   private final Map<String, List<Task>> srcIteratorIdToTasksMap;
   private final Map<String, List<Task>> iteratorIdToTasksMap;
   private final LinkedBlockingQueue<Pair<String, 
DataUtil.IteratorWithNumBytes>> iteratorQueue;
-  private volatile Map<PipeImpl, List<Task>> pipeToDstTasksMap;
+  private volatile Map<String, List<Task>> pipeToDstTasksMap;
   private final Set<Transform> preparedTransforms;
   private final Set<String> finishedTaskIds;
   private final AtomicInteger completedFutures;
@@ -360,7 +360,7 @@ public final class TaskGroupExecutor {
         tasks.forEach(task -> {
           final List<Task> dstTasks = taskGroupDag.getChildren(task.getId());
           PipeImpl pipe = taskToOutputPipeMap.get(task);
-          pipeToDstTasksMap.putIfAbsent(pipe, dstTasks);
+          pipeToDstTasksMap.putIfAbsent(pipe.getId(), dstTasks);
           LOG.info("{} pipeToDstTasksMap: [{}'s OutputPipe, {}]",
               taskGroupId, getPhysicalTaskId(task.getId()), dstTasks);
         }));
@@ -368,21 +368,21 @@ public final class TaskGroupExecutor {
         tasks.forEach(task -> {
           final List<Task> dstTasks = taskGroupDag.getChildren(task.getId());
           PipeImpl pipe = taskToOutputPipeMap.get(task);
-          pipeToDstTasksMap.putIfAbsent(pipe, dstTasks);
+          pipeToDstTasksMap.putIfAbsent(pipe.getId(), dstTasks);
           LOG.info("{} pipeToDstTasksMap: [{}'s OutputPipe, {}]",
               taskGroupId, getPhysicalTaskId(task.getId()), dstTasks);
         }));
   }
 
   private void updatePipeToDstTasksMap() {
-    Map<PipeImpl, List<Task>> currentMap = pipeToDstTasksMap;
-    Map<PipeImpl, List<Task>> updatedMap = new HashMap<>();
+    Map<String, List<Task>> currentMap = pipeToDstTasksMap;
+    Map<String, List<Task>> updatedMap = new HashMap<>();
 
     currentMap.values().forEach(tasks ->
         tasks.forEach(task -> {
           final List<Task> dstTasks = taskGroupDag.getChildren(task.getId());
           PipeImpl pipe = taskToOutputPipeMap.get(task);
-          updatedMap.putIfAbsent(pipe, dstTasks);
+          updatedMap.putIfAbsent(pipe.getId(), dstTasks);
           LOG.info("{} pipeToDstTasksMap: [{}, {}]",
               taskGroupId, getPhysicalTaskId(task.getId()), dstTasks);
         })
@@ -525,7 +525,7 @@ public final class TaskGroupExecutor {
         final String iteratorId = idToIteratorPair.left();
         final DataUtil.IteratorWithNumBytes iterator = 
idToIteratorPair.right();
         List<Task> dstTasks = iteratorIdToTasksMap.get(iteratorId);
-        idToIteratorPair.right().forEachRemaining(element -> {
+        iterator.forEachRemaining(element -> {
           for (final Task task : dstTasks) {
             List data = Collections.singletonList(element);
             runTask(task, data);
@@ -556,10 +556,14 @@ public final class TaskGroupExecutor {
       // Intra-TaskGroup data comes from pipes of this TaskGroup's Tasks.
       initializePipeToDstTasksMap();
       while (!finishedAllTasks()) {
-        pipeToDstTasksMap.forEach((pipe, dstTasks) -> {
+        pipeToDstTasksMap.forEach((pipeId, dstTasks) -> {
+          PipeImpl pipe = taskToOutputPipeMap.values().stream()
+              .filter(p -> p.getId() == pipeId)
+              .findFirst().get();
+
           // Get the task that has this pipe as its output pipe
           Task pipeOwnerTask = taskToOutputPipeMap.entrySet().stream()
-              .filter(entry -> entry.getValue().equals(pipe))
+              .filter(entry -> entry.getValue().getId() == pipeId)
               .findAny().get().getKey();
           LOG.info("{} pipeOwnerTask {}", taskGroupId, 
getPhysicalTaskId(pipeOwnerTask.getId()));
 
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java
index a104322..fbcc662 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/PipeImpl.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Pipe implementation that requires synchronization.
@@ -31,6 +32,10 @@ import java.util.List;
  */
 public final class PipeImpl<O> implements Pipe<O> {
   private static final Logger LOG = 
LoggerFactory.getLogger(PipeImpl.class.getName());
+  private static final String PIPEID_PREFIX = "PIPE_";
+  private static final AtomicInteger PIPEID_GENERATOR = new AtomicInteger(0);
+
+  private final String id;
   private final ArrayDeque<O> outputQueue;
   private RuntimeEdge sideInputRuntimeEdge;
   private List<String> sideInputReceivers;
@@ -39,6 +44,7 @@ public final class PipeImpl<O> implements Pipe<O> {
    * Constructor of a new Pipe.
    */
   public PipeImpl() {
+    this.id = PIPEID_PREFIX + PIPEID_GENERATOR.getAndIncrement();
     this.outputQueue = new ArrayDeque<>();
     this.sideInputRuntimeEdge = null;
     this.sideInputReceivers = new ArrayList<>();
@@ -86,4 +92,8 @@ public final class PipeImpl<O> implements Pipe<O> {
   public boolean hasSideInputFor(final String physicalTaskId) {
     return sideInputReceivers.contains(physicalTaskId);
   }
+
+  public String getId() {
+    return id;
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to