johnyangk closed pull request #149: [NEMO-266] Throws NoSuchElementException in 
Readable.readCurrent
URL: https://github.com/apache/incubator-nemo/pull/149
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once the request is merged, so the diff is reproduced below:

diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java 
b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
index 4d157555f..586fc4757 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
@@ -47,11 +47,6 @@ public final O readCurrent() {
     return iterator.next();
   }
 
-  @Override
-  public final void advance() {
-    // do nothing
-  }
-
   @Override
   public final boolean isFinished() {
     return !iterator.hasNext();
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java 
b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
index e0f7b7c79..785d66246 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
@@ -44,11 +44,6 @@
    */
   O readCurrent() throws NoSuchElementException;
 
-  /**
-   * Advance current data point.
-   */
-  void advance() throws IOException;
-
   /**
    * Read watermark.
    * @return watermark
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
index 6e909cfb3..ecdd34bcc 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
@@ -100,12 +100,6 @@ public T readCurrent() {
         "CachedSourceVertex should not be used");
     }
 
-    @Override
-    public void advance() throws IOException {
-      throw new UnsupportedOperationException(
-        "CachedSourceVertex should not be used");
-    }
-
     @Override
     public long readWatermark() {
       throw new UnsupportedOperationException(
diff --git 
a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java 
b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 20bc14508..ee1ea48ed 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -246,10 +246,6 @@ public T readCurrent() {
       return null;
     }
 
-    @Override
-    public void advance() {
-    }
-
     @Override
     public long readWatermark() {
       return 0;
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 880a3a04d..bc672b7b9 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -27,7 +27,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.function.Function;
 
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -107,7 +106,6 @@ public ObjectNode getPropertiesAsJsonNode() {
     private final BoundedSource<T> boundedSource;
     private boolean finished = false;
     private BoundedSource.BoundedReader<T> reader;
-    private Function<T, WindowedValue<T>> windowedValueConverter;
 
     /**
      * Constructor of the BoundedSourceReadable.
@@ -122,16 +120,6 @@ public void prepare() {
       try {
         reader = boundedSource.createReader(null);
         finished = !reader.start();
-
-        if (!finished) {
-          T elem = reader.getCurrent();
-
-          if (elem instanceof WindowedValue) {
-            windowedValueConverter = val -> (WindowedValue) val;
-          } else {
-            windowedValueConverter = WindowedValue::valueInGlobalWindow;
-          }
-        }
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
@@ -144,12 +132,15 @@ public void prepare() {
       }
 
       final T elem = reader.getCurrent();
-      return windowedValueConverter.apply(elem);
-    }
 
-    @Override
-    public void advance() throws IOException {
-      finished = !reader.advance();
+      try {
+        finished = !reader.advance();
+      } catch (final IOException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      }
+
+      return WindowedValue.valueInGlobalWindow(elem);
     }
 
     @Override
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index 97adc5bb6..482dd9d1a 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -31,7 +31,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.function.Function;
 
 /**
  * SourceVertex implementation for UnboundedSource.
@@ -100,8 +99,8 @@ public ObjectNode getPropertiesAsJsonNode() {
       implements Readable<Object> {
     private final UnboundedSource<O, M> unboundedSource;
     private UnboundedSource.UnboundedReader<O> reader;
-    private Function<O, WindowedValue<O>> windowedValueConverter;
-    private boolean finished = false;
+    private boolean isStarted = false;
+    private boolean isCurrentAvailable = false;
 
     UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
       this.unboundedSource = unboundedSource;
@@ -111,45 +110,30 @@ public ObjectNode getPropertiesAsJsonNode() {
     public void prepare() {
       try {
         reader = unboundedSource.createReader(null, null);
-        reader.start();
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
-
-      // get first element
-      final O firstElement = retrieveFirstElement();
-      if (firstElement instanceof WindowedValue) {
-        windowedValueConverter = val -> (WindowedValue) val;
-      } else {
-        windowedValueConverter = WindowedValue::valueInGlobalWindow;
-      }
-    }
-
-    private O retrieveFirstElement() {
-      while (true) {
-        try {
-          return reader.getCurrent();
-        } catch (final NoSuchElementException e) {
-          // the first element is not currently available... retry
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException e1) {
-            e1.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        }
-      }
     }
 
     @Override
     public Object readCurrent() {
-      final O elem = reader.getCurrent();
-      return windowedValueConverter.apply(elem);
-    }
+      try {
+        if (!isStarted) {
+          isStarted = true;
+          isCurrentAvailable = reader.start();
+        } else {
+          isCurrentAvailable = reader.advance();
+        }
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
 
-    @Override
-    public void advance() throws IOException {
-      reader.advance();
+      if (isCurrentAvailable) {
+        final O elem = reader.getCurrent();
+        return WindowedValue.timestampedValueInGlobalWindow(elem, 
reader.getCurrentTimestamp());
+      } else {
+        throw new NoSuchElementException();
+      }
     }
 
     @Override
@@ -159,7 +143,7 @@ public long readWatermark() {
 
     @Override
     public boolean isFinished() {
-      return finished;
+      return false;
     }
 
     @Override
@@ -169,7 +153,6 @@ public boolean isFinished() {
 
     @Override
     public void close() throws IOException {
-      finished = true;
       reader.close();
     }
   }
diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 9ea8fa8fb..fa4bd8a11 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -105,7 +105,6 @@ private Object retrieveElement() throws 
NoSuchElementException, IOException {
 
     // Data
     final Object element = readable.readCurrent();
-    readable.advance();
     return element;
   }
 }
diff --git 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index e05bbfb81..6ae716ae2 100644
--- 
a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ 
b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -694,12 +694,9 @@ public Object readCurrent() throws NoSuchElementException {
       if (pointer == middle && numEmittedWatermarks < expectedNumWatermarks) {
         throw new NoSuchElementException();
       }
-      return elements.get(pointer);
-    }
-
-    @Override
-    public void advance() throws IOException {
+      final Object element = elements.get(pointer);
       pointer += 1;
+      return element;
     }
 
     @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to