johnyangk closed pull request #149: [NEMO-266] Throws NoSuchElementException in Readeable.readCurrent URL: https://github.com/apache/incubator-nemo/pull/149
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java index 4d157555f..586fc4757 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java +++ b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java @@ -47,11 +47,6 @@ public final O readCurrent() { return iterator.next(); } - @Override - public final void advance() { - // do nothing - } - @Override public final boolean isFinished() { return !iterator.hasNext(); diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java b/common/src/main/java/org/apache/nemo/common/ir/Readable.java index e0f7b7c79..785d66246 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java +++ b/common/src/main/java/org/apache/nemo/common/ir/Readable.java @@ -44,11 +44,6 @@ */ O readCurrent() throws NoSuchElementException; - /** - * Advance current data point. - */ - void advance() throws IOException; - /** * Read watermark. * @return watermark diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java index 6e909cfb3..ecdd34bcc 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java +++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java @@ -100,12 +100,6 @@ public T readCurrent() { "CachedSourceVertex should not be used"); } - @Override - public void advance() throws IOException { - throw new UnsupportedOperationException( - "CachedSourceVertex should not be used"); - } - @Override public long readWatermark() { throw new UnsupportedOperationException( diff --git a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java index 20bc14508..ee1ea48ed 100644 --- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java +++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java @@ -246,10 +246,6 @@ public T readCurrent() { return null; } - @Override - public void advance() { - } - @Override public long readWatermark() { return 0; diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java index 880a3a04d..bc672b7b9 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.function.Function; import org.apache.nemo.common.ir.vertex.SourceVertex; import org.apache.beam.sdk.io.BoundedSource; @@ -107,7 +106,6 @@ public ObjectNode getPropertiesAsJsonNode() { private final BoundedSource<T> boundedSource; private boolean finished = false; private BoundedSource.BoundedReader<T> reader; - private Function<T, WindowedValue<T>> windowedValueConverter; /** * Constructor of the BoundedSourceReadable. @@ -122,16 +120,6 @@ public void prepare() { try { reader = boundedSource.createReader(null); finished = !reader.start(); - - if (!finished) { - T elem = reader.getCurrent(); - - if (elem instanceof WindowedValue) { - windowedValueConverter = val -> (WindowedValue) val; - } else { - windowedValueConverter = WindowedValue::valueInGlobalWindow; - } - } } catch (final Exception e) { throw new RuntimeException(e); } @@ -144,12 +132,15 @@ public void prepare() { } final T elem = reader.getCurrent(); - return windowedValueConverter.apply(elem); - } - @Override - public void advance() throws IOException { - finished = !reader.advance(); + try { + finished = !reader.advance(); + } catch (final IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + + return WindowedValue.valueInGlobalWindow(elem); } @Override diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java index 97adc5bb6..482dd9d1a 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; -import java.util.function.Function; /** * SourceVertex implementation for UnboundedSource. @@ -100,8 +99,8 @@ public ObjectNode getPropertiesAsJsonNode() { implements Readable<Object> { private final UnboundedSource<O, M> unboundedSource; private UnboundedSource.UnboundedReader<O> reader; - private Function<O, WindowedValue<O>> windowedValueConverter; - private boolean finished = false; + private boolean isStarted = false; + private boolean isCurrentAvailable = false; UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) { this.unboundedSource = unboundedSource; @@ -111,45 +110,30 @@ public ObjectNode getPropertiesAsJsonNode() { public void prepare() { try { reader = unboundedSource.createReader(null, null); - reader.start(); } catch (final Exception e) { throw new RuntimeException(e); } - - // get first element - final O firstElement = retrieveFirstElement(); - if (firstElement instanceof WindowedValue) { - windowedValueConverter = val -> (WindowedValue) val; - } else { - windowedValueConverter = WindowedValue::valueInGlobalWindow; - } - } - - private O retrieveFirstElement() { - while (true) { - try { - return reader.getCurrent(); - } catch (final NoSuchElementException e) { - // the first element is not currently available... retry - try { - Thread.sleep(100); - } catch (InterruptedException e1) { - e1.printStackTrace(); - throw new RuntimeException(e); - } - } - } } @Override public Object readCurrent() { - final O elem = reader.getCurrent(); - return windowedValueConverter.apply(elem); - } + try { + if (!isStarted) { + isStarted = true; + isCurrentAvailable = reader.start(); + } else { + isCurrentAvailable = reader.advance(); + } + } catch (final Exception e) { + throw new RuntimeException(e); + } - @Override - public void advance() throws IOException { - reader.advance(); + if (isCurrentAvailable) { + final O elem = reader.getCurrent(); + return WindowedValue.timestampedValueInGlobalWindow(elem, reader.getCurrentTimestamp()); + } else { + throw new NoSuchElementException(); + } } @Override @@ -159,7 +143,7 @@ public long readWatermark() { @Override public boolean isFinished() { - return finished; + return false; } @Override @@ -169,7 +153,6 @@ public boolean isFinished() { @Override public void close() throws IOException { - finished = true; reader.close(); } } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java index 9ea8fa8fb..fa4bd8a11 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java @@ -105,7 +105,6 @@ private Object retrieveElement() throws NoSuchElementException, IOException { // Data final Object element = readable.readCurrent(); - readable.advance(); return element; } } diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java index e05bbfb81..6ae716ae2 100644 --- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java +++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java @@ -694,12 +694,9 @@ public Object readCurrent() throws NoSuchElementException { if (pointer == middle && numEmittedWatermarks < expectedNumWatermarks) { throw new NoSuchElementException(); } - return elements.get(pointer); - } - - @Override - public void advance() throws IOException { + final Object element = elements.get(pointer); pointer += 1; + return element; } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services