This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 970751a  [NEMO-233] Emit watermark at unbounded source  (#130)
970751a is described below

commit 970751acfb4f9ca357310db6d12f1ccd6841cc43
Author: Taegeon Um <[email protected]>
AuthorDate: Mon Oct 29 20:54:20 2018 +0900

    [NEMO-233] Emit watermark at unbounded source  (#130)
    
    JIRA: [NEMO-233: Emit watermark at unbounded 
source](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-233)
    
    **Major changes:**
    - Change the `Readable` interface so that it can retrieve watermarks. 
`Readable` no longer returns an `Iterable`, which can block data fetching. 
Instead, `TaskExecutor` checks whether the `Readable` is finished, and 
retrieves data or a watermark if it is not finished.
    - Add polling logic to `TaskExecutor`
    
    **Minor changes to note:**
    -
    
    **Tests for the changes:**
    - Test unbounded source readable
    
    **Other comments:**
    -
    
    Closes #130
---
 ...{Readable.java => BoundedIteratorReadable.java} |  46 +++---
 .../java/org/apache/nemo/common/ir/Readable.java   |  38 ++++-
 .../nemo/common/ir/vertex/CachedSourceVertex.java  |  40 +++++-
 .../common/ir/vertex/InMemorySourceVertex.java     |  25 +++-
 .../apache/nemo/common/ir/vertex/SourceVertex.java |   2 +
 .../apache/nemo/common/punctuation/Finishmark.java |  35 +++++
 .../Readable.java => punctuation/Watermark.java}   |  32 ++---
 .../apache/nemo/common/test/EmptyComponents.java   |  34 ++++-
 .../compiler/frontend/beam/PipelineTranslator.java |   3 +-
 .../beam/source/BeamBoundedSourceVertex.java       |  73 +++++++---
 .../beam/source/BeamUnboundedSourceVertex.java     | 128 ++++++++---------
 .../source/SparkDatasetBoundedSourceVertex.java    |  29 +++-
 .../source/SparkTextFileBoundedSourceVertex.java   |  34 +++--
 .../nemo/runtime/executor/task/DataFetcher.java    |   5 +-
 .../executor/task/ParentTaskDataFetcher.java       |  10 +-
 .../executor/task/SourceVertexDataFetcher.java     |  81 ++++++++---
 .../nemo/runtime/executor/task/TaskExecutor.java   | 158 +++++++++++++++++----
 .../executor/task/ParentTaskDataFetcherTest.java   |   7 +-
 .../runtime/executor/task/TaskExecutorTest.java    | 134 ++++++++++++++++-
 19 files changed, 700 insertions(+), 214 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java 
b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
similarity index 54%
copy from common/src/main/java/org/apache/nemo/common/ir/Readable.java
copy to 
common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
index 2955ed3..4d15755 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
@@ -18,30 +18,42 @@
  */
 package org.apache.nemo.common.ir;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
+import java.util.Iterator;
 
 /**
- * Interface for readable.
+ * An abstract readable class that retrieves data from iterator.
  * @param <O> output type.
  */
-public interface Readable<O> extends Serializable {
+public abstract class BoundedIteratorReadable<O> implements Readable<O> {
+
+  private Iterator<O> iterator;
+
   /**
-   * Method to read data from the source.
-   *
-   * @return an {@link Iterable} of the data read by the readable.
-   * @throws IOException exception while reading data.
+   * Initialize iterator.
+   * @return iterator
    */
-  Iterable<O> read() throws IOException;
+  protected abstract Iterator<O> initializeIterator();
 
   /**
-   * Returns the list of locations where this readable resides.
-   * Each location has a complete copy of the readable.
-   *
-   * @return List of locations where this readable resides
-   * @throws UnsupportedOperationException when this operation is not supported
-   * @throws Exception                     any other exceptions on the way
+   * Prepare reading data.
    */
-  List<String> getLocations() throws Exception;
+  @Override
+  public final void prepare() {
+    iterator = initializeIterator();
+  }
+
+  @Override
+  public final O readCurrent() {
+    return iterator.next();
+  }
+
+  @Override
+  public final void advance() {
+    // do nothing
+  }
+
+  @Override
+  public final boolean isFinished() {
+    return !iterator.hasNext();
+  }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java 
b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
index 2955ed3..e0f7b7c 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
@@ -21,19 +21,44 @@ package org.apache.nemo.common.ir;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 /**
  * Interface for readable.
  * @param <O> output type.
  */
 public interface Readable<O> extends Serializable {
+
+  /**
+   * Prepare reading data.
+   */
+  void prepare();
+
   /**
-   * Method to read data from the source.
+   * Method to read current data from the source.
+   * The caller should check whether the Readable is finished or not by using 
isFinished() method
+   * before calling this method.
    *
-   * @return an {@link Iterable} of the data read by the readable.
-   * @throws IOException exception while reading data.
+   * It can throw NoSuchElementException although it is not finished in 
Unbounded source.
+   * @return a data read by the readable.
+   */
+  O readCurrent() throws NoSuchElementException;
+
+  /**
+   * Advance current data point.
+   */
+  void advance() throws IOException;
+
+  /**
+   * Read watermark.
+   * @return watermark
+   */
+  long readWatermark();
+
+  /**
+   * @return true if it reads all data.
    */
-  Iterable<O> read() throws IOException;
+  boolean isFinished();
 
   /**
    * Returns the list of locations where this readable resides.
@@ -44,4 +69,9 @@ public interface Readable<O> extends Serializable {
    * @throws Exception                     any other exceptions on the way
    */
   List<String> getLocations() throws Exception;
+
+  /**
+   * Close.
+   */
+  void close() throws IOException;
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
index fe42ed5..6e909cf 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
@@ -22,7 +22,6 @@ import org.apache.nemo.common.ir.Readable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -62,6 +61,12 @@ public final class CachedSourceVertex<T> extends 
SourceVertex<T> {
   }
 
   @Override
+  public boolean isBounded() {
+    // It supports only bounded source.
+    return true;
+  }
+
+  @Override
   public List<Readable<T>> getReadables(final int desiredNumOfSplits) {
     // Ignore the desired number of splits.
     return readables;
@@ -77,7 +82,6 @@ public final class CachedSourceVertex<T> extends 
SourceVertex<T> {
    * It does not contain any actual data but the data will be sent from the 
cached store through external input reader.
    */
   private final class CachedReadable implements Readable<T> {
-
     /**
      * Constructor.
      */
@@ -86,13 +90,41 @@ public final class CachedSourceVertex<T> extends 
SourceVertex<T> {
     }
 
     @Override
-    public Iterable<T> read() throws IOException {
-      return Collections.emptyList();
+    public void prepare() {
+
+    }
+
+    @Override
+    public T readCurrent() {
+      throw new UnsupportedOperationException(
+        "CachedSourceVertex should not be used");
+    }
+
+    @Override
+    public void advance() throws IOException {
+      throw new UnsupportedOperationException(
+        "CachedSourceVertex should not be used");
+    }
+
+    @Override
+    public long readWatermark() {
+      throw new UnsupportedOperationException(
+        "CachedSourceVertex should not be used");
+    }
+
+    @Override
+    public boolean isFinished() {
+      return true;
     }
 
     @Override
     public List<String> getLocations() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
   }
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
index 6cf2009..3278963 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
@@ -18,8 +18,10 @@
  */
 package org.apache.nemo.common.ir.vertex;
 
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
 import org.apache.nemo.common.ir.Readable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -57,6 +59,11 @@ public final class InMemorySourceVertex<T> extends 
SourceVertex<T> {
   }
 
   @Override
+  public boolean isBounded() {
+    return true;
+  }
+
+  @Override
   public List<Readable<T>> getReadables(final int desiredNumOfSplits) throws 
Exception {
 
     final List<Readable<T>> readables = new ArrayList<>();
@@ -88,7 +95,8 @@ public final class InMemorySourceVertex<T> extends 
SourceVertex<T> {
    * Simply returns the in-memory data.
    * @param <T> type of the data.
    */
-  private static final class InMemorySourceReadable<T> implements Readable<T> {
+  private static final class InMemorySourceReadable<T> extends 
BoundedIteratorReadable<T> {
+
     private final Iterable<T> initializedSourceData;
 
     /**
@@ -96,17 +104,28 @@ public final class InMemorySourceVertex<T> extends 
SourceVertex<T> {
      * @param initializedSourceData the source data.
      */
     private InMemorySourceReadable(final Iterable<T> initializedSourceData) {
+      super();
       this.initializedSourceData = initializedSourceData;
     }
 
     @Override
-    public Iterable<T> read() {
-      return this.initializedSourceData;
+    protected Iterator<T> initializeIterator() {
+      return initializedSourceData.iterator();
+    }
+
+    @Override
+    public long readWatermark() {
+      throw new UnsupportedOperationException("No watermark");
     }
 
     @Override
     public List<String> getLocations() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
   }
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
index 6a86064..7892490 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
@@ -36,6 +36,8 @@ public abstract class SourceVertex<O> extends IRVertex {
     super();
   }
 
+  public abstract boolean isBounded();
+
   /**
    * Copy Constructor for SourceVertex.
    *
diff --git 
a/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java 
b/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java
new file mode 100644
index 0000000..e801c25
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+/**
+ * Finish mark that notifies the data fetching is finished.
+ * This is only used for bounded source because unbounded source does not 
finish.
+ */
+public final class Finishmark {
+  private static final Finishmark INSTANCE = new Finishmark();
+
+  private Finishmark() {
+
+  }
+
+  public static Finishmark getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java 
b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
similarity index 51%
copy from common/src/main/java/org/apache/nemo/common/ir/Readable.java
copy to common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index 2955ed3..4f24a80 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -16,32 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.common.ir;
+package org.apache.nemo.common.punctuation;
 
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
 
 /**
- * Interface for readable.
- * @param <O> output type.
+ * Watermark event.
  */
-public interface Readable<O> extends Serializable {
-  /**
-   * Method to read data from the source.
-   *
-   * @return an {@link Iterable} of the data read by the readable.
-   * @throws IOException exception while reading data.
-   */
-  Iterable<O> read() throws IOException;
+public final class Watermark implements Serializable {
+  private final long timestamp;
+  public Watermark(final long timestamp) {
+    this.timestamp = timestamp;
+  }
 
-  /**
-   * Returns the list of locations where this readable resides.
-   * Each location has a complete copy of the readable.
-   *
-   * @return List of locations where this readable resides
-   * @throws UnsupportedOperationException when this operation is not supported
-   * @throws Exception                     any other exceptions on the way
-   */
-  List<String> getLocations() throws Exception;
+  public long getTimestamp() {
+    return timestamp;
+  }
 }
diff --git 
a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java 
b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 1ce2637..9c95bed 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -36,6 +36,7 @@ import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.beam.sdk.values.KV;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -205,6 +206,11 @@ public final class EmptyComponents {
     }
 
     @Override
+    public boolean isBounded() {
+      return true;
+    }
+
+    @Override
     public List<Readable<T>> getReadables(final int desirednumOfSplits) {
       final List list = new ArrayList(desirednumOfSplits);
       for (int i = 0; i < desirednumOfSplits; i++) {
@@ -230,13 +236,37 @@ public final class EmptyComponents {
    */
   static final class EmptyReadable<T> implements Readable<T> {
     @Override
-    public Iterable<T> read() {
-      return new ArrayList<>();
+    public void prepare() {
+
+    }
+
+    @Override
+    public T readCurrent() {
+      return null;
+    }
+
+    @Override
+    public void advance() {
+    }
+
+    @Override
+    public long readWatermark() {
+      return 0;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return true;
     }
 
     @Override
     public List<String> getLocations() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
   }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 89be5ba..58a6d4a 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -24,6 +24,7 @@ import 
org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
@@ -225,7 +226,7 @@ public final class PipelineTranslator
       
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
     final TupleTag mainOutputTag = new TupleTag<>();
 
-    if (mainInput.getWindowingStrategy() == WindowingStrategy.globalDefault()) 
{
+    if (mainInput.getWindowingStrategy().getWindowFn() instanceof 
GlobalWindows) {
       return new GroupByKeyTransform();
     } else {
       return new GroupByKeyAndWindowDoFnTransform(
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 2602e72..880a3a0 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
 
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -72,6 +73,11 @@ public final class BeamBoundedSourceVertex<O> extends 
SourceVertex<WindowedValue
   }
 
   @Override
+  public boolean isBounded() {
+    return true;
+  }
+
+  @Override
   public List<Readable<WindowedValue<O>>> getReadables(final int 
desiredNumOfSplits) throws Exception {
     final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
     LOG.info("estimate: {}", source.getEstimatedSizeBytes(null));
@@ -99,6 +105,9 @@ public final class BeamBoundedSourceVertex<O> extends 
SourceVertex<WindowedValue
    */
   private static final class BoundedSourceReadable<T> implements 
Readable<WindowedValue<T>> {
     private final BoundedSource<T> boundedSource;
+    private boolean finished = false;
+    private BoundedSource.BoundedReader<T> reader;
+    private Function<T, WindowedValue<T>> windowedValueConverter;
 
     /**
      * Constructor of the BoundedSourceReadable.
@@ -109,32 +118,48 @@ public final class BeamBoundedSourceVertex<O> extends 
SourceVertex<WindowedValue
     }
 
     @Override
-    public Iterable<WindowedValue<T>> read() throws IOException {
-      boolean started = false;
-      boolean windowed = false;
-
-      final ArrayList<WindowedValue<T>> elements = new ArrayList<>();
-      try (BoundedSource.BoundedReader<T> reader = 
boundedSource.createReader(null)) {
-        for (boolean available = reader.start(); available; available = 
reader.advance()) {
-          final T elem = reader.getCurrent();
-
-          // Check whether the element is windowed or not
-          // We only have to check the first element.
-          if (!started) {
-            started = true;
-            if (elem instanceof WindowedValue) {
-              windowed = true;
-            }
-          }
+    public void prepare() {
+      try {
+        reader = boundedSource.createReader(null);
+        finished = !reader.start();
+
+        if (!finished) {
+          T elem = reader.getCurrent();
 
-          if (!windowed) {
-            
elements.add(WindowedValue.valueInGlobalWindow(reader.getCurrent()));
+          if (elem instanceof WindowedValue) {
+            windowedValueConverter = val -> (WindowedValue) val;
           } else {
-            elements.add((WindowedValue<T>) elem);
+            windowedValueConverter = WindowedValue::valueInGlobalWindow;
           }
         }
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
       }
-      return elements;
+    }
+
+    @Override
+    public WindowedValue<T> readCurrent() {
+      if (finished) {
+        throw new IllegalStateException("Bounded reader read all elements");
+      }
+
+      final T elem = reader.getCurrent();
+      return windowedValueConverter.apply(elem);
+    }
+
+    @Override
+    public void advance() throws IOException {
+      finished = !reader.advance();
+    }
+
+    @Override
+    public long readWatermark() {
+      throw new UnsupportedOperationException("No watermark");
+    }
+
+    @Override
+    public boolean isFinished() {
+      return finished;
     }
 
     @Override
@@ -149,5 +174,11 @@ public final class BeamBoundedSourceVertex<O> extends 
SourceVertex<WindowedValue
         throw new UnsupportedOperationException();
       }
     }
+
+    @Override
+    public void close() throws IOException {
+      finished = true;
+      reader.close();
+    }
   }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index 8ce26fe..97adc5b 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -29,8 +29,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
 
 /**
  * SourceVertex implementation for UnboundedSource.
@@ -38,14 +39,12 @@ import java.util.List;
  * @param <M> checkpoint mark type.
  */
 public final class BeamUnboundedSourceVertex<O, M extends 
UnboundedSource.CheckpointMark> extends
-  SourceVertex<WindowedValue<O>> {
+  SourceVertex<Object> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BeamUnboundedSourceVertex.class.getName());
   private UnboundedSource<O, M> source;
   private final String sourceDescription;
 
-  private static final long POLLING_INTERVAL = 10L;
-
   /**
    * The default constructor for beam unbounded source.
    * @param source unbounded source.
@@ -68,8 +67,13 @@ public final class BeamUnboundedSourceVertex<O, M extends 
UnboundedSource.Checkp
   }
 
   @Override
-  public List<Readable<WindowedValue<O>>> getReadables(final int 
desiredNumOfSplits) throws Exception {
-    final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
+  public boolean isBounded() {
+    return false;
+  }
+
+  @Override
+  public List<Readable<Object>> getReadables(final int desiredNumOfSplits) 
throws Exception {
+    final List<Readable<Object>> readables = new ArrayList<>();
     source.split(desiredNumOfSplits, null)
       .forEach(unboundedSource -> readables.add(new 
UnboundedSourceReadable<>(unboundedSource)));
     return readables;
@@ -93,90 +97,80 @@ public final class BeamUnboundedSourceVertex<O, M extends 
UnboundedSource.Checkp
    * @param <M> checkpoint mark type.
    */
   private static final class UnboundedSourceReadable<O, M extends 
UnboundedSource.CheckpointMark>
-      implements Readable<WindowedValue<O>> {
+      implements Readable<Object> {
     private final UnboundedSource<O, M> unboundedSource;
+    private UnboundedSource.UnboundedReader<O> reader;
+    private Function<O, WindowedValue<O>> windowedValueConverter;
+    private boolean finished = false;
 
     UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
       this.unboundedSource = unboundedSource;
     }
 
     @Override
-    public Iterable<WindowedValue<O>> read() throws IOException {
-      return new UnboundedSourceIterable<>(unboundedSource);
-    }
+    public void prepare() {
+      try {
+        reader = unboundedSource.createReader(null, null);
+        reader.start();
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
 
-    @Override
-    public List<String> getLocations() throws Exception {
-      return new ArrayList<>();
+      // get first element
+      final O firstElement = retrieveFirstElement();
+      if (firstElement instanceof WindowedValue) {
+        windowedValueConverter = val -> (WindowedValue) val;
+      } else {
+        windowedValueConverter = WindowedValue::valueInGlobalWindow;
+      }
     }
-  }
-
-  /**
-   * The iterable class for unbounded sources.
-   * @param <O> output type.
-   * @param <M> checkpoint mark type.
-   */
-  private static final class UnboundedSourceIterable<O, M extends 
UnboundedSource.CheckpointMark>
-      implements Iterable<WindowedValue<O>> {
 
-    private UnboundedSourceIterator<O, M> iterator;
-
-    UnboundedSourceIterable(final UnboundedSource<O, M> unboundedSource) 
throws IOException {
-      this.iterator = new UnboundedSourceIterator<>(unboundedSource);
+    private O retrieveFirstElement() {
+      while (true) {
+        try {
+          return reader.getCurrent();
+        } catch (final NoSuchElementException e) {
+          // the first element is not currently available... retry
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e1) {
+            e1.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+      }
     }
 
     @Override
-    public Iterator<WindowedValue<O>> iterator() {
-      return iterator;
+    public Object readCurrent() {
+      final O elem = reader.getCurrent();
+      return windowedValueConverter.apply(elem);
     }
-  }
 
-  /**
-   * The iterator for unbounded sources.
-   * @param <O> output type.
-   * @param <M> checkpoint mark type.
-   */
-  // TODO #233: Emit watermark at unbounded source
-  private static final class UnboundedSourceIterator<O, M extends 
UnboundedSource.CheckpointMark>
-      implements Iterator<WindowedValue<O>> {
+    @Override
+    public void advance() throws IOException {
+      reader.advance();
+    }
 
-    private final UnboundedSource.UnboundedReader<O> unboundedReader;
-    private boolean available;
+    @Override
+    public long readWatermark() {
+      return reader.getWatermark().getMillis();
+    }
 
-    UnboundedSourceIterator(final UnboundedSource<O, M> unboundedSource) 
throws IOException {
-      this.unboundedReader = unboundedSource.createReader(null, null);
-      available = unboundedReader.start();
+    @Override
+    public boolean isFinished() {
+      return finished;
     }
 
     @Override
-    public boolean hasNext() {
-      // Unbounded source always has next element until it finishes.
-      return true;
+    public List<String> getLocations() throws Exception {
+      return new ArrayList<>();
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public WindowedValue<O> next() {
-      try {
-        while (true) {
-          if (!available) {
-            Thread.sleep(POLLING_INTERVAL);
-          } else {
-            final O element = unboundedReader.getCurrent();
-            final boolean windowed = element instanceof WindowedValue;
-            if (!windowed) {
-              return WindowedValue.valueInGlobalWindow(element);
-            } else {
-              return (WindowedValue<O>) element;
-            }
-          }
-          available = unboundedReader.advance();
-        }
-      } catch (final InterruptedException | IOException e) {
-        LOG.error("Exception occurred while waiting for the events...");
-        e.printStackTrace();
-        throw new RuntimeException(e);
-      }
+    public void close() throws IOException {
+      finished = true;
+      reader.close();
     }
   }
 }
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 18eefb4..31502be 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.source;
 
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.compiler.frontend.spark.sql.Dataset;
@@ -74,6 +75,11 @@ public final class SparkDatasetBoundedSourceVertex<T> 
extends SourceVertex<T> {
   }
 
   @Override
+  public boolean isBounded() {
+    return true;
+  }
+
+  @Override
   public List<Readable<T>> getReadables(final int desiredNumOfSplits) {
     return readables;
   }
@@ -86,7 +92,7 @@ public final class SparkDatasetBoundedSourceVertex<T> extends 
SourceVertex<T> {
   /**
    * A Readable wrapper for Spark Dataset.
    */
-  private final class SparkDatasetBoundedSourceReadable implements Readable<T> 
{
+  private final class SparkDatasetBoundedSourceReadable extends 
BoundedIteratorReadable<T> {
     private final LinkedHashMap<String, Object[]> commands;
     private final Map<String, String> sessionInitialConf;
     private final int partitionIndex;
@@ -111,11 +117,11 @@ public final class SparkDatasetBoundedSourceVertex<T> 
extends SourceVertex<T> {
     }
 
     @Override
-    public Iterable<T> read() throws IOException {
+    protected Iterator<T> initializeIterator() {
       // for setting up the same environment in the executors.
       final SparkSession spark = SparkSession.builder()
-          .config(sessionInitialConf)
-          .getOrCreate();
+        .config(sessionInitialConf)
+        .getOrCreate();
       final Dataset<T> dataset;
 
       try {
@@ -126,8 +132,14 @@ public final class SparkDatasetBoundedSourceVertex<T> 
extends SourceVertex<T> {
 
       // Spark does lazy evaluation: it doesn't load the full dataset, but 
only the partition it is asked for.
       final RDD<T> rdd = dataset.sparkRDD();
-      return () -> JavaConverters.asJavaIteratorConverter(
-          rdd.iterator(rdd.getPartitions()[partitionIndex], 
TaskContext$.MODULE$.empty())).asJava();
+      final Iterable<T> iterable = () -> 
JavaConverters.asJavaIteratorConverter(
+        rdd.iterator(rdd.getPartitions()[partitionIndex], 
TaskContext$.MODULE$.empty())).asJava();
+      return iterable.iterator();
+    }
+
+    @Override
+    public long readWatermark() {
+      throw new UnsupportedOperationException("No watermark");
     }
 
     @Override
@@ -138,5 +150,10 @@ public final class SparkDatasetBoundedSourceVertex<T> 
extends SourceVertex<T> {
         return locations;
       }
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
   }
 }
diff --git 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index ddfa85f..f0294b1 100644
--- 
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++ 
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.source;
 
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.spark.*;
@@ -73,6 +74,11 @@ public final class SparkTextFileBoundedSourceVertex extends 
SourceVertex<String>
   }
 
   @Override
+  public boolean isBounded() {
+    return true;
+  }
+
+  @Override
   public List<Readable<String>> getReadables(final int desiredNumOfSplits) {
     return readables;
   }
@@ -85,7 +91,7 @@ public final class SparkTextFileBoundedSourceVertex extends 
SourceVertex<String>
   /**
    * A Readable wrapper for Spark text file.
    */
-  private final class SparkTextFileBoundedSourceReadable implements 
Readable<String> {
+  private final class SparkTextFileBoundedSourceReadable extends 
BoundedIteratorReadable<String> {
     private final SparkConf sparkConf;
     private final int partitionIndex;
     private final List<String> locations;
@@ -114,14 +120,8 @@ public final class SparkTextFileBoundedSourceVertex 
extends SourceVertex<String>
     }
 
     @Override
-    public Iterable<String> read() throws IOException {
-      // for setting up the same environment in the executors.
-      final SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
-
-      // Spark does lazy evaluation: it doesn't load the full data in rdd, but 
only the partition it is asked for.
-      final RDD<String> rdd = sparkContext.textFile(inputPath, numPartitions);
-      return () -> JavaConverters.asJavaIteratorConverter(
-          rdd.iterator(rdd.getPartitions()[partitionIndex], 
TaskContext$.MODULE$.empty())).asJava();
+    public long readWatermark() {
+      throw new UnsupportedOperationException("No watermark");
     }
 
     @Override
@@ -132,5 +132,21 @@ public final class SparkTextFileBoundedSourceVertex 
extends SourceVertex<String>
         return locations;
       }
     }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    protected Iterator<String> initializeIterator() {
+      // for setting up the same environment in the executors.
+      final SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
+
+      // Spark does lazy evaluation: it doesn't load the full data in rdd, but 
only the partition it is asked for.
+      final RDD<String> rdd = sparkContext.textFile(inputPath, numPartitions);
+      final Iterable<String> iterable = () -> 
JavaConverters.asJavaIteratorConverter(
+        rdd.iterator(rdd.getPartitions()[partitionIndex], 
TaskContext$.MODULE$.empty())).asJava();
+      return iterable.iterator();
+    }
   }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index 5b4777c..2ca3df8 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -22,11 +22,12 @@ import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 
 import java.io.IOException;
+import java.util.NoSuchElementException;
 
 /**
  * An abstraction for fetching data from task-external sources.
  */
-abstract class DataFetcher {
+abstract class DataFetcher implements AutoCloseable {
   private final IRVertex dataSource;
   private final OutputCollector outputCollector;
 
@@ -42,7 +43,7 @@ abstract class DataFetcher {
    * @throws IOException upon I/O error
    * @throws java.util.NoSuchElementException if no more element is available
    */
-  abstract Object fetchDataElement() throws IOException;
+  abstract Object fetchDataElement() throws IOException, 
NoSuchElementException;
 
   OutputCollector getOutputCollector() {
     return outputCollector;
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 70680cc..6098b30 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.task;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
 import org.slf4j.Logger;
@@ -28,7 +29,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -94,8 +94,7 @@ class ParentTaskDataFetcher extends DataFetcher {
       throw new IOException(e);
     }
 
-    // We throw the exception here, outside of the above try-catch region
-    throw new NoSuchElementException();
+    return Finishmark.getInstance();
   }
 
   private void advanceIterator() throws IOException {
@@ -160,4 +159,9 @@ class ParentTaskDataFetcher extends DataFetcher {
       LOG.error("Failed to get the number of bytes of encoded data - the data 
is not ready yet ", e);
     }
   }
+
+  @Override
+  public void close() throws Exception {
+
+  }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 505d246..9ea8fa8 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -20,49 +20,92 @@ package org.apache.nemo.runtime.executor.task;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
-import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.common.punctuation.Finishmark;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Fetches data from a data source.
  */
 class SourceVertexDataFetcher extends DataFetcher {
   private final Readable readable;
-
-  // Non-finals (lazy fetching)
-  private Iterator iterator;
   private long boundedSourceReadTime = 0;
+  private static final long WATERMARK_PERIOD = 1000; // ms
+  private final ScheduledExecutorService watermarkTriggerService;
+  private boolean watermarkTriggered = false;
+  private final boolean bounded;
 
-  SourceVertexDataFetcher(final IRVertex dataSource,
+  SourceVertexDataFetcher(final SourceVertex dataSource,
                           final Readable readable,
                           final OutputCollector outputCollector) {
     super(dataSource, outputCollector);
     this.readable = readable;
+    this.readable.prepare();
+    this.bounded = dataSource.isBounded();
+
+    if (!bounded) {
+      this.watermarkTriggerService = Executors.newScheduledThreadPool(1);
+      this.watermarkTriggerService.scheduleAtFixedRate(() -> {
+        watermarkTriggered = true;
+      }, WATERMARK_PERIOD, WATERMARK_PERIOD, TimeUnit.MILLISECONDS);
+    } else {
+      this.watermarkTriggerService = null;
+    }
   }
 
+  /**
+   * This is a non-blocking operation.
+   * @return current data
+   * @throws NoSuchElementException if the current data is not available
+   */
   @Override
-  Object fetchDataElement() throws IOException {
-    if (iterator == null) {
-      fetchDataLazily();
+  Object fetchDataElement() throws NoSuchElementException, IOException {
+    if (readable.isFinished()) {
+      return Finishmark.getInstance();
+    } else {
+      final long start = System.currentTimeMillis();
+      final Object element = retrieveElement();
+      boundedSourceReadTime += System.currentTimeMillis() - start;
+      return element;
     }
+  }
 
-    if (iterator.hasNext()) {
-      return iterator.next();
-    } else {
-      throw new NoSuchElementException();
+  final long getBoundedSourceReadTime() {
+    return boundedSourceReadTime;
+  }
+
+  @Override
+  public void close() throws Exception {
+    readable.close();
+    if (watermarkTriggerService != null) {
+      watermarkTriggerService.shutdown();
     }
   }
 
-  private void fetchDataLazily() throws IOException {
-    final long start = System.currentTimeMillis();
-    iterator = this.readable.read().iterator();
-    boundedSourceReadTime += System.currentTimeMillis() - start;
+  private boolean isWatermarkTriggerTime() {
+    if (watermarkTriggered) {
+      watermarkTriggered = false;
+      return true;
+    } else {
+      return false;
+    }
   }
 
-  final long getBoundedSourceReadTime() {
-    return boundedSourceReadTime;
+  private Object retrieveElement() throws NoSuchElementException, IOException {
+    // Emit watermark
+    if (!bounded && isWatermarkTriggerTime()) {
+      return new Watermark(readable.readWatermark());
+    }
+
+    // Data
+    final Object element = readable.readCurrent();
+    readable.advance();
+    return element;
   }
 }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index fac3e76..449542a 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -28,6 +28,8 @@ import 
org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdPrope
 import org.apache.nemo.common.ir.vertex.*;
 import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
@@ -62,7 +64,6 @@ import javax.annotation.concurrent.NotThreadSafe;
 @NotThreadSafe
 public final class TaskExecutor {
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class.getName());
-  private static final int NONE_FINISHED = -1;
 
   // Essential information
   private boolean isExecuted;
@@ -194,7 +195,8 @@ public final class TaskExecutor {
       // Source read
       if (irVertex instanceof SourceVertex) {
         // Source vertex read
-        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, 
sourceReader.get(), outputCollector));
+        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(
+          (SourceVertex) irVertex, sourceReader.get(), outputCollector));
       }
 
       // Parent-task read (broadcasts)
@@ -243,6 +245,11 @@ public final class TaskExecutor {
     outputCollector.emit(dataElement);
   }
 
+  private void processWatermark(final OutputCollector outputCollector, final 
Watermark watermark) {
+    // TODO #231: Add onWatermark() method to Transform and
+    // TODO #231: forward watermark to Transforms and OutputWriters
+  }
+
   /**
    * Execute a task, while handling unrecoverable errors and exceptions.
    */
@@ -303,46 +310,145 @@ public final class TaskExecutor {
   }
 
   /**
+   * Process an element generated from the dataFetcher.
+   * If the element is an instance of Finishmark, we remove the dataFetcher 
from the current list.
+   * @param element element
+   * @param dataFetcher current data fetcher
+   * @param dataFetchers current list
+   */
+  private void handleElement(final Object element,
+                             final DataFetcher dataFetcher,
+                             final List<DataFetcher> dataFetchers) {
+    if (element instanceof Finishmark) {
+      // We've consumed all the data from this data fetcher.
+      if (dataFetcher instanceof SourceVertexDataFetcher) {
+        boundedSourceReadTime += ((SourceVertexDataFetcher) 
dataFetcher).getBoundedSourceReadTime();
+      } else if (dataFetcher instanceof ParentTaskDataFetcher) {
+        serializedReadBytes += ((ParentTaskDataFetcher) 
dataFetcher).getSerializedBytes();
+        encodedReadBytes += ((ParentTaskDataFetcher) 
dataFetcher).getEncodedBytes();
+      }
+
+      // remove current data fetcher from the list
+      dataFetchers.remove(dataFetcher);
+    } else if (element instanceof Watermark) {
+      // Watermark
+      processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
+    } else {
+      // Process data element
+      processElement(dataFetcher.getOutputCollector(), element);
+    }
+  }
+
+  /**
+   * Check if it is time to poll pending fetchers' data.
+   * @param pollingPeriod polling period
+   * @param currentTime current time
+   * @param prevTime prev time
+   */
+  private boolean isPollingTime(final long pollingPeriod,
+                                final long currentTime,
+                                final long prevTime) {
+    return (currentTime - prevTime) >= pollingPeriod;
+  }
+
+  /**
+   * This retrieves data from data fetchers and processes them.
+   * It maintains two lists:
+   *  -- availableFetchers: maintain data fetchers that currently have data 
elements to retrieve
+   *  -- pendingFetchers: maintain data fetchers that currently do not have 
available elements.
+   *     This can become available in the future, and therefore we check the 
pending fetchers every pollingInterval.
+   *
+   *  If a data fetcher finishes, we remove it from the two lists.
+   *  If a data fetcher has no available element, we move the data fetcher to 
pendingFetchers
+   *  If a pending data fetcher has element, we move it to availableFetchers
+   *  If there are no available fetchers but pending fetchers, sleep for 
pollingInterval
+   *  and retry fetching data from the pendingFetchers.
+   *
    * @param fetchers to handle.
    * @return false if IOException.
    */
   private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
-    final List<DataFetcher> availableFetchers = new ArrayList<>(fetchers);
-    while (!availableFetchers.isEmpty()) { // empty means we've consumed all 
task-external input data
-      // For this looping of available fetchers.
-      int finishedFetcherIndex = NONE_FINISHED;
-      for (int i = 0; i < availableFetchers.size(); i++) {
-        final DataFetcher dataFetcher = availableFetchers.get(i);
-        final Object element;
+    final List<DataFetcher> availableFetchers = new LinkedList<>(fetchers);
+    final List<DataFetcher> pendingFetchers = new LinkedList<>();
+
+    // Polling interval.
+    final long pollingInterval = 100; // ms
+
+    // Previous polling time
+    long prevPollingTime = System.currentTimeMillis();
+
+    // empty means we've consumed all task-external input data
+    while (!availableFetchers.isEmpty() || !pendingFetchers.isEmpty()) {
+      // We first fetch data from available data fetchers
+      final Iterator<DataFetcher> availableIterator = 
availableFetchers.iterator();
+
+      while (availableIterator.hasNext()) {
+        final DataFetcher dataFetcher = availableIterator.next();
         try {
-          element = dataFetcher.fetchDataElement();
-        } catch (NoSuchElementException e) {
-          // We've consumed all the data from this data fetcher.
-          if (dataFetcher instanceof SourceVertexDataFetcher) {
-            boundedSourceReadTime += ((SourceVertexDataFetcher) 
dataFetcher).getBoundedSourceReadTime();
-          } else if (dataFetcher instanceof ParentTaskDataFetcher) {
-            serializedReadBytes += ((ParentTaskDataFetcher) 
dataFetcher).getSerializedBytes();
-            encodedReadBytes += ((ParentTaskDataFetcher) 
dataFetcher).getEncodedBytes();
-          }
-          finishedFetcherIndex = i;
-          break;
-        } catch (IOException e) {
+          handleElement(dataFetcher.fetchDataElement(), dataFetcher, 
availableFetchers);
+        } catch (final NoSuchElementException e) {
+          // No element in current data fetcher, fetch data from next fetcher
+          // move current data fetcher to pending.
+          availableIterator.remove();
+          pendingFetchers.add(dataFetcher);
+        } catch (final IOException e) {
           // IOException means that this task should be retried.
           taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
             Optional.empty(), 
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
           LOG.error("{} Execution Failed (Recoverable: input read failure)! 
Exception: {}", taskId, e);
           return false;
         }
+      }
+
+      final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
+      final long currentTime = System.currentTimeMillis();
+      // We check pending data every polling interval
+      while (pendingIterator.hasNext()
+        && isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+        prevPollingTime = currentTime;
+
+        final DataFetcher dataFetcher = pendingIterator.next();
+        try {
+          handleElement(dataFetcher.fetchDataElement(), dataFetcher, 
pendingFetchers);
 
-        // Successfully fetched an element
-        processElement(dataFetcher.getOutputCollector(), element);
+          // We processed data. This means the data fetcher is now available.
+          // Add current data fetcher to available
+          pendingIterator.remove();
+          availableFetchers.add(dataFetcher);
+
+        } catch (final NoSuchElementException e) {
+          // The current data fetcher is still pending.. try next data fetcher
+        } catch (final IOException e) {
+          // IOException means that this task should be retried.
+          taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+            Optional.empty(), 
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+          LOG.error("{} Execution Failed (Recoverable: input read failure)! 
Exception: {}", taskId, e);
+          return false;
+        }
       }
 
-      // Remove the finished fetcher from the list
-      if (finishedFetcherIndex != NONE_FINISHED) {
-        availableFetchers.remove(finishedFetcherIndex);
+      // If there are no available fetchers,
+      // Sleep and retry fetching element from pending fetchers every polling 
interval
+      if (availableFetchers.isEmpty() && !pendingFetchers.isEmpty()) {
+        try {
+          Thread.sleep(pollingInterval);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
       }
     }
+
+    // Close all data fetchers
+    fetchers.forEach(fetcher -> {
+      try {
+        fetcher.close();
+      } catch (final Exception e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      }
+    });
+
     return true;
   }
 
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index b120241..705f833 100644
--- 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.task;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
 import org.junit.Test;
@@ -44,16 +45,14 @@ import static org.mockito.Mockito.when;
 @PrepareForTest({InputReader.class, VertexHarness.class})
 public final class ParentTaskDataFetcherTest {
 
-  @Test(timeout=5000, expected = NoSuchElementException.class)
+  @Test(timeout=5000)
   public void testEmpty() throws Exception {
     final List<String> empty = new ArrayList<>(0); // empty data
     final InputReader inputReader = 
generateInputReader(generateCompletableFuture(empty.iterator()));
 
     // Fetcher
     final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
-
-    // Should trigger the expected 'NoSuchElementException'
-    fetcher.fetchDataElement();
+    assertEquals(Finishmark.getInstance(), fetcher.fetchDataElement());
   }
 
   @Test(timeout=5000)
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index d346a9d..e0c4f95 100644
--- 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.runtime.executor.task;
 
 import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
@@ -32,6 +33,7 @@ import 
org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import org.apache.nemo.common.ir.vertex.InMemorySourceVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.common.ir.vertex.IRVertex;
@@ -61,9 +63,11 @@ import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -130,20 +134,32 @@ public final class TaskExecutorTest {
   /**
    * Test source vertex data fetching.
    */
-  @Test(timeout=5000)
+  @Test()
   public void testSourceVertexDataFetching() throws Exception {
     final IRVertex sourceIRVertex = new InMemorySourceVertex<>(elements);
 
-    final Readable readable = new Readable() {
+    final Readable readable = new BoundedIteratorReadable() {
+      @Override
+      protected Iterator initializeIterator() {
+        return elements.iterator();
+      }
+
       @Override
-      public Iterable read() throws IOException {
-        return elements;
+      public long readWatermark() {
+        throw new UnsupportedOperationException();
       }
+
       @Override
       public List<String> getLocations() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void close() throws IOException {
+
+      }
     };
+
     final Map<String, Readable> vertexIdToReadable = new HashMap<>();
     vertexIdToReadable.put(sourceIRVertex.getId(), readable);
 
@@ -172,6 +188,116 @@ public final class TaskExecutorTest {
   }
 
   /**
+   * This test emits data and watermark by emulating an unbounded source 
readable.
+   */
+  @Test()
+  public void testUnboundedSourceVertexDataFetching() throws Exception {
+    final IRVertex sourceIRVertex = new SourceVertex() {
+      @Override
+      public IRVertex getClone() {
+        return this;
+      }
+
+      @Override
+      public boolean isBounded() {
+        return false;
+      }
+
+      @Override
+      public List<Readable> getReadables(int desiredNumOfSplits) throws 
Exception {
+        return null;
+      }
+
+      @Override
+      public void clearInternalStates() {
+
+      }
+    };
+
+    final long watermark = 1234567L;
+    final AtomicLong emittedWatermark = new AtomicLong(0);
+
+    final Readable readable = new Readable() {
+      int pointer = 0;
+      final int middle = elements.size() / 2;
+      final int end = elements.size();
+      boolean watermarkEmitted = false;
+
+      @Override
+      public void prepare() {
+
+      }
+
+      // This emulates unbounded source that throws NoSuchElementException
+      // It reads current data until middle point and  throws 
NoSuchElementException at the middle point.
+      // It resumes the data reading after emitting a watermark, and finishes 
at the end of the data.
+      @Override
+      public Object readCurrent() throws NoSuchElementException {
+        if (pointer == middle && !watermarkEmitted) {
+          throw new NoSuchElementException();
+        }
+
+        return elements.get(pointer);
+      }
+
+      @Override
+      public void advance() throws IOException {
+        pointer += 1;
+      }
+
+      @Override
+      public long readWatermark() {
+        watermarkEmitted = true;
+        emittedWatermark.set(watermark);
+        return watermark;
+      }
+
+      @Override
+      public boolean isFinished() {
+        return pointer == end;
+      }
+
+      @Override
+      public List<String> getLocations() throws Exception {
+        return null;
+      }
+
+      @Override
+      public void close() throws IOException {
+      }
+    };
+
+    final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+    vertexIdToReadable.put(sourceIRVertex.getId(), readable);
+
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+      new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+        .addVertex(sourceIRVertex)
+        .buildWithoutSourceSinkCheck();
+
+    final StageEdge taskOutEdge = mockStageEdgeFrom(sourceIRVertex);
+    final Task task =
+      new Task(
+        "testSourceVertexDataFetching",
+        generateTaskId(),
+        TASK_EXECUTION_PROPERTY_MAP,
+        new byte[0],
+        Collections.emptyList(),
+        Collections.singletonList(taskOutEdge),
+        vertexIdToReadable);
+
+    // Execute the task.
+    final TaskExecutor taskExecutor = getTaskExecutor(task, taskDag);
+    taskExecutor.execute();
+
+    // Check whether the watermark is emitted
+    assertEquals(watermark, emittedWatermark.get());
+
+    // Check the output.
+    assertTrue(checkEqualElements(elements, 
runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+  }
+
+  /**
    * Test parent task data fetching.
    */
   @Test(timeout=5000)

Reply via email to