taegeonum closed pull request #138: [NEMO-237] Refactor ParentTaskDataFetcher to emit streaming data and watermark
URL: https://github.com/apache/incubator-nemo/pull/138
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
new file mode 100644
index 000000000..a9b0da3ce
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.task;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.*;
+
+/**
+ * Task thread -> fetchDataElement() -> (((QUEUE))) <- List of iterators <- queueInsertionThreads
+ *
+ * Unlike {@link ParentTaskDataFetcher}, where the task thread directly consumes (and blocks on) iterators one by one,
+ * this class spawns threads that each forwards elements from an iterator to a global queue.
+ *
+ * This class should be used when dealing with unbounded data streams, as we do not want to be blocked on a
+ * single unbounded iterator forever.
+ *
+ * A failure to obtain or read a parent iterator is not swallowed: it is enqueued and re-thrown to the task thread
+ * as an {@link IOException} by {@link #fetchDataElement()}.
+ */
+@NotThreadSafe
+class MultiThreadParentTaskDataFetcher extends DataFetcher {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadParentTaskDataFetcher.class);
+
+  private final InputReader readersForParentTask;
+  private final ExecutorService queueInsertionThreads;
+
+  // Shared between the task thread (consumer) and the queue insertion threads (producers).
+  private final ConcurrentLinkedQueue<Object> elementQueue;
+
+  // Non-finals (lazy fetching), only accessed by the task thread.
+  private boolean firstFetch = true;
+  private int numOfIterators;
+  private int numOfFinishMarks = 0;
+
+  // Updated by the queue insertion threads and guarded by 'this'; -1 means "not supported".
+  private long serBytes = 0;
+  private long encodedBytes = 0;
+
+  MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
+                                   final InputReader readerForParentTask,
+                                   final OutputCollector outputCollector) {
+    super(dataSource, outputCollector);
+    this.readersForParentTask = readerForParentTask;
+    this.elementQueue = new ConcurrentLinkedQueue<>();
+    this.queueInsertionThreads = Executors.newCachedThreadPool();
+  }
+
+  /**
+   * Returns the next element forwarded from any of the parent task's iterators.
+   * The first call starts reading the parent task data.
+   *
+   * @return the next element, or {@link Finishmark} once every iterator has been fully consumed
+   *         (immediately, if the reader returned no iterators).
+   * @throws IOException if obtaining or reading one of the iterators failed.
+   * @throws NoSuchElementException if no element is available at the moment.
+   */
+  @Override
+  Object fetchDataElement() throws IOException, NoSuchElementException {
+    if (firstFetch) {
+      fetchDataLazily();
+      firstFetch = false;
+    }
+
+    while (true) {
+      // Checked before polling so that a reader with zero iterators still finishes.
+      if (numOfFinishMarks == numOfIterators) {
+        return Finishmark.getInstance();
+      }
+
+      final Object element = elementQueue.poll();
+      if (element == null) {
+        throw new NoSuchElementException();
+      } else if (element instanceof ErrorMark) {
+        throw new IOException(((ErrorMark) element).cause);
+      } else if (element instanceof Finishmark) {
+        // One more iterator is done; loop to check whether all of them are.
+        numOfFinishMarks++;
+      } else {
+        return element;
+      }
+    }
+  }
+
+  /**
+   * Starts reading the parent task data; each iterator is handed to its own queue insertion thread once ready.
+   */
+  private void fetchDataLazily() {
+    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
+    numOfIterators = futures.size();
+
+    futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
+      if (exception != null) {
+        // Hand the failure to the task thread instead of losing it in a discarded future.
+        LOG.error("Failed to fetch an iterator from the parent task", exception);
+        elementQueue.offer(new ErrorMark(exception));
+      } else {
+        // A thread for each iterator
+        queueInsertionThreads.execute(() -> forwardToQueue(iterator));
+      }
+    }));
+  }
+
+  /**
+   * Forwards every element of the iterator to the queue, followed by a {@link Finishmark}.
+   * Runs on a queue insertion thread and blocks on the iterator.
+   *
+   * @param iterator the iterator to consume.
+   */
+  private void forwardToQueue(final DataUtil.IteratorWithNumBytes iterator) {
+    try {
+      while (iterator.hasNext()) { // blocked on the iterator.
+        elementQueue.offer(iterator.next());
+      }
+
+      // This iterator is finished.
+      countBytesSynchronized(iterator);
+      elementQueue.offer(Finishmark.getInstance());
+    } catch (final RuntimeException e) {
+      LOG.error("Failed to read data from a parent task iterator", e);
+      elementQueue.offer(new ErrorMark(e));
+    }
+  }
+
+  /**
+   * @return the serialized bytes of the iterators consumed so far, or -1 if an iterator does not support it.
+   */
+  final synchronized long getSerializedBytes() {
+    return serBytes;
+  }
+
+  /**
+   * @return the encoded bytes of the iterators consumed so far, or -1 if an iterator does not support it.
+   */
+  final synchronized long getEncodedBytes() {
+    return encodedBytes;
+  }
+
+  /**
+   * Adds the byte counts of a fully-consumed iterator.
+   * Once a count is unsupported (-1) it stays -1, instead of being corrupted by later additions.
+   *
+   * @param iterator a fully-consumed iterator.
+   */
+  private synchronized void countBytesSynchronized(final DataUtil.IteratorWithNumBytes iterator) {
+    if (serBytes != -1) {
+      try {
+        serBytes += iterator.getNumSerializedBytes();
+      } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+        serBytes = -1;
+      } catch (final IllegalStateException e) {
+        LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet ", e);
+      }
+    }
+    if (encodedBytes != -1) {
+      try {
+        encodedBytes += iterator.getNumEncodedBytes();
+      } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+        encodedBytes = -1;
+      } catch (final IllegalStateException e) {
+        LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
+      }
+    }
+  }
+
+  /**
+   * Stops the queue insertion threads.
+   * {@code shutdownNow()} interrupts threads still blocked on (possibly unbounded) iterators,
+   * whereas {@code shutdown()} does not interrupt running tasks and could leave them blocked indefinitely.
+   */
+  @Override
+  public void close() throws Exception {
+    queueInsertionThreads.shutdownNow();
+  }
+
+  /**
+   * Queue marker that carries a failure from a queue insertion thread to the task thread.
+   */
+  private static final class ErrorMark {
+    private final Throwable cause;
+
+    ErrorMark(final Throwable cause) {
+      this.cause = cause;
+    }
+  }
+}
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index e546ee7a5..80bbea241 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -69,7 +69,6 @@ Object fetchDataElement() throws IOException {
       }
 
       while (true) {
-
         // This iterator has the element
         if (this.currentIterator.hasNext()) {
           return this.currentIterator.next();
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index a2da4bc00..1541792b9 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -229,9 +229,18 @@ public TaskExecutor(final Task task,
       nonBroadcastInEdges.removeAll(broadcastInEdges);
       final List<InputReader> nonBroadcastReaders =
         getParentTaskReaders(taskIndex, nonBroadcastInEdges, 
intermediateDataIOFactory);
-      nonBroadcastReaders.forEach(parentTaskReader -> 
nonBroadcastDataFetcherList.add(
-        new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), 
parentTaskReader,
-          new DataFetcherOutputCollector((OperatorVertex) irVertex))));
+      nonBroadcastReaders.forEach(parentTaskReader -> {
+        final DataFetcher dataFetcher;
+        if (parentTaskReader instanceof PipeInputReader) {
+          nonBroadcastDataFetcherList.add(
+            new 
MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), 
parentTaskReader,
+              new DataFetcherOutputCollector((OperatorVertex) irVertex)));
+        } else {
+          nonBroadcastDataFetcherList.add(
+            new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), 
parentTaskReader,
+              new DataFetcherOutputCollector((OperatorVertex) irVertex)));
+        }
+      });
     });
 
     final List<VertexHarness> sortedHarnessList = 
irVertexDag.getTopologicalSort()
@@ -317,11 +326,9 @@ private void finalizeVertex(final VertexHarness 
vertexHarness) {
    * If the element is an instance of Finishmark, we remove the dataFetcher 
from the current list.
    * @param element element
    * @param dataFetcher current data fetcher
-   * @param dataFetchers current list
    */
   private void handleElement(final Object element,
-                             final DataFetcher dataFetcher,
-                             final List<DataFetcher> dataFetchers) {
+                             final DataFetcher dataFetcher) {
     if (element instanceof Finishmark) {
       // We've consumed all the data from this data fetcher.
       if (dataFetcher instanceof SourceVertexDataFetcher) {
@@ -329,10 +336,10 @@ private void handleElement(final Object element,
       } else if (dataFetcher instanceof ParentTaskDataFetcher) {
         serializedReadBytes += ((ParentTaskDataFetcher) 
dataFetcher).getSerializedBytes();
         encodedReadBytes += ((ParentTaskDataFetcher) 
dataFetcher).getEncodedBytes();
+      } else if (dataFetcher instanceof MultiThreadParentTaskDataFetcher) {
+        serializedReadBytes += ((MultiThreadParentTaskDataFetcher) 
dataFetcher).getSerializedBytes();
+        encodedReadBytes += ((MultiThreadParentTaskDataFetcher) 
dataFetcher).getEncodedBytes();
       }
-
-      // remove current data fetcher from the list
-      dataFetchers.remove(dataFetcher);
     } else if (element instanceof Watermark) {
       // Watermark
       processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
@@ -388,7 +395,11 @@ private boolean handleDataFetchers(final List<DataFetcher> 
fetchers) {
       while (availableIterator.hasNext()) {
         final DataFetcher dataFetcher = availableIterator.next();
         try {
-          handleElement(dataFetcher.fetchDataElement(), dataFetcher, 
availableFetchers);
+          final Object element = dataFetcher.fetchDataElement();
+          handleElement(element, dataFetcher);
+          if (element instanceof Finishmark) {
+            availableIterator.remove();
+          }
         } catch (final NoSuchElementException e) {
           // No element in current data fetcher, fetch data from next fetcher
           // move current data fetcher to pending.
@@ -412,12 +423,15 @@ private boolean handleDataFetchers(final 
List<DataFetcher> fetchers) {
 
         final DataFetcher dataFetcher = pendingIterator.next();
         try {
-          handleElement(dataFetcher.fetchDataElement(), dataFetcher, 
pendingFetchers);
+          final Object element = dataFetcher.fetchDataElement();
+          handleElement(element, dataFetcher);
 
           // We processed data. This means the data fetcher is now available.
           // Add current data fetcher to available
           pendingIterator.remove();
-          availableFetchers.add(dataFetcher);
+          if (!(element instanceof Finishmark)) {
+            availableFetchers.add(dataFetcher);
+          }
 
         } catch (final NoSuchElementException e) {
           // The current data fetcher is still pending.. try next data fetcher


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to