Repository: beam Updated Branches: refs/heads/release-2.0.0 64b493a89 -> c79aa4c43
[BEAM-2095] Made SourceRDD hasNext idempotent Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17da5e93 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17da5e93 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17da5e93 Branch: refs/heads/release-2.0.0 Commit: 17da5e933b0cb4850a744a42f4a158fa00b230ab Parents: 64b493a Author: Stas Levin <[email protected]> Authored: Mon May 1 07:30:49 2017 +0300 Committer: Luke Cwik <[email protected]> Committed: Fri May 12 10:00:46 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/spark/io/SourceRDD.java | 173 ++++++++++++------- .../spark/io/ReaderToIteratorAdapterTest.java | 145 ++++++++++++++++ 2 files changed, 260 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/17da5e93/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index 7b7d216..01cc176 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -19,12 +19,15 @@ package org.apache.beam.runners.spark.io; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import 
org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; @@ -47,6 +50,7 @@ import org.apache.spark.rdd.RDD; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; +import scala.collection.JavaConversions; /** * Classes implementing Beam {@link Source} {@link RDD}s. @@ -118,80 +122,133 @@ public class SourceRDD { } } + private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) { + try { + return ((BoundedSource<T>) partition.source).createReader( + runtimeContext.getPipelineOptions()); + } catch (IOException e) { + throw new RuntimeException("Failed to create reader from a BoundedSource.", e); + } + } + @Override public scala.collection.Iterator<WindowedValue<T>> compute(final Partition split, - TaskContext context) { + final TaskContext context) { final MetricsContainer metricsContainer = metricsAccum.localValue().getContainer(stepName); - final Iterator<WindowedValue<T>> iter = new Iterator<WindowedValue<T>>() { - @SuppressWarnings("unchecked") - SourcePartition<T> partition = (SourcePartition<T>) split; - BoundedSource.BoundedReader<T> reader = createReader(partition); - - private boolean finished = false; - private boolean started = false; - private boolean closed = false; - - @Override - public boolean hasNext() { - // Add metrics container to the scope of org.apache.beam.sdk.io.Source.Reader methods - // since they may report metrics. - try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { - try { - if (!started) { - started = true; - finished = !reader.start(); - } else { - finished = !reader.advance(); - } - if (finished) { - // safely close the reader if there are no more elements left to read. 
- closeIfNotClosed(); - } - return !finished; - } catch (IOException e) { - closeIfNotClosed(); - throw new RuntimeException("Failed to read from reader.", e); + @SuppressWarnings("unchecked") + final BoundedSource.BoundedReader<T> reader = createReader((SourcePartition<T>) split); + + final Iterator<WindowedValue<T>> readerIterator = + new ReaderToIteratorAdapter<>(metricsContainer, reader); + + return new InterruptibleIterator<>(context, JavaConversions.asScalaIterator(readerIterator)); + } + + /** + * Exposes an <code>Iterator</code>&lt;{@link WindowedValue}&gt; interface on top of a + * {@link Source.Reader}. + * <p> + * <code>hasNext</code> is idempotent and returns <code>true</code> iff further items are + * available for reading using the underlying reader. + * Consequently, when the reader is closed, or when the reader has no further elements + * available (i.e., {@link Source.Reader#advance()} returned <code>false</code>), + * <code>hasNext</code> returns <code>false</code>. + * </p> + * <p> + * Since this is a read-only iterator, an attempt to call <code>remove</code> will throw an + * <code>UnsupportedOperationException</code>. 
+ * </p> + */ + @VisibleForTesting + static class ReaderToIteratorAdapter<T> implements Iterator<WindowedValue<T>> { + + private static final boolean FAILED_TO_OBTAIN_NEXT = false; + private static final boolean SUCCESSFULLY_OBTAINED_NEXT = true; + + private final MetricsContainer metricsContainer; + private final Source.Reader<T> reader; + + private boolean started = false; + private boolean closed = false; + private WindowedValue<T> next = null; + + ReaderToIteratorAdapter(final MetricsContainer metricsContainer, + final Source.Reader<T> reader) { + this.metricsContainer = metricsContainer; + this.reader = reader; + } + + private boolean tryProduceNext() { + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { + if (closed) { + return FAILED_TO_OBTAIN_NEXT; + } else { + checkState(next == null, "unexpected non-null value for next"); + if (seekNext()) { + next = WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), + reader.getCurrentTimestamp()); + return SUCCESSFULLY_OBTAINED_NEXT; + } else { + close(); + return FAILED_TO_OBTAIN_NEXT; } - } catch (IOException e) { - throw new RuntimeException(e); } + } catch (final Exception e) { + throw new RuntimeException("Failed to read data.", e); + } + } + + private void close() { + closed = true; + try { + reader.close(); + } catch (final IOException e) { + throw new RuntimeException(e); } + } - @Override - public WindowedValue<T> next() { - return WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), - reader.getCurrentTimestamp()); + private boolean seekNext() throws IOException { + if (!started) { + started = true; + return reader.start(); + } else { + return !closed && reader.advance(); } + } - @Override - public void remove() { - throw new UnsupportedOperationException("Remove from partition iterator is not allowed."); + private WindowedValue<T> consumeCurrent() { + if (next == null) { + throw new NoSuchElementException(); + } else { + final WindowedValue<T> 
current = next; + next = null; + return current; } + } - private void closeIfNotClosed() { - if (!closed) { - closed = true; - try { - reader.close(); - } catch (IOException e) { - throw new RuntimeException("Failed to close Reader.", e); - } - } + private WindowedValue<T> consumeNext() { + if (next == null) { + tryProduceNext(); } - }; + return consumeCurrent(); + } - return new InterruptibleIterator<>(context, - scala.collection.JavaConversions.asScalaIterator(iter)); - } + @Override + public boolean hasNext() { + return next != null || tryProduceNext(); + } - private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) { - try { - return ((BoundedSource<T>) partition.source).createReader( - runtimeContext.getPipelineOptions()); - } catch (IOException e) { - throw new RuntimeException("Failed to create reader from a BoundedSource.", e); + @Override + public WindowedValue<T> next() { + return consumeNext(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/17da5e93/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java new file mode 100644 index 0000000..5728fa0 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/ReaderToIteratorAdapterTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark.io; + +import static com.google.common.base.Preconditions.checkState; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.NoSuchElementException; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.sdk.io.Source; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Test for {@link SourceRDD.Bounded.ReaderToIteratorAdapter}. 
+ */ +public class ReaderToIteratorAdapterTest { + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private static class TestReader extends Source.Reader<Integer> { + + static final int LIMIT = 4; + static final int START = 1; + + private Integer current = START - 1; + private boolean closed = false; + private boolean drained = false; + + boolean isClosed() { + return closed; + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + checkState(!drained && !closed); + drained = ++current >= LIMIT; + return !drained; + } + + @Override + public Integer getCurrent() throws NoSuchElementException { + checkState(!drained && !closed); + return current; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + checkState(!drained && !closed); + return Instant.now(); + } + + @Override + public void close() throws IOException { + checkState(!closed); + closed = true; + } + + @Override + public Source<Integer> getCurrentSource() { + return null; + } + } + + private final TestReader testReader = new TestReader(); + + private final SourceRDD.Bounded.ReaderToIteratorAdapter<Integer> readerIterator = + new SourceRDD.Bounded.ReaderToIteratorAdapter<>(new MetricsContainerImpl(""), testReader); + + private void assertReaderRange(final int start, final int end) { + for (int i = start; i < end; i++) { + assertThat(readerIterator.next().getValue(), is(i)); + } + } + + @Test + public void testReaderIsClosedAfterDrainage() throws Exception { + assertReaderRange(TestReader.START, TestReader.LIMIT); + + assertThat(readerIterator.hasNext(), is(false)); + + // reader is closed only after hasNext realises there are no more elements + assertThat(testReader.isClosed(), is(true)); + } + + @Test + public void testNextWhenDrainedThrows() throws Exception { + assertReaderRange(TestReader.START, TestReader.LIMIT); + + 
exception.expect(NoSuchElementException.class); + readerIterator.next(); + } + + @Test + public void testHasNextIdempotencyCombo() throws Exception { + assertThat(readerIterator.hasNext(), is(true)); + assertThat(readerIterator.hasNext(), is(true)); + + assertThat(readerIterator.next().getValue(), is(1)); + + assertThat(readerIterator.hasNext(), is(true)); + assertThat(readerIterator.hasNext(), is(true)); + assertThat(readerIterator.hasNext(), is(true)); + + assertThat(readerIterator.next().getValue(), is(2)); + assertThat(readerIterator.next().getValue(), is(3)); + + // drained + + assertThat(readerIterator.hasNext(), is(false)); + assertThat(readerIterator.hasNext(), is(false)); + + // no next to give + + exception.expect(NoSuchElementException.class); + readerIterator.next(); + } + +}
