johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711250
 
 

 ##########
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##########
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
         .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge<Task> 
internalEdge) {
-    final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-    addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+    List<LocalPipe> inputPipes = new ArrayList<>();
+    List<Task> parentTasks = taskGroupDag.getParents(task.getId());
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge<Task> 
internalEdge) {
-    final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-    addOutputWriter(task, outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+        inputPipes.add(parentOutputPipe);
+        LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+            getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+      });
+      taskToInputPipesMap.put(task, inputPipes);
+    }
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+    final LocalPipe outputPipe = new LocalPipe();
     final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-    physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+    final List<RuntimeEdge<Task>> outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+    outEdges.forEach(outEdge -> {
+      if (outEdge.isSideInput()) {
 
 Review comment:
   Ditto on else{}.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to