This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 970751a [NEMO-233] Emit watermark at unbounded source (#130)
970751a is described below
commit 970751acfb4f9ca357310db6d12f1ccd6841cc43
Author: Taegeon Um <[email protected]>
AuthorDate: Mon Oct 29 20:54:20 2018 +0900
[NEMO-233] Emit watermark at unbounded source (#130)
JIRA: [NEMO-233: Emit watermark at unbounded
source](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-233)
**Major changes:**
- Change the `Readable` interface to retrieve watermarks. `Readable` no longer
returns an `Iterable`, which can block data fetching. Instead, `TaskExecutor`
checks whether the `Readable` is finished, and retrieves data or a
watermark if it is not finished.
- Add polling logic to `TaskExecutor`
**Minor changes to note:**
-
**Tests for the changes:**
- Test unbounded source readable
**Other comments:**
-
Closes #130
---
...{Readable.java => BoundedIteratorReadable.java} | 46 +++---
.../java/org/apache/nemo/common/ir/Readable.java | 38 ++++-
.../nemo/common/ir/vertex/CachedSourceVertex.java | 40 +++++-
.../common/ir/vertex/InMemorySourceVertex.java | 25 +++-
.../apache/nemo/common/ir/vertex/SourceVertex.java | 2 +
.../apache/nemo/common/punctuation/Finishmark.java | 35 +++++
.../Readable.java => punctuation/Watermark.java} | 32 ++---
.../apache/nemo/common/test/EmptyComponents.java | 34 ++++-
.../compiler/frontend/beam/PipelineTranslator.java | 3 +-
.../beam/source/BeamBoundedSourceVertex.java | 73 +++++++---
.../beam/source/BeamUnboundedSourceVertex.java | 128 ++++++++---------
.../source/SparkDatasetBoundedSourceVertex.java | 29 +++-
.../source/SparkTextFileBoundedSourceVertex.java | 34 +++--
.../nemo/runtime/executor/task/DataFetcher.java | 5 +-
.../executor/task/ParentTaskDataFetcher.java | 10 +-
.../executor/task/SourceVertexDataFetcher.java | 81 ++++++++---
.../nemo/runtime/executor/task/TaskExecutor.java | 158 +++++++++++++++++----
.../executor/task/ParentTaskDataFetcherTest.java | 7 +-
.../runtime/executor/task/TaskExecutorTest.java | 134 ++++++++++++++++-
19 files changed, 700 insertions(+), 214 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
similarity index 54%
copy from common/src/main/java/org/apache/nemo/common/ir/Readable.java
copy to
common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
index 2955ed3..4d15755 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
@@ -18,30 +18,42 @@
*/
package org.apache.nemo.common.ir;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
+import java.util.Iterator;
/**
- * Interface for readable.
+ * An abstract readable class that retrieves data from iterator.
* @param <O> output type.
*/
-public interface Readable<O> extends Serializable {
+public abstract class BoundedIteratorReadable<O> implements Readable<O> {
+
+ private Iterator<O> iterator;
+
/**
- * Method to read data from the source.
- *
- * @return an {@link Iterable} of the data read by the readable.
- * @throws IOException exception while reading data.
+ * Initialize iterator.
+ * @return iterator
*/
- Iterable<O> read() throws IOException;
+ protected abstract Iterator<O> initializeIterator();
/**
- * Returns the list of locations where this readable resides.
- * Each location has a complete copy of the readable.
- *
- * @return List of locations where this readable resides
- * @throws UnsupportedOperationException when this operation is not supported
- * @throws Exception any other exceptions on the way
+ * Prepare reading data.
*/
- List<String> getLocations() throws Exception;
+ @Override
+ public final void prepare() {
+ iterator = initializeIterator();
+ }
+
+ @Override
+ public final O readCurrent() {
+ return iterator.next();
+ }
+
+ @Override
+ public final void advance() {
+ // do nothing
+ }
+
+ @Override
+ public final boolean isFinished() {
+ return !iterator.hasNext();
+ }
}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
index 2955ed3..e0f7b7c 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
@@ -21,19 +21,44 @@ package org.apache.nemo.common.ir;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import java.util.NoSuchElementException;
/**
* Interface for readable.
* @param <O> output type.
*/
public interface Readable<O> extends Serializable {
+
+ /**
+ * Prepare reading data.
+ */
+ void prepare();
+
/**
- * Method to read data from the source.
+ * Method to read current data from the source.
+ * The caller should check whether the Readable is finished or not by using
isFinished() method
+ * before calling this method.
*
- * @return an {@link Iterable} of the data read by the readable.
- * @throws IOException exception while reading data.
+ * It can throw NoSuchElementException although it is not finished in
Unbounded source.
+ * @return a data read by the readable.
+ */
+ O readCurrent() throws NoSuchElementException;
+
+ /**
+ * Advance current data point.
+ */
+ void advance() throws IOException;
+
+ /**
+ * Read watermark.
+ * @return watermark
+ */
+ long readWatermark();
+
+ /**
+ * @return true if it reads all data.
*/
- Iterable<O> read() throws IOException;
+ boolean isFinished();
/**
* Returns the list of locations where this readable resides.
@@ -44,4 +69,9 @@ public interface Readable<O> extends Serializable {
* @throws Exception any other exceptions on the way
*/
List<String> getLocations() throws Exception;
+
+ /**
+ * Close.
+ */
+ void close() throws IOException;
}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
index fe42ed5..6e909cf 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
@@ -22,7 +22,6 @@ import org.apache.nemo.common.ir.Readable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
/**
@@ -62,6 +61,12 @@ public final class CachedSourceVertex<T> extends
SourceVertex<T> {
}
@Override
+ public boolean isBounded() {
+ // It supports only bounded source.
+ return true;
+ }
+
+ @Override
public List<Readable<T>> getReadables(final int desiredNumOfSplits) {
// Ignore the desired number of splits.
return readables;
@@ -77,7 +82,6 @@ public final class CachedSourceVertex<T> extends
SourceVertex<T> {
* It does not contain any actual data but the data will be sent from the
cached store through external input reader.
*/
private final class CachedReadable implements Readable<T> {
-
/**
* Constructor.
*/
@@ -86,13 +90,41 @@ public final class CachedSourceVertex<T> extends
SourceVertex<T> {
}
@Override
- public Iterable<T> read() throws IOException {
- return Collections.emptyList();
+ public void prepare() {
+
+ }
+
+ @Override
+ public T readCurrent() {
+ throw new UnsupportedOperationException(
+ "CachedSourceVertex should not be used");
+ }
+
+ @Override
+ public void advance() throws IOException {
+ throw new UnsupportedOperationException(
+ "CachedSourceVertex should not be used");
+ }
+
+ @Override
+ public long readWatermark() {
+ throw new UnsupportedOperationException(
+ "CachedSourceVertex should not be used");
+ }
+
+ @Override
+ public boolean isFinished() {
+ return true;
}
@Override
public List<String> getLocations() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void close() throws IOException {
+
+ }
}
}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
index 6cf2009..3278963 100644
---
a/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
+++
b/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
@@ -18,8 +18,10 @@
*/
package org.apache.nemo.common.ir.vertex;
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
import org.apache.nemo.common.ir.Readable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -57,6 +59,11 @@ public final class InMemorySourceVertex<T> extends
SourceVertex<T> {
}
@Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
public List<Readable<T>> getReadables(final int desiredNumOfSplits) throws
Exception {
final List<Readable<T>> readables = new ArrayList<>();
@@ -88,7 +95,8 @@ public final class InMemorySourceVertex<T> extends
SourceVertex<T> {
* Simply returns the in-memory data.
* @param <T> type of the data.
*/
- private static final class InMemorySourceReadable<T> implements Readable<T> {
+ private static final class InMemorySourceReadable<T> extends
BoundedIteratorReadable<T> {
+
private final Iterable<T> initializedSourceData;
/**
@@ -96,17 +104,28 @@ public final class InMemorySourceVertex<T> extends
SourceVertex<T> {
* @param initializedSourceData the source data.
*/
private InMemorySourceReadable(final Iterable<T> initializedSourceData) {
+ super();
this.initializedSourceData = initializedSourceData;
}
@Override
- public Iterable<T> read() {
- return this.initializedSourceData;
+ protected Iterator<T> initializeIterator() {
+ return initializedSourceData.iterator();
+ }
+
+ @Override
+ public long readWatermark() {
+ throw new UnsupportedOperationException("No watermark");
}
@Override
public List<String> getLocations() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void close() throws IOException {
+
+ }
}
}
diff --git
a/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
b/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
index 6a86064..7892490 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
@@ -36,6 +36,8 @@ public abstract class SourceVertex<O> extends IRVertex {
super();
}
+ public abstract boolean isBounded();
+
/**
* Copy Constructor for SourceVertex.
*
diff --git
a/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java
b/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java
new file mode 100644
index 0000000..e801c25
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+/**
+ * Finish mark that notifies the data fetching is finished.
+ * This is only used for bounded source because unbounded source does not
finish.
+ */
+public final class Finishmark {
+ private static final Finishmark INSTANCE = new Finishmark();
+
+ private Finishmark() {
+
+ }
+
+ public static Finishmark getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
similarity index 51%
copy from common/src/main/java/org/apache/nemo/common/ir/Readable.java
copy to common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index 2955ed3..4f24a80 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -16,32 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.common.ir;
+package org.apache.nemo.common.punctuation;
-import java.io.IOException;
import java.io.Serializable;
-import java.util.List;
/**
- * Interface for readable.
- * @param <O> output type.
+ * Watermark event.
*/
-public interface Readable<O> extends Serializable {
- /**
- * Method to read data from the source.
- *
- * @return an {@link Iterable} of the data read by the readable.
- * @throws IOException exception while reading data.
- */
- Iterable<O> read() throws IOException;
+public final class Watermark implements Serializable {
+ private final long timestamp;
+ public Watermark(final long timestamp) {
+ this.timestamp = timestamp;
+ }
- /**
- * Returns the list of locations where this readable resides.
- * Each location has a complete copy of the readable.
- *
- * @return List of locations where this readable resides
- * @throws UnsupportedOperationException when this operation is not supported
- * @throws Exception any other exceptions on the way
- */
- List<String> getLocations() throws Exception;
+ public long getTimestamp() {
+ return timestamp;
+ }
}
diff --git
a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 1ce2637..9c95bed 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -36,6 +36,7 @@ import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.beam.sdk.values.KV;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -205,6 +206,11 @@ public final class EmptyComponents {
}
@Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
public List<Readable<T>> getReadables(final int desirednumOfSplits) {
final List list = new ArrayList(desirednumOfSplits);
for (int i = 0; i < desirednumOfSplits; i++) {
@@ -230,13 +236,37 @@ public final class EmptyComponents {
*/
static final class EmptyReadable<T> implements Readable<T> {
@Override
- public Iterable<T> read() {
- return new ArrayList<>();
+ public void prepare() {
+
+ }
+
+ @Override
+ public T readCurrent() {
+ return null;
+ }
+
+ @Override
+ public void advance() {
+ }
+
+ @Override
+ public long readWatermark() {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return true;
}
@Override
public List<String> getLocations() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void close() throws IOException {
+
+ }
}
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 89be5ba..58a6d4a 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -24,6 +24,7 @@ import
org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
@@ -225,7 +226,7 @@ public final class PipelineTranslator
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
final TupleTag mainOutputTag = new TupleTag<>();
- if (mainInput.getWindowingStrategy() == WindowingStrategy.globalDefault())
{
+ if (mainInput.getWindowingStrategy().getWindowFn() instanceof
GlobalWindows) {
return new GroupByKeyTransform();
} else {
return new GroupByKeyAndWindowDoFnTransform(
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 2602e72..880a3a0 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.function.Function;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.beam.sdk.io.BoundedSource;
@@ -72,6 +73,11 @@ public final class BeamBoundedSourceVertex<O> extends
SourceVertex<WindowedValue
}
@Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
public List<Readable<WindowedValue<O>>> getReadables(final int
desiredNumOfSplits) throws Exception {
final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
LOG.info("estimate: {}", source.getEstimatedSizeBytes(null));
@@ -99,6 +105,9 @@ public final class BeamBoundedSourceVertex<O> extends
SourceVertex<WindowedValue
*/
private static final class BoundedSourceReadable<T> implements
Readable<WindowedValue<T>> {
private final BoundedSource<T> boundedSource;
+ private boolean finished = false;
+ private BoundedSource.BoundedReader<T> reader;
+ private Function<T, WindowedValue<T>> windowedValueConverter;
/**
* Constructor of the BoundedSourceReadable.
@@ -109,32 +118,48 @@ public final class BeamBoundedSourceVertex<O> extends
SourceVertex<WindowedValue
}
@Override
- public Iterable<WindowedValue<T>> read() throws IOException {
- boolean started = false;
- boolean windowed = false;
-
- final ArrayList<WindowedValue<T>> elements = new ArrayList<>();
- try (BoundedSource.BoundedReader<T> reader =
boundedSource.createReader(null)) {
- for (boolean available = reader.start(); available; available =
reader.advance()) {
- final T elem = reader.getCurrent();
-
- // Check whether the element is windowed or not
- // We only have to check the first element.
- if (!started) {
- started = true;
- if (elem instanceof WindowedValue) {
- windowed = true;
- }
- }
+ public void prepare() {
+ try {
+ reader = boundedSource.createReader(null);
+ finished = !reader.start();
+
+ if (!finished) {
+ T elem = reader.getCurrent();
- if (!windowed) {
-
elements.add(WindowedValue.valueInGlobalWindow(reader.getCurrent()));
+ if (elem instanceof WindowedValue) {
+ windowedValueConverter = val -> (WindowedValue) val;
} else {
- elements.add((WindowedValue<T>) elem);
+ windowedValueConverter = WindowedValue::valueInGlobalWindow;
}
}
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
}
- return elements;
+ }
+
+ @Override
+ public WindowedValue<T> readCurrent() {
+ if (finished) {
+ throw new IllegalStateException("Bounded reader read all elements");
+ }
+
+ final T elem = reader.getCurrent();
+ return windowedValueConverter.apply(elem);
+ }
+
+ @Override
+ public void advance() throws IOException {
+ finished = !reader.advance();
+ }
+
+ @Override
+ public long readWatermark() {
+ throw new UnsupportedOperationException("No watermark");
+ }
+
+ @Override
+ public boolean isFinished() {
+ return finished;
}
@Override
@@ -149,5 +174,11 @@ public final class BeamBoundedSourceVertex<O> extends
SourceVertex<WindowedValue
throw new UnsupportedOperationException();
}
}
+
+ @Override
+ public void close() throws IOException {
+ finished = true;
+ reader.close();
+ }
}
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index 8ce26fe..97adc5b 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -29,8 +29,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
/**
* SourceVertex implementation for UnboundedSource.
@@ -38,14 +39,12 @@ import java.util.List;
* @param <M> checkpoint mark type.
*/
public final class BeamUnboundedSourceVertex<O, M extends
UnboundedSource.CheckpointMark> extends
- SourceVertex<WindowedValue<O>> {
+ SourceVertex<Object> {
private static final Logger LOG =
LoggerFactory.getLogger(BeamUnboundedSourceVertex.class.getName());
private UnboundedSource<O, M> source;
private final String sourceDescription;
- private static final long POLLING_INTERVAL = 10L;
-
/**
* The default constructor for beam unbounded source.
* @param source unbounded source.
@@ -68,8 +67,13 @@ public final class BeamUnboundedSourceVertex<O, M extends
UnboundedSource.Checkp
}
@Override
- public List<Readable<WindowedValue<O>>> getReadables(final int
desiredNumOfSplits) throws Exception {
- final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
+ public boolean isBounded() {
+ return false;
+ }
+
+ @Override
+ public List<Readable<Object>> getReadables(final int desiredNumOfSplits)
throws Exception {
+ final List<Readable<Object>> readables = new ArrayList<>();
source.split(desiredNumOfSplits, null)
.forEach(unboundedSource -> readables.add(new
UnboundedSourceReadable<>(unboundedSource)));
return readables;
@@ -93,90 +97,80 @@ public final class BeamUnboundedSourceVertex<O, M extends
UnboundedSource.Checkp
* @param <M> checkpoint mark type.
*/
private static final class UnboundedSourceReadable<O, M extends
UnboundedSource.CheckpointMark>
- implements Readable<WindowedValue<O>> {
+ implements Readable<Object> {
private final UnboundedSource<O, M> unboundedSource;
+ private UnboundedSource.UnboundedReader<O> reader;
+ private Function<O, WindowedValue<O>> windowedValueConverter;
+ private boolean finished = false;
UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
this.unboundedSource = unboundedSource;
}
@Override
- public Iterable<WindowedValue<O>> read() throws IOException {
- return new UnboundedSourceIterable<>(unboundedSource);
- }
+ public void prepare() {
+ try {
+ reader = unboundedSource.createReader(null, null);
+ reader.start();
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
- @Override
- public List<String> getLocations() throws Exception {
- return new ArrayList<>();
+ // get first element
+ final O firstElement = retrieveFirstElement();
+ if (firstElement instanceof WindowedValue) {
+ windowedValueConverter = val -> (WindowedValue) val;
+ } else {
+ windowedValueConverter = WindowedValue::valueInGlobalWindow;
+ }
}
- }
-
- /**
- * The iterable class for unbounded sources.
- * @param <O> output type.
- * @param <M> checkpoint mark type.
- */
- private static final class UnboundedSourceIterable<O, M extends
UnboundedSource.CheckpointMark>
- implements Iterable<WindowedValue<O>> {
- private UnboundedSourceIterator<O, M> iterator;
-
- UnboundedSourceIterable(final UnboundedSource<O, M> unboundedSource)
throws IOException {
- this.iterator = new UnboundedSourceIterator<>(unboundedSource);
+ private O retrieveFirstElement() {
+ while (true) {
+ try {
+ return reader.getCurrent();
+ } catch (final NoSuchElementException e) {
+ // the first element is not currently available... retry
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
@Override
- public Iterator<WindowedValue<O>> iterator() {
- return iterator;
+ public Object readCurrent() {
+ final O elem = reader.getCurrent();
+ return windowedValueConverter.apply(elem);
}
- }
- /**
- * The iterator for unbounded sources.
- * @param <O> output type.
- * @param <M> checkpoint mark type.
- */
- // TODO #233: Emit watermark at unbounded source
- private static final class UnboundedSourceIterator<O, M extends
UnboundedSource.CheckpointMark>
- implements Iterator<WindowedValue<O>> {
+ @Override
+ public void advance() throws IOException {
+ reader.advance();
+ }
- private final UnboundedSource.UnboundedReader<O> unboundedReader;
- private boolean available;
+ @Override
+ public long readWatermark() {
+ return reader.getWatermark().getMillis();
+ }
- UnboundedSourceIterator(final UnboundedSource<O, M> unboundedSource)
throws IOException {
- this.unboundedReader = unboundedSource.createReader(null, null);
- available = unboundedReader.start();
+ @Override
+ public boolean isFinished() {
+ return finished;
}
@Override
- public boolean hasNext() {
- // Unbounded source always has next element until it finishes.
- return true;
+ public List<String> getLocations() throws Exception {
+ return new ArrayList<>();
}
@Override
- @SuppressWarnings("unchecked")
- public WindowedValue<O> next() {
- try {
- while (true) {
- if (!available) {
- Thread.sleep(POLLING_INTERVAL);
- } else {
- final O element = unboundedReader.getCurrent();
- final boolean windowed = element instanceof WindowedValue;
- if (!windowed) {
- return WindowedValue.valueInGlobalWindow(element);
- } else {
- return (WindowedValue<O>) element;
- }
- }
- available = unboundedReader.advance();
- }
- } catch (final InterruptedException | IOException e) {
- LOG.error("Exception occurred while waiting for the events...");
- e.printStackTrace();
- throw new RuntimeException(e);
- }
+ public void close() throws IOException {
+ finished = true;
+ reader.close();
}
}
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 18eefb4..31502be 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.compiler.frontend.spark.source;
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.compiler.frontend.spark.sql.Dataset;
@@ -74,6 +75,11 @@ public final class SparkDatasetBoundedSourceVertex<T>
extends SourceVertex<T> {
}
@Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
public List<Readable<T>> getReadables(final int desiredNumOfSplits) {
return readables;
}
@@ -86,7 +92,7 @@ public final class SparkDatasetBoundedSourceVertex<T> extends
SourceVertex<T> {
/**
* A Readable wrapper for Spark Dataset.
*/
- private final class SparkDatasetBoundedSourceReadable implements Readable<T>
{
+ private final class SparkDatasetBoundedSourceReadable extends
BoundedIteratorReadable<T> {
private final LinkedHashMap<String, Object[]> commands;
private final Map<String, String> sessionInitialConf;
private final int partitionIndex;
@@ -111,11 +117,11 @@ public final class SparkDatasetBoundedSourceVertex<T>
extends SourceVertex<T> {
}
@Override
- public Iterable<T> read() throws IOException {
+ protected Iterator<T> initializeIterator() {
// for setting up the same environment in the executors.
final SparkSession spark = SparkSession.builder()
- .config(sessionInitialConf)
- .getOrCreate();
+ .config(sessionInitialConf)
+ .getOrCreate();
final Dataset<T> dataset;
try {
@@ -126,8 +132,14 @@ public final class SparkDatasetBoundedSourceVertex<T>
extends SourceVertex<T> {
// Spark does lazy evaluation: it doesn't load the full dataset, but
only the partition it is asked for.
final RDD<T> rdd = dataset.sparkRDD();
- return () -> JavaConverters.asJavaIteratorConverter(
- rdd.iterator(rdd.getPartitions()[partitionIndex],
TaskContext$.MODULE$.empty())).asJava();
+ final Iterable<T> iterable = () ->
JavaConverters.asJavaIteratorConverter(
+ rdd.iterator(rdd.getPartitions()[partitionIndex],
TaskContext$.MODULE$.empty())).asJava();
+ return iterable.iterator();
+ }
+
+ @Override
+ public long readWatermark() {
+ throw new UnsupportedOperationException("No watermark");
}
@Override
@@ -138,5 +150,10 @@ public final class SparkDatasetBoundedSourceVertex<T>
extends SourceVertex<T> {
return locations;
}
}
+
+ @Override
+ public void close() throws IOException {
+
+ }
}
}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index ddfa85f..f0294b1 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.compiler.frontend.spark.source;
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.spark.*;
@@ -73,6 +74,11 @@ public final class SparkTextFileBoundedSourceVertex extends
SourceVertex<String>
}
@Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
public List<Readable<String>> getReadables(final int desiredNumOfSplits) {
return readables;
}
@@ -85,7 +91,7 @@ public final class SparkTextFileBoundedSourceVertex extends
SourceVertex<String>
/**
* A Readable wrapper for Spark text file.
*/
- private final class SparkTextFileBoundedSourceReadable implements
Readable<String> {
+ private final class SparkTextFileBoundedSourceReadable extends
BoundedIteratorReadable<String> {
private final SparkConf sparkConf;
private final int partitionIndex;
private final List<String> locations;
@@ -114,14 +120,8 @@ public final class SparkTextFileBoundedSourceVertex
extends SourceVertex<String>
}
@Override
- public Iterable<String> read() throws IOException {
- // for setting up the same environment in the executors.
- final SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
-
- // Spark does lazy evaluation: it doesn't load the full data in rdd, but
only the partition it is asked for.
- final RDD<String> rdd = sparkContext.textFile(inputPath, numPartitions);
- return () -> JavaConverters.asJavaIteratorConverter(
- rdd.iterator(rdd.getPartitions()[partitionIndex],
TaskContext$.MODULE$.empty())).asJava();
+ public long readWatermark() {
+ throw new UnsupportedOperationException("No watermark");
}
@Override
@@ -132,5 +132,21 @@ public final class SparkTextFileBoundedSourceVertex
extends SourceVertex<String>
return locations;
}
}
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ protected Iterator<String> initializeIterator() {
+ // for setting up the same environment in the executors.
+ final SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
+
+ // Spark does lazy evaluation: it doesn't load the full data in rdd, but
only the partition it is asked for.
+ final RDD<String> rdd = sparkContext.textFile(inputPath, numPartitions);
+ final Iterable<String> iterable = () ->
JavaConverters.asJavaIteratorConverter(
+ rdd.iterator(rdd.getPartitions()[partitionIndex],
TaskContext$.MODULE$.empty())).asJava();
+ return iterable.iterator();
+ }
}
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index 5b4777c..2ca3df8 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -22,11 +22,12 @@ import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import java.io.IOException;
+import java.util.NoSuchElementException;
/**
* An abstraction for fetching data from task-external sources.
*/
-abstract class DataFetcher {
+abstract class DataFetcher implements AutoCloseable {
private final IRVertex dataSource;
private final OutputCollector outputCollector;
@@ -42,7 +43,7 @@ abstract class DataFetcher {
* @throws IOException upon I/O error
* @throws java.util.NoSuchElementException if no more element is available
*/
- abstract Object fetchDataElement() throws IOException;
+ abstract Object fetchDataElement() throws IOException,
NoSuchElementException;
OutputCollector getOutputCollector() {
return outputCollector;
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 70680cc..6098b30 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.task;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
import org.slf4j.Logger;
@@ -28,7 +29,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
@@ -94,8 +94,7 @@ class ParentTaskDataFetcher extends DataFetcher {
throw new IOException(e);
}
- // We throw the exception here, outside of the above try-catch region
- throw new NoSuchElementException();
+ return Finishmark.getInstance();
}
private void advanceIterator() throws IOException {
@@ -160,4 +159,9 @@ class ParentTaskDataFetcher extends DataFetcher {
LOG.error("Failed to get the number of bytes of encoded data - the data
is not ready yet ", e);
}
}
+
+ @Override
+ public void close() throws Exception {
+
+ }
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 505d246..9ea8fa8 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -20,49 +20,92 @@ package org.apache.nemo.runtime.executor.task;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.Readable;
-import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.common.punctuation.Finishmark;
import java.io.IOException;
-import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Fetches data from a data source.
*/
class SourceVertexDataFetcher extends DataFetcher {
private final Readable readable;
-
- // Non-finals (lazy fetching)
- private Iterator iterator;
private long boundedSourceReadTime = 0;
+ private static final long WATERMARK_PERIOD = 1000; // ms
+ private final ScheduledExecutorService watermarkTriggerService;
+ private boolean watermarkTriggered = false;
+ private final boolean bounded;
- SourceVertexDataFetcher(final IRVertex dataSource,
+ SourceVertexDataFetcher(final SourceVertex dataSource,
final Readable readable,
final OutputCollector outputCollector) {
super(dataSource, outputCollector);
this.readable = readable;
+ this.readable.prepare();
+ this.bounded = dataSource.isBounded();
+
+ if (!bounded) {
+ this.watermarkTriggerService = Executors.newScheduledThreadPool(1);
+ this.watermarkTriggerService.scheduleAtFixedRate(() -> {
+ watermarkTriggered = true;
+ }, WATERMARK_PERIOD, WATERMARK_PERIOD, TimeUnit.MILLISECONDS);
+ } else {
+ this.watermarkTriggerService = null;
+ }
}
+ /**
+ * This is a non-blocking operation.
+ * @return current data
+ * @throws NoSuchElementException if the current data is not available
+ */
@Override
- Object fetchDataElement() throws IOException {
- if (iterator == null) {
- fetchDataLazily();
+ Object fetchDataElement() throws NoSuchElementException, IOException {
+ if (readable.isFinished()) {
+ return Finishmark.getInstance();
+ } else {
+ final long start = System.currentTimeMillis();
+ final Object element = retrieveElement();
+ boundedSourceReadTime += System.currentTimeMillis() - start;
+ return element;
}
+ }
- if (iterator.hasNext()) {
- return iterator.next();
- } else {
- throw new NoSuchElementException();
+ final long getBoundedSourceReadTime() {
+ return boundedSourceReadTime;
+ }
+
+ @Override
+ public void close() throws Exception {
+ readable.close();
+ if (watermarkTriggerService != null) {
+ watermarkTriggerService.shutdown();
}
}
- private void fetchDataLazily() throws IOException {
- final long start = System.currentTimeMillis();
- iterator = this.readable.read().iterator();
- boundedSourceReadTime += System.currentTimeMillis() - start;
+ private boolean isWatermarkTriggerTime() {
+ if (watermarkTriggered) {
+ watermarkTriggered = false;
+ return true;
+ } else {
+ return false;
+ }
}
- final long getBoundedSourceReadTime() {
- return boundedSourceReadTime;
+ private Object retrieveElement() throws NoSuchElementException, IOException {
+ // Emit watermark
+ if (!bounded && isWatermarkTriggerTime()) {
+ return new Watermark(readable.readWatermark());
+ }
+
+ // Data
+ final Object element = readable.readCurrent();
+ readable.advance();
+ return element;
}
}
diff --git
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index fac3e76..449542a 100644
---
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -28,6 +28,8 @@ import
org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdPrope
import org.apache.nemo.common.ir.vertex.*;
import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
@@ -62,7 +64,6 @@ import javax.annotation.concurrent.NotThreadSafe;
@NotThreadSafe
public final class TaskExecutor {
private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutor.class.getName());
- private static final int NONE_FINISHED = -1;
// Essential information
private boolean isExecuted;
@@ -194,7 +195,8 @@ public final class TaskExecutor {
// Source read
if (irVertex instanceof SourceVertex) {
// Source vertex read
- nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex,
sourceReader.get(), outputCollector));
+ nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(
+ (SourceVertex) irVertex, sourceReader.get(), outputCollector));
}
// Parent-task read (broadcasts)
@@ -243,6 +245,11 @@ public final class TaskExecutor {
outputCollector.emit(dataElement);
}
+ private void processWatermark(final OutputCollector outputCollector, final
Watermark watermark) {
+ // TODO #231: Add onWatermark() method to Transform and
+ // TODO #231: forward watermark to Transforms and OutputWriters
+ }
+
/**
* Execute a task, while handling unrecoverable errors and exceptions.
*/
@@ -303,46 +310,145 @@ public final class TaskExecutor {
}
/**
+ * Process an element generated from the dataFetcher.
+ * If the element is an instance of Finishmark, we remove the dataFetcher
from the current list.
+ * @param element element
+ * @param dataFetcher current data fetcher
+ * @param dataFetchers current list
+ */
+ private void handleElement(final Object element,
+ final DataFetcher dataFetcher,
+ final List<DataFetcher> dataFetchers) {
+ if (element instanceof Finishmark) {
+ // We've consumed all the data from this data fetcher.
+ if (dataFetcher instanceof SourceVertexDataFetcher) {
+ boundedSourceReadTime += ((SourceVertexDataFetcher)
dataFetcher).getBoundedSourceReadTime();
+ } else if (dataFetcher instanceof ParentTaskDataFetcher) {
+ serializedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getSerializedBytes();
+ encodedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getEncodedBytes();
+ }
+
+ // remove current data fetcher from the list
+ dataFetchers.remove(dataFetcher);
+ } else if (element instanceof Watermark) {
+ // Watermark
+ processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
+ } else {
+ // Process data element
+ processElement(dataFetcher.getOutputCollector(), element);
+ }
+ }
+
+ /**
+ * Check if it is time to poll pending fetchers' data.
+ * @param pollingPeriod polling period
+ * @param currentTime current time
+ * @param prevTime prev time
+ */
+ private boolean isPollingTime(final long pollingPeriod,
+ final long currentTime,
+ final long prevTime) {
+ return (currentTime - prevTime) >= pollingPeriod;
+ }
+
+ /**
+ * This retrieves data from data fetchers and processes them.
+ * It maintains two lists:
+ * -- availableFetchers: maintain data fetchers that currently have data
elements to retrieve
+ * -- pendingFetchers: maintain data fetchers that currently do not have
available elements.
+ * This can become available in the future, and therefore we check the
pending fetchers every pollingInterval.
+ *
+ * If a data fetcher finishes, we remove it from the two lists.
+ * If a data fetcher has no available element, we move the data fetcher to
pendingFetchers
+ * If a pending data fetcher has element, we move it to availableFetchers
+ * If there are no available fetchers but pending fetchers, sleep for
pollingPeriod
+ * and retry fetching data from the pendingFetchers.
+ *
* @param fetchers to handle.
* @return false if IOException.
*/
private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
- final List<DataFetcher> availableFetchers = new ArrayList<>(fetchers);
- while (!availableFetchers.isEmpty()) { // empty means we've consumed all
task-external input data
- // For this looping of available fetchers.
- int finishedFetcherIndex = NONE_FINISHED;
- for (int i = 0; i < availableFetchers.size(); i++) {
- final DataFetcher dataFetcher = availableFetchers.get(i);
- final Object element;
+ final List<DataFetcher> availableFetchers = new LinkedList<>(fetchers);
+ final List<DataFetcher> pendingFetchers = new LinkedList<>();
+
+ // Polling interval.
+ final long pollingInterval = 100; // ms
+
+ // Previous polling time
+ long prevPollingTime = System.currentTimeMillis();
+
+ // empty means we've consumed all task-external input data
+ while (!availableFetchers.isEmpty() || !pendingFetchers.isEmpty()) {
+ // We first fetch data from available data fetchers
+ final Iterator<DataFetcher> availableIterator =
availableFetchers.iterator();
+
+ while (availableIterator.hasNext()) {
+ final DataFetcher dataFetcher = availableIterator.next();
try {
- element = dataFetcher.fetchDataElement();
- } catch (NoSuchElementException e) {
- // We've consumed all the data from this data fetcher.
- if (dataFetcher instanceof SourceVertexDataFetcher) {
- boundedSourceReadTime += ((SourceVertexDataFetcher)
dataFetcher).getBoundedSourceReadTime();
- } else if (dataFetcher instanceof ParentTaskDataFetcher) {
- serializedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getSerializedBytes();
- encodedReadBytes += ((ParentTaskDataFetcher)
dataFetcher).getEncodedBytes();
- }
- finishedFetcherIndex = i;
- break;
- } catch (IOException e) {
+ handleElement(dataFetcher.fetchDataElement(), dataFetcher,
availableFetchers);
+ } catch (final NoSuchElementException e) {
+ // No element in current data fetcher, fetch data from next fetcher
+ // move current data fetcher to pending.
+ availableIterator.remove();
+ pendingFetchers.add(dataFetcher);
+ } catch (final IOException e) {
// IOException means that this task should be retried.
taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
Optional.empty(),
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
LOG.error("{} Execution Failed (Recoverable: input read failure)!
Exception: {}", taskId, e);
return false;
}
+ }
+
+ final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
+ final long currentTime = System.currentTimeMillis();
+ // We check pending data every polling interval
+ while (pendingIterator.hasNext()
+ && isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+ prevPollingTime = currentTime;
+
+ final DataFetcher dataFetcher = pendingIterator.next();
+ try {
+ handleElement(dataFetcher.fetchDataElement(), dataFetcher,
pendingFetchers);
- // Successfully fetched an element
- processElement(dataFetcher.getOutputCollector(), element);
+ // We processed data. This means the data fetcher is now available.
+ // Add current data fetcher to available
+ pendingIterator.remove();
+ availableFetchers.add(dataFetcher);
+
+ } catch (final NoSuchElementException e) {
+ // The current data fetcher is still pending.. try next data fetcher
+ } catch (final IOException e) {
+ // IOException means that this task should be retried.
+ taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+ Optional.empty(),
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+ LOG.error("{} Execution Failed (Recoverable: input read failure)!
Exception: {}", taskId, e);
+ return false;
+ }
}
- // Remove the finished fetcher from the list
- if (finishedFetcherIndex != NONE_FINISHED) {
- availableFetchers.remove(finishedFetcherIndex);
+ // If there are no available fetchers,
+ // Sleep and retry fetching element from pending fetchers every polling
interval
+ if (availableFetchers.isEmpty() && !pendingFetchers.isEmpty()) {
+ try {
+ Thread.sleep(pollingInterval);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
}
}
+
+ // Close all data fetchers
+ fetchers.forEach(fetcher -> {
+ try {
+ fetcher.close();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ });
+
return true;
}
diff --git
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index b120241..705f833 100644
---
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.task;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
import org.junit.Test;
@@ -44,16 +45,14 @@ import static org.mockito.Mockito.when;
@PrepareForTest({InputReader.class, VertexHarness.class})
public final class ParentTaskDataFetcherTest {
- @Test(timeout=5000, expected = NoSuchElementException.class)
+ @Test(timeout=5000)
public void testEmpty() throws Exception {
final List<String> empty = new ArrayList<>(0); // empty data
final InputReader inputReader =
generateInputReader(generateCompletableFuture(empty.iterator()));
// Fetcher
final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
-
- // Should trigger the expected 'NoSuchElementException'
- fetcher.fetchDataElement();
+ assertEquals(Finishmark.getInstance(), fetcher.fetchDataElement());
}
@Test(timeout=5000)
diff --git
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index d346a9d..e0c4f95 100644
---
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -19,6 +19,7 @@
package org.apache.nemo.runtime.executor.task;
import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
@@ -32,6 +33,7 @@ import
org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
import org.apache.nemo.common.ir.vertex.InMemorySourceVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.ir.vertex.IRVertex;
@@ -61,9 +63,11 @@ import java.io.Serializable;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -130,20 +134,32 @@ public final class TaskExecutorTest {
/**
* Test source vertex data fetching.
*/
- @Test(timeout=5000)
+ @Test()
public void testSourceVertexDataFetching() throws Exception {
final IRVertex sourceIRVertex = new InMemorySourceVertex<>(elements);
- final Readable readable = new Readable() {
+ final Readable readable = new BoundedIteratorReadable() {
+ @Override
+ protected Iterator initializeIterator() {
+ return elements.iterator();
+ }
+
@Override
- public Iterable read() throws IOException {
- return elements;
+ public long readWatermark() {
+ throw new UnsupportedOperationException();
}
+
@Override
public List<String> getLocations() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void close() throws IOException {
+
+ }
};
+
final Map<String, Readable> vertexIdToReadable = new HashMap<>();
vertexIdToReadable.put(sourceIRVertex.getId(), readable);
@@ -172,6 +188,116 @@ public final class TaskExecutorTest {
}
/**
+ * This test emits data and watermark by emulating an unbounded source
readable.
+ */
+ @Test()
+ public void testUnboundedSourceVertexDataFetching() throws Exception {
+ final IRVertex sourceIRVertex = new SourceVertex() {
+ @Override
+ public IRVertex getClone() {
+ return this;
+ }
+
+ @Override
+ public boolean isBounded() {
+ return false;
+ }
+
+ @Override
+ public List<Readable> getReadables(int desiredNumOfSplits) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public void clearInternalStates() {
+
+ }
+ };
+
+ final long watermark = 1234567L;
+ final AtomicLong emittedWatermark = new AtomicLong(0);
+
+ final Readable readable = new Readable() {
+ int pointer = 0;
+ final int middle = elements.size() / 2;
+ final int end = elements.size();
+ boolean watermarkEmitted = false;
+
+ @Override
+ public void prepare() {
+
+ }
+
+ // This emulates unbounded source that throws NoSuchElementException
+ // It reads current data until middle point and throws
NoSuchElementException at the middle point.
+ // It resumes the data reading after emitting a watermark, and finishes
at the end of the data.
+ @Override
+ public Object readCurrent() throws NoSuchElementException {
+ if (pointer == middle && !watermarkEmitted) {
+ throw new NoSuchElementException();
+ }
+
+ return elements.get(pointer);
+ }
+
+ @Override
+ public void advance() throws IOException {
+ pointer += 1;
+ }
+
+ @Override
+ public long readWatermark() {
+ watermarkEmitted = true;
+ emittedWatermark.set(watermark);
+ return watermark;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return pointer == end;
+ }
+
+ @Override
+ public List<String> getLocations() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ };
+
+ final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+ vertexIdToReadable.put(sourceIRVertex.getId(), readable);
+
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+ new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+ .addVertex(sourceIRVertex)
+ .buildWithoutSourceSinkCheck();
+
+ final StageEdge taskOutEdge = mockStageEdgeFrom(sourceIRVertex);
+ final Task task =
+ new Task(
+ "testSourceVertexDataFetching",
+ generateTaskId(),
+ TASK_EXECUTION_PROPERTY_MAP,
+ new byte[0],
+ Collections.emptyList(),
+ Collections.singletonList(taskOutEdge),
+ vertexIdToReadable);
+
+ // Execute the task.
+ final TaskExecutor taskExecutor = getTaskExecutor(task, taskDag);
+ taskExecutor.execute();
+
+ // Check whether the watermark is emitted
+ assertEquals(watermark, emittedWatermark.get());
+
+ // Check the output.
+ assertTrue(checkEqualElements(elements,
runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+ }
+
+ /**
* Test parent task data fetching.
*/
@Test(timeout=5000)