This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 53522c6 [NEMO-237] Refactor ParentTaskDataFetcher to emit streaming
data and watermark (#138)
53522c6 is described below
commit 53522c6c0faaef5668eb29420ab3ca1fc0b2fa77
Author: John Yang <[email protected]>
AuthorDate: Thu Nov 1 09:32:42 2018 +0900
[NEMO-237] Refactor ParentTaskDataFetcher to emit streaming data and
watermark (#138)
JIRA: [NEMO-237: Refactor ParentTaskDataFetcher to emit streaming data and
watermark](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-237)
**Major changes:**
- Introduces `MultiThreadParentTaskDataFetcher` that does not block, and
consumes N iterables concurrently
- Makes 'Pipe' edges use `MultiThreadParentTaskDataFetcher`
**Minor changes to note:**
- Fixes bugs in TaskExecutor polling logic (thanks to @taegeonum)
**Tests for the changes:**
- Streaming tests `WindowedWordCountITCase` use the added
`MultiThreadParentTaskDataFetcher`
**Other comments:**
- Will handle watermarks in a different PR (after the OutputWriters are
fixed)
Closes #138
---
.../task/MultiThreadParentTaskDataFetcher.java | 150 +++++++++++++++++++++
.../executor/task/ParentTaskDataFetcher.java | 1 -
.../nemo/runtime/executor/task/TaskExecutor.java | 38 ++++--
3 files changed, 176 insertions(+), 13 deletions(-)
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
new file mode 100644
index 0000000..a9b0da3
--- /dev/null
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.task;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.*;
+
+/**
+ * Task thread -> fetchDataElement() -> (((QUEUE))) <- List of iterators <- queueInsertionThreads
+ *
+ * Unlike {@link ParentTaskDataFetcher}, where the task thread directly consumes (and blocks on) iterators one by one,
+ * this class spawns threads that each forwards elements from an iterator to a global queue.
+ *
+ * This class should be used when dealing with unbounded data streams, as we do not want to be blocked on a
+ * single unbounded iterator forever.
+ *
+ * Failures (a parent future that completes exceptionally, or an iterator that throws while being drained) are
+ * forwarded through the same queue and rethrown from {@link #fetchDataElement()} as an {@link IOException}.
+ * Throwing inside a forwarding thread alone would not help: the exception would end up in a discarded
+ * {@code Future}, the finish mark of that iterator would never be enqueued, and the consumer would keep
+ * seeing an empty queue.
+ *
+ * The consuming side ({@link #fetchDataElement()}) keeps its state in plain fields, so it must be driven by a
+ * single thread; only the internal queue and the byte counters are shared with the forwarding threads.
+ */
+@NotThreadSafe
+class MultiThreadParentTaskDataFetcher extends DataFetcher {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadParentTaskDataFetcher.class);
+
+  private final InputReader readersForParentTask;
+  private final ExecutorService queueInsertionThreads;
+
+  // Non-finals (lazy fetching)
+  private boolean firstFetch = true;
+
+  // Holds data elements, one Finishmark per exhausted iterator, and FetchFailure markers.
+  private final ConcurrentLinkedQueue<Object> elementQueue;
+
+  private long serBytes = 0;
+  private long encodedBytes = 0;
+
+  private int numOfIterators;
+  private int numOfFinishMarks = 0;
+
+  /**
+   * @param dataSource          the vertex the data comes from (passed to the parent class).
+   * @param readerForParentTask the reader that provides one future iterator per parent.
+   * @param outputCollector     the output collector (passed to the parent class).
+   */
+  MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
+                                   final InputReader readerForParentTask,
+                                   final OutputCollector outputCollector) {
+    super(dataSource, outputCollector);
+    this.readersForParentTask = readerForParentTask;
+    this.elementQueue = new ConcurrentLinkedQueue<>();
+    this.queueInsertionThreads = Executors.newCachedThreadPool();
+  }
+
+  /**
+   * Polls the queue once for the next element, without blocking.
+   * The first call triggers the actual read and starts the forwarding threads.
+   *
+   * @return the next data element, or the {@link Finishmark} once every iterator has been fully consumed.
+   * @throws IOException            if reading from a parent failed; the original failure is the cause.
+   * @throws NoSuchElementException if no element is available right now (the caller may try again later).
+   */
+  @Override
+  Object fetchDataElement() throws IOException, NoSuchElementException {
+    if (firstFetch) {
+      fetchDataLazily();
+      firstFetch = false;
+    }
+
+    while (true) {
+      final Object element = elementQueue.poll();
+      if (element == null) {
+        // poll() returns null only when the queue is empty.
+        throw new NoSuchElementException();
+      } else if (element instanceof FetchFailure) {
+        // The failed iterator will never deliver its finish mark, so this is the only signal the caller gets.
+        throw new IOException("Failed to fetch data from a parent task", ((FetchFailure) element).getCause());
+      } else if (element instanceof Finishmark) {
+        numOfFinishMarks++;
+        if (numOfFinishMarks == numOfIterators) {
+          return Finishmark.getInstance();
+        }
+        // else try again.
+      } else {
+        return element;
+      }
+    }
+  }
+
+  /**
+   * Starts the read and arranges for each resulting iterator to be drained into the queue by its own task.
+   */
+  private void fetchDataLazily() {
+    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
+    numOfIterators = futures.size();
+
+    futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
+      if (exception != null) {
+        // No iterator to drain: report the failure to the consumer through the queue.
+        LOG.error("Failed to obtain an iterator from a parent task", exception);
+        elementQueue.offer(new FetchFailure(exception));
+        return;
+      }
+
+      // A thread for each iterator
+      queueInsertionThreads.submit(() -> forwardToQueue(iterator));
+    }));
+  }
+
+  /**
+   * Drains the given iterator into the queue, then records its byte counts and enqueues a finish mark.
+   * If the iterator throws, a failure marker is enqueued instead of the finish mark.
+   *
+   * @param iterator the iterator to consume to the end.
+   */
+  private void forwardToQueue(final DataUtil.IteratorWithNumBytes iterator) {
+    try {
+      // Consume this iterator to the end.
+      while (iterator.hasNext()) { // blocked on the iterator.
+        elementQueue.offer(iterator.next());
+      }
+    } catch (final RuntimeException e) {
+      LOG.error("Failed while reading from a parent task iterator", e);
+      elementQueue.offer(new FetchFailure(e));
+      return;
+    }
+
+    // This iterator is finished.
+    // Count the bytes BEFORE publishing the finish mark, so that a consumer that has polled the
+    // finish mark also observes the updated counters.
+    countBytesSynchronized(iterator);
+    elementQueue.offer(Finishmark.getInstance());
+  }
+
+  /**
+   * @return the accumulated number of serialized bytes, or a value derived from -1 if some iterator
+   *         does not support byte counting. Meant to be read after the final finish mark was fetched.
+   */
+  final long getSerializedBytes() {
+    return serBytes;
+  }
+
+  /**
+   * @return the accumulated number of encoded bytes, or a value derived from -1 if some iterator
+   *         does not support byte counting. Meant to be read after the final finish mark was fetched.
+   */
+  final long getEncodedBytes() {
+    return encodedBytes;
+  }
+
+  /**
+   * Adds the byte counts of a finished iterator to the totals.
+   * Synchronized because several forwarding threads may finish at the same time.
+   *
+   * @param iterator the fully consumed iterator.
+   */
+  private synchronized void countBytesSynchronized(final DataUtil.IteratorWithNumBytes iterator) {
+    // NOTE(review): after a counter is reset to -1, a later iterator that does support byte counts
+    // adds onto that -1 - confirm this is the intended way to mark "unsupported".
+    try {
+      serBytes += iterator.getNumSerializedBytes();
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+      serBytes = -1;
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet ", e);
+    }
+    try {
+      encodedBytes += iterator.getNumEncodedBytes();
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+      encodedBytes = -1;
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
+    }
+  }
+
+  /**
+   * Stops accepting new forwarding tasks. Forwarding tasks that are already running are not interrupted.
+   */
+  @Override
+  public void close() throws Exception {
+    queueInsertionThreads.shutdown();
+  }
+
+  /**
+   * Queue marker that carries a failure from a producer to the consuming thread.
+   * A dedicated wrapper is used so that it cannot be confused with a data element.
+   */
+  private static final class FetchFailure {
+    private final Throwable cause;
+
+    FetchFailure(final Throwable cause) {
+      this.cause = cause;
+    }
+
+    Throwable getCause() {
+      return cause;
+    }
+  }
+}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index e546ee7..80bbea2 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -69,7 +69,6 @@ class ParentTaskDataFetcher extends DataFetcher {
}
while (true) {
-
// This iterator has the element
if (this.currentIterator.hasNext()) {
return this.currentIterator.next();
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index a2da4bc..1541792 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -229,9 +229,18 @@ public final class TaskExecutor {
nonBroadcastInEdges.removeAll(broadcastInEdges);
final List<InputReader> nonBroadcastReaders =
getParentTaskReaders(taskIndex, nonBroadcastInEdges,
intermediateDataIOFactory);
- nonBroadcastReaders.forEach(parentTaskReader ->
nonBroadcastDataFetcherList.add(
- new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(),
parentTaskReader,
- new DataFetcherOutputCollector((OperatorVertex) irVertex))));
+ nonBroadcastReaders.forEach(parentTaskReader -> {
+ final DataFetcher dataFetcher;
+ if (parentTaskReader instanceof PipeInputReader) {
+ nonBroadcastDataFetcherList.add(
+ new
MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(),
parentTaskReader,
+ new DataFetcherOutputCollector((OperatorVertex) irVertex)));
+ } else {
+ nonBroadcastDataFetcherList.add(
+ new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(),
parentTaskReader,
+ new DataFetcherOutputCollector((OperatorVertex) irVertex)));
+ }
+ });
});
final List<VertexHarness> sortedHarnessList =
irVertexDag.getTopologicalSort()
@@ -317,11 +326,9 @@ public final class TaskExecutor {
* If the element is an instance of Finishmark, we remove the dataFetcher
from the current list.
* @param element element
* @param dataFetcher current data fetcher
- * @param dataFetchers current list
*/
private void handleElement(final Object element,
- final DataFetcher dataFetcher,
- final List<DataFetcher> dataFetchers) {
+ final DataFetcher dataFetcher) {
if (element instanceof Finishmark) {
// We've consumed all the data from this data fetcher.
if (dataFetcher instanceof SourceVertexDataFetcher) {
@@ -329,10 +336,10 @@ public final class TaskExecutor {
} else if (dataFetcher instanceof ParentTaskDataFetcher) {
serializedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getSerializedBytes();
encodedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getEncodedBytes();
+ } else if (dataFetcher instanceof MultiThreadParentTaskDataFetcher) {
+ serializedReadBytes += ((MultiThreadParentTaskDataFetcher)
dataFetcher).getSerializedBytes();
+ encodedReadBytes += ((MultiThreadParentTaskDataFetcher)
dataFetcher).getEncodedBytes();
}
-
- // remove current data fetcher from the list
- dataFetchers.remove(dataFetcher);
} else if (element instanceof Watermark) {
// Watermark
processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
@@ -388,7 +395,11 @@ public final class TaskExecutor {
while (availableIterator.hasNext()) {
final DataFetcher dataFetcher = availableIterator.next();
try {
- handleElement(dataFetcher.fetchDataElement(), dataFetcher,
availableFetchers);
+ final Object element = dataFetcher.fetchDataElement();
+ handleElement(element, dataFetcher);
+ if (element instanceof Finishmark) {
+ availableIterator.remove();
+ }
} catch (final NoSuchElementException e) {
// No element in current data fetcher, fetch data from next fetcher
// move current data fetcher to pending.
@@ -412,12 +423,15 @@ public final class TaskExecutor {
final DataFetcher dataFetcher = pendingIterator.next();
try {
- handleElement(dataFetcher.fetchDataElement(), dataFetcher,
pendingFetchers);
+ final Object element = dataFetcher.fetchDataElement();
+ handleElement(element, dataFetcher);
// We processed data. This means the data fetcher is now available.
// Add current data fetcher to available
pendingIterator.remove();
- availableFetchers.add(dataFetcher);
+ if (!(element instanceof Finishmark)) {
+ availableFetchers.add(dataFetcher);
+ }
} catch (final NoSuchElementException e) {
// The current data fetcher is still pending.. try next data fetcher