johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711209
 
 

 ##########
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##########
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
         .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge<Task> 
internalEdge) {
-    final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-    addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+    List<LocalPipe> inputPipes = new ArrayList<>();
+    List<Task> parentTasks = taskGroupDag.getParents(task.getId());
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge<Task> 
internalEdge) {
-    final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-    addOutputWriter(task, outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+        inputPipes.add(parentOutputPipe);
+        LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+            getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+      });
+      taskToInputPipesMap.put(task, inputPipes);
+    }
 
 Review comment:
   Add an `else {}` branch containing a comment that explains why we're not 
handling that case.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to