Do not call advance() when all elements are consumed. This prevents UnboundedReadFromBoundedSource from attempting to read elements from a reader that is known to have no elements remaining. This defends against bounded readers that expect to be discarded the first time they return false from advance().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/00f55f8e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/00f55f8e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/00f55f8e Branch: refs/heads/master Commit: 00f55f8e3bebf70de1d5497c40e0950b8bd6cbdd Parents: 636185e Author: Pei He <[email protected]> Authored: Mon Mar 20 08:51:04 2017 +0800 Committer: Thomas Groh <[email protected]> Committed: Mon Mar 20 09:24:46 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/UnboundedReadFromBoundedSource.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/00f55f8e/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java index 3073076..0c173a0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -458,22 +458,28 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle private PipelineOptions options; private @Nullable BoundedReader<T> reader; private boolean closed; + private boolean readerDone; public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) { this.residualSource = checkNotNull(residualSource, "residualSource"); this.options = checkNotNull(options, "options"); this.reader = null; this.closed = false; + this.readerDone = false; } private boolean advance() throws IOException { 
checkArgument(!closed, "advance() call on closed %s", getClass().getName()); + if (readerDone) { + return false; + } if (reader == null) { reader = residualSource.createReader(options); - return reader.start(); + readerDone = !reader.start(); } else { - return reader.advance(); + readerDone = !reader.advance(); } + return !readerDone; } T getCurrent() throws NoSuchElementException {
