[BEAM-386] Move UnboundedReadFromBoundedSource to core-construction-java. Now that it's in the construction package, we can delete the Dataflow-specific clone of the class that we couldn't use from runners-core.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66229142 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66229142 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66229142 Branch: refs/heads/master Commit: 662291426af2c82c35085053fac0e9c2b845339f Parents: e53f959 Author: Dan Halperin <[email protected]> Authored: Mon Apr 10 07:08:32 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon Apr 10 10:16:04 2017 -0700 ---------------------------------------------------------------------- .../translation/ApexPipelineTranslator.java | 2 +- runners/core-construction-java/pom.xml | 15 + .../UnboundedReadFromBoundedSource.java | 542 ++++++++++++++++++ .../UnboundedReadFromBoundedSourceTest.java | 373 +++++++++++++ runners/core-java/pom.xml | 5 - .../core/UnboundedReadFromBoundedSource.java | 542 ------------------ .../UnboundedReadFromBoundedSourceTest.java | 373 ------------- .../beam/runners/dataflow/DataflowRunner.java | 3 +- .../DataflowUnboundedReadFromBoundedSource.java | 547 ------------------- ...aflowUnboundedReadFromBoundedSourceTest.java | 79 --- .../beam/runners/spark/TestSparkRunner.java | 2 +- 11 files changed, 934 insertions(+), 1549 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index 42ff144..fdeefc7 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -24,8 +24,8 @@ import java.util.Map; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView; import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.runners.core.construction.PrimitiveCreate; +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.TransformHierarchy; http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml index 9619280..78b6819 100644 --- a/runners/core-construction-java/pom.xml +++ b/runners/core-construction-java/pom.xml @@ -60,6 +60,16 @@ </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> @@ -69,6 +79,11 @@ <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <!-- test dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java new file mode 100644 index 0000000..6b7bd71 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.NameUtils; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. 
+ * + * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, + * and element timestamps are propagated. While any elements remain, the watermark is the beginning + * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced + * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + * + * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner + * {@link BoundedSource}. + * Sources that cannot be split are read entirely into memory, so this transform does not work well + * with large, unsplittable sources. + * + * <p>This transform is intended to be used by a runner during pipeline translation to convert + * a Read.Bounded into a Read.Unbounded. + */ +public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> { + + private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class); + + private final BoundedSource<T> source; + + /** + * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}. + */ + public UnboundedReadFromBoundedSource(BoundedSource<T> source) { + this.source = source; + } + + @Override + public PCollection<T> expand(PBegin input) { + return input.getPipeline().apply( + Read.from(new BoundedToUnboundedSourceAdapter<>(source))); + } + + @Override + protected Coder<T> getDefaultOutputCoder() { + return source.getDefaultOutputCoder(); + } + + @Override + public String getKindString() { + return String.format("Read(%s)", NameUtils.approximateSimpleName(source)); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + // We explicitly do not register base-class data, instead we use the delegate inner source. + builder + .add(DisplayData.item("source", source.getClass())) + .include("source", source); + } + + /** + * A {@code BoundedSource} to {@code UnboundedSource} adapter. 
+ */ + @VisibleForTesting + public static class BoundedToUnboundedSourceAdapter<T> + extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> { + + private BoundedSource<T> boundedSource; + + public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) { + this.boundedSource = boundedSource; + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + try { + long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; + if (desiredBundleSize <= 0) { + LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", + boundedSource); + return ImmutableList.of(this); + } + List<? extends BoundedSource<T>> splits = + boundedSource.splitIntoBundles(desiredBundleSize, options); + if (splits == null) { + LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); + return ImmutableList.of(this); + } + return Lists.transform( + splits, + new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() { + @Override + public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) { + return new BoundedToUnboundedSourceAdapter<>(input); + }}); + } catch (Exception e) { + LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); + return ImmutableList.of(this); + } + } + + @Override + public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint) + throws IOException { + if (checkpoint == null) { + return new Reader(null /* residualElements */, boundedSource, options); + } else { + return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); + } + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return boundedSource.getDefaultOutputCoder(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public 
Coder<Checkpoint<T>> getCheckpointMarkCoder() { + return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); + } + + @VisibleForTesting + static class Checkpoint<T> implements UnboundedSource.CheckpointMark { + private final @Nullable List<TimestampedValue<T>> residualElements; + private final @Nullable BoundedSource<T> residualSource; + + public Checkpoint( + @Nullable List<TimestampedValue<T>> residualElements, + @Nullable BoundedSource<T> residualSource) { + this.residualElements = residualElements; + this.residualSource = residualSource; + } + + @Override + public void finalizeCheckpoint() {} + + @VisibleForTesting + @Nullable List<TimestampedValue<T>> getResidualElements() { + return residualElements; + } + + @VisibleForTesting + @Nullable BoundedSource<T> getResidualSource() { + return residualSource; + } + } + + @VisibleForTesting + static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> { + + @JsonCreator + public static CheckpointCoder<?> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<Coder<?>> components) { + checkArgument(components.size() == 1, + "Expecting 1 components, got %s", components.size()); + return new CheckpointCoder<>(components.get(0)); + } + + // The coder for a list of residual elements and their timestamps + private final Coder<List<TimestampedValue<T>>> elemsCoder; + // The coder from the BoundedReader for coding each element + private final Coder<T> elemCoder; + // The nullable and serializable coder for the BoundedSource. 
+ @SuppressWarnings("rawtypes") + private final Coder<BoundedSource> sourceCoder; + + CheckpointCoder(Coder<T> elemCoder) { + this.elemsCoder = NullableCoder.of( + ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); + this.elemCoder = elemCoder; + this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class)); + } + + @Override + public void encode(Checkpoint<T> value, OutputStream outStream, Context context) + throws CoderException, IOException { + elemsCoder.encode(value.residualElements, outStream, context.nested()); + sourceCoder.encode(value.residualSource, outStream, context); + } + + @SuppressWarnings("unchecked") + @Override + public Checkpoint<T> decode(InputStream inStream, Context context) + throws CoderException, IOException { + return new Checkpoint<>( + elemsCoder.decode(inStream, context.nested()), + sourceCoder.decode(inStream, context)); + } + + @Override + public List<Coder<?>> getCoderArguments() { + return Arrays.<Coder<?>>asList(elemCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException(this, + "CheckpointCoder uses Java Serialization, which may be non-deterministic."); + } + } + + /** + * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into + * {@link ResidualElements} and {@link ResidualSource}. + * + * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains + * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will + * be split into {@link ResidualElements} and {@link ResidualSource}. 
+ */ + @VisibleForTesting + class Reader extends UnboundedReader<T> { + private ResidualElements residualElements; + private @Nullable ResidualSource residualSource; + private final PipelineOptions options; + private boolean done; + + Reader( + @Nullable List<TimestampedValue<T>> residualElementsList, + @Nullable BoundedSource<T> residualSource, + PipelineOptions options) { + init(residualElementsList, residualSource, options); + this.options = checkNotNull(options, "options"); + this.done = false; + } + + private void init( + @Nullable List<TimestampedValue<T>> residualElementsList, + @Nullable BoundedSource<T> residualSource, + PipelineOptions options) { + this.residualElements = residualElementsList == null + ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList()) + : new ResidualElements(residualElementsList); + this.residualSource = + residualSource == null ? null : new ResidualSource(residualSource, options); + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (residualElements.advance()) { + return true; + } else if (residualSource != null && residualSource.advance()) { + return true; + } else { + done = true; + return false; + } + } + + @Override + public void close() throws IOException { + if (residualSource != null) { + residualSource.close(); + } + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (residualElements.hasCurrent()) { + return residualElements.getCurrent(); + } else if (residualSource != null) { + return residualSource.getCurrent(); + } else { + throw new NoSuchElementException(); + } + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (residualElements.hasCurrent()) { + return residualElements.getCurrentTimestamp(); + } else if (residualSource != null) { + return residualSource.getCurrentTimestamp(); + } else { + throw new NoSuchElementException(); + } + } + 
+ @Override + public Instant getWatermark() { + return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + /** + * {@inheritDoc} + * + * <p>If only part of the {@link ResidualElements} is consumed, the new + * checkpoint will contain the remaining elements in {@link ResidualElements} and + * the {@link ResidualSource}. + * + * <p>If all {@link ResidualElements} and part of the + * {@link ResidualSource} are consumed, the new checkpoint is done by splitting + * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}. + * {@link ResidualSource} is the source split from the current source, + * and {@link ResidualElements} contains rest elements from the current source after + * the splitting. For unsplittable source, it will put all remaining elements into + * the {@link ResidualElements}. + */ + @Override + public Checkpoint<T> getCheckpointMark() { + Checkpoint<T> newCheckpoint; + if (!residualElements.done()) { + // Part of residualElements are consumed. + // Checkpoints the remaining elements and residualSource. + newCheckpoint = new Checkpoint<>( + residualElements.getRestElements(), + residualSource == null ? null : residualSource.getSource()); + } else if (residualSource != null) { + newCheckpoint = residualSource.getCheckpointMark(); + } else { + newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */); + } + // Re-initialize since the residualElements and the residualSource might be + // consumed or split by checkpointing. 
+ init(newCheckpoint.residualElements, newCheckpoint.residualSource, options); + return newCheckpoint; + } + + @Override + public BoundedToUnboundedSourceAdapter<T> getCurrentSource() { + return BoundedToUnboundedSourceAdapter.this; + } + } + + private class ResidualElements { + private final List<TimestampedValue<T>> elementsList; + private @Nullable Iterator<TimestampedValue<T>> elementsIterator; + private @Nullable TimestampedValue<T> currentT; + private boolean hasCurrent; + private boolean done; + + ResidualElements(List<TimestampedValue<T>> residualElementsList) { + this.elementsList = checkNotNull(residualElementsList, "residualElementsList"); + this.elementsIterator = null; + this.currentT = null; + this.hasCurrent = false; + this.done = false; + } + + public boolean advance() { + if (elementsIterator == null) { + elementsIterator = elementsList.iterator(); + } + if (elementsIterator.hasNext()) { + currentT = elementsIterator.next(); + hasCurrent = true; + return true; + } else { + done = true; + hasCurrent = false; + return false; + } + } + + boolean hasCurrent() { + return hasCurrent; + } + + boolean done() { + return done; + } + + TimestampedValue<T> getCurrentTimestampedValue() { + if (!hasCurrent) { + throw new NoSuchElementException(); + } + return currentT; + } + + T getCurrent() { + return getCurrentTimestampedValue().getValue(); + } + + Instant getCurrentTimestamp() { + return getCurrentTimestampedValue().getTimestamp(); + } + + List<TimestampedValue<T>> getRestElements() { + if (elementsIterator == null) { + return elementsList; + } else { + List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); + while (elementsIterator.hasNext()) { + newResidualElements.add(elementsIterator.next()); + } + return newResidualElements; + } + } + } + + private class ResidualSource { + private BoundedSource<T> residualSource; + private PipelineOptions options; + private @Nullable BoundedReader<T> reader; + private boolean closed; + private boolean 
readerDone; + + public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) { + this.residualSource = checkNotNull(residualSource, "residualSource"); + this.options = checkNotNull(options, "options"); + this.reader = null; + this.closed = false; + this.readerDone = false; + } + + private boolean advance() throws IOException { + checkArgument(!closed, "advance() call on closed %s", getClass().getName()); + if (readerDone) { + return false; + } + if (reader == null) { + reader = residualSource.createReader(options); + readerDone = !reader.start(); + } else { + readerDone = !reader.advance(); + } + return !readerDone; + } + + T getCurrent() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrent(); + } + + Instant getCurrentTimestamp() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrentTimestamp(); + } + + void close() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + closed = true; + } + + BoundedSource<T> getSource() { + return residualSource; + } + + Checkpoint<T> getCheckpointMark() { + if (reader == null) { + // Reader hasn't started, checkpoint the residualSource. + return new Checkpoint<>(null /* residualElements */, residualSource); + } else { + // Part of residualSource are consumed. + // Splits the residualSource and tracks the new residualElements in current source. 
+ BoundedSource<T> residualSplit = null; + Double fractionConsumed = reader.getFractionConsumed(); + if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { + double fractionRest = 1 - fractionConsumed; + int splitAttempts = 8; + for (int i = 0; i < 8 && residualSplit == null; ++i) { + double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts; + residualSplit = reader.splitAtFraction(fractionToSplit); + } + } + List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); + try { + while (advance()) { + newResidualElements.add( + TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); + } + } catch (IOException e) { + throw new RuntimeException("Failed to read elements from the bounded reader.", e); + } + return new Checkpoint<>(newResidualElements, residualSplit); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java new file mode 100644 index 0000000..c905cf5 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSourceTest.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Random; +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.FileBasedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.testing.NeedsRunner; +import 
org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Distinct; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link UnboundedReadFromBoundedSource}. + */ +@RunWith(JUnit4.class) +public class UnboundedReadFromBoundedSourceTest { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Rule + public TestPipeline p = TestPipeline.create(); + + @Test + public void testCheckpointCoderNulls() throws Exception { + CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of()); + Checkpoint<String> emptyCheckpoint = new Checkpoint<>(null, null); + Checkpoint<String> decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray( + coder, + CoderUtils.encodeToByteArray(coder, emptyCheckpoint)); + assertNull(decodedEmptyCheckpoint.getResidualElements()); + assertNull(decodedEmptyCheckpoint.getResidualSource()); + } + + @Test + public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception { + CoderProperties.coderSerializable(new CheckpointCoder<>(GlobalWindow.Coder.INSTANCE)); + } + + @Test + @Category(NeedsRunner.class) + public void testBoundedToUnboundedSourceAdapter() throws Exception { + long numElements = 100; + BoundedSource<Long> boundedSource = 
CountingSource.upTo(numElements); + UnboundedSource<Long, Checkpoint<Long>> unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + PCollection<Long> output = + p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements)); + + // Count == numElements + PAssert + .thatSingleton(output.apply("Count", Count.<Long>globally())) + .isEqualTo(numElements); + // Unique count == numElements + PAssert + .thatSingleton(output.apply(Distinct.<Long>create()) + .apply("UniqueCount", Count.<Long>globally())) + .isEqualTo(numElements); + // Min == 0 + PAssert + .thatSingleton(output.apply("Min", Min.<Long>globally())) + .isEqualTo(0L); + // Max == numElements-1 + PAssert + .thatSingleton(output.apply("Max", Max.<Long>globally())) + .isEqualTo(numElements - 1); + p.run(); + } + + @Test + public void testCountingSourceToUnboundedCheckpoint() throws Exception { + long numElements = 100; + BoundedSource<Long> countingSource = CountingSource.upTo(numElements); + List<Long> expected = Lists.newArrayList(); + for (long i = 0; i < numElements; ++i) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected); + } + + @Test + public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception { + String baseName = "test-input"; + File compressedFile = tmpFolder.newFile(baseName + ".gz"); + byte[] input = generateInput(100); + writeFile(compressedFile, input); + + BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1); + List<Byte> expected = Lists.newArrayList(); + for (byte i : input) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpoint(source, expected); + } + + private <T> void testBoundedToUnboundedSourceAdapterCheckpoint( + BoundedSource<T> boundedSource, + List<T> expectedElements) throws Exception { + BoundedToUnboundedSourceAdapter<T> unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + PipelineOptions options = 
PipelineOptionsFactory.create(); + BoundedToUnboundedSourceAdapter<T>.Reader reader = + unboundedSource.createReader(options, null); + + List<T> actual = Lists.newArrayList(); + for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) { + actual.add(reader.getCurrent()); + // checkpoint every 9 elements + if (actual.size() % 9 == 0) { + Checkpoint<T> checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + } + } + Checkpoint<T> checkpointDone = reader.getCheckpointMark(); + assertTrue(checkpointDone.getResidualElements() == null + || checkpointDone.getResidualElements().isEmpty()); + + assertEquals(expectedElements.size(), actual.size()); + assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual)); + } + + @Test + public void testCountingSourceToUnboundedCheckpointRestart() throws Exception { + long numElements = 100; + BoundedSource<Long> countingSource = CountingSource.upTo(numElements); + List<Long> expected = Lists.newArrayList(); + for (long i = 0; i < numElements; ++i) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected); + } + + @Test + public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception { + String baseName = "test-input"; + File compressedFile = tmpFolder.newFile(baseName + ".gz"); + byte[] input = generateInput(1000); + writeFile(compressedFile, input); + + BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1); + List<Byte> expected = Lists.newArrayList(); + for (byte i : input) { + expected.add(i); + } + testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected); + } + + private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart( + BoundedSource<T> boundedSource, + List<T> expectedElements) throws Exception { + BoundedToUnboundedSourceAdapter<T> unboundedSource = + new BoundedToUnboundedSourceAdapter<>(boundedSource); + + PipelineOptions options = 
PipelineOptionsFactory.create(); + BoundedToUnboundedSourceAdapter<T>.Reader reader = + unboundedSource.createReader(options, null); + + List<T> actual = Lists.newArrayList(); + for (boolean hasNext = reader.start(); hasNext;) { + actual.add(reader.getCurrent()); + // checkpoint every 9 elements + if (actual.size() % 9 == 0) { + Checkpoint<T> checkpoint = reader.getCheckpointMark(); + Coder<Checkpoint<T>> checkpointCoder = unboundedSource.getCheckpointMarkCoder(); + Checkpoint<T> decodedCheckpoint = CoderUtils.decodeFromByteArray( + checkpointCoder, + CoderUtils.encodeToByteArray(checkpointCoder, checkpoint)); + reader.close(); + checkpoint.finalizeCheckpoint(); + + BoundedToUnboundedSourceAdapter<T>.Reader restarted = + unboundedSource.createReader(options, decodedCheckpoint); + reader = restarted; + hasNext = reader.start(); + } else { + hasNext = reader.advance(); + } + } + Checkpoint<T> checkpointDone = reader.getCheckpointMark(); + assertTrue(checkpointDone.getResidualElements() == null + || checkpointDone.getResidualElements().isEmpty()); + + assertEquals(expectedElements.size(), actual.size()); + assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual)); + } + + @Test + public void testReadBeforeStart() throws Exception { + thrown.expect(NoSuchElementException.class); + + BoundedSource<Long> countingSource = CountingSource.upTo(100); + BoundedToUnboundedSourceAdapter<Long> unboundedSource = + new BoundedToUnboundedSourceAdapter<>(countingSource); + PipelineOptions options = PipelineOptionsFactory.create(); + + unboundedSource.createReader(options, null).getCurrent(); + } + + @Test + public void testReadFromCheckpointBeforeStart() throws Exception { + thrown.expect(NoSuchElementException.class); + + BoundedSource<Long> countingSource = CountingSource.upTo(100); + BoundedToUnboundedSourceAdapter<Long> unboundedSource = + new BoundedToUnboundedSourceAdapter<>(countingSource); + PipelineOptions options = PipelineOptionsFactory.create(); + + 
List<TimestampedValue<Long>> elements = + ImmutableList.of(TimestampedValue.of(1L, new Instant(1L))); + Checkpoint<Long> checkpoint = new Checkpoint<>(elements, countingSource); + unboundedSource.createReader(options, checkpoint).getCurrent(); + } + + /** + * Generate byte array of given size. + */ + private static byte[] generateInput(int size) { + // Arbitrary but fixed seed + Random random = new Random(285930); + byte[] buff = new byte[size]; + random.nextBytes(buff); + return buff; + } + + /** + * Writes a single output file. + */ + private static void writeFile(File file, byte[] input) throws IOException { + try (OutputStream os = new FileOutputStream(file)) { + os.write(input); + } + } + + /** + * Unsplittable source for use in tests. + */ + private static class UnsplittableSource extends FileBasedSource<Byte> { + public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) { + super(fileOrPatternSpec, minBundleSize); + } + + public UnsplittableSource( + String fileName, long minBundleSize, long startOffset, long endOffset) { + super(fileName, minBundleSize, startOffset, endOffset); + } + + @Override + protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) { + return new UnsplittableSource(fileName, getMinBundleSize(), start, end); + } + + @Override + protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) { + return new UnsplittableReader(this); + } + + @Override + public Coder<Byte> getDefaultOutputCoder() { + return SerializableCoder.of(Byte.class); + } + + private static class UnsplittableReader extends FileBasedReader<Byte> { + ByteBuffer buff = ByteBuffer.allocate(1); + Byte current; + long offset; + ReadableByteChannel channel; + + public UnsplittableReader(UnsplittableSource source) { + super(source); + offset = source.getStartOffset() - 1; + } + + @Override + public Byte getCurrent() throws NoSuchElementException { + return current; + } + + @Override + public boolean 
allowsDynamicSplitting() { + return false; + } + + @Override + protected boolean isAtSplitPoint() { + return true; + } + + @Override + protected void startReading(ReadableByteChannel channel) throws IOException { + this.channel = channel; + } + + @Override + protected boolean readNextRecord() throws IOException { + buff.clear(); + if (channel.read(buff) != 1) { + return false; + } + current = buff.get(0); + offset += 1; + return true; + } + + @Override + protected long getCurrentOffset() { + return offset; + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 631e8c0..affd1a9 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -105,11 +105,6 @@ <artifactId>joda-time</artifactId> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <!-- test dependencies --> <!-- Utilities such as WindowMatchers --> http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java deleted file mode 100644 index 0c173a0..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java +++ /dev/null @@ -1,542 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.NameUtils; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. - * - * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, - * and element timestamps are propagated. While any elements remain, the watermark is the beginning - * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced - * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. - * - * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner - * {@link BoundedSource}. - * Sources that cannot be split are read entirely into memory, so this transform does not work well - * with large, unsplittable sources. - * - * <p>This transform is intended to be used by a runner during pipeline translation to convert - * a Read.Bounded into a Read.Unbounded. - */ -public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> { - - private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class); - - private final BoundedSource<T> source; - - /** - * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}. 
- */ - public UnboundedReadFromBoundedSource(BoundedSource<T> source) { - this.source = source; - } - - @Override - public PCollection<T> expand(PBegin input) { - return input.getPipeline().apply( - Read.from(new BoundedToUnboundedSourceAdapter<>(source))); - } - - @Override - protected Coder<T> getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); - } - - @Override - public String getKindString() { - return String.format("Read(%s)", NameUtils.approximateSimpleName(source)); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - // We explicitly do not register base-class data, instead we use the delegate inner source. - builder - .add(DisplayData.item("source", source.getClass())) - .include("source", source); - } - - /** - * A {@code BoundedSource} to {@code UnboundedSource} adapter. - */ - @VisibleForTesting - public static class BoundedToUnboundedSourceAdapter<T> - extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> { - - private BoundedSource<T> boundedSource; - - public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) { - this.boundedSource = boundedSource; - } - - @Override - public void validate() { - boundedSource.validate(); - } - - @Override - public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - try { - long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; - if (desiredBundleSize <= 0) { - LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", - boundedSource); - return ImmutableList.of(this); - } - List<? 
extends BoundedSource<T>> splits = - boundedSource.splitIntoBundles(desiredBundleSize, options); - if (splits == null) { - LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); - return ImmutableList.of(this); - } - return Lists.transform( - splits, - new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() { - @Override - public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) { - return new BoundedToUnboundedSourceAdapter<>(input); - }}); - } catch (Exception e) { - LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); - return ImmutableList.of(this); - } - } - - @Override - public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint) - throws IOException { - if (checkpoint == null) { - return new Reader(null /* residualElements */, boundedSource, options); - } else { - return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); - } - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return boundedSource.getDefaultOutputCoder(); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - @Override - public Coder<Checkpoint<T>> getCheckpointMarkCoder() { - return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); - } - - @VisibleForTesting - static class Checkpoint<T> implements UnboundedSource.CheckpointMark { - private final @Nullable List<TimestampedValue<T>> residualElements; - private final @Nullable BoundedSource<T> residualSource; - - public Checkpoint( - @Nullable List<TimestampedValue<T>> residualElements, - @Nullable BoundedSource<T> residualSource) { - this.residualElements = residualElements; - this.residualSource = residualSource; - } - - @Override - public void finalizeCheckpoint() {} - - @VisibleForTesting - @Nullable List<TimestampedValue<T>> getResidualElements() { - return residualElements; - } - - @VisibleForTesting - @Nullable BoundedSource<T> getResidualSource() { - return residualSource; - } - 
} - - @VisibleForTesting - static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> { - - @JsonCreator - public static CheckpointCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, - "Expecting 1 components, got %s", components.size()); - return new CheckpointCoder<>(components.get(0)); - } - - // The coder for a list of residual elements and their timestamps - private final Coder<List<TimestampedValue<T>>> elemsCoder; - // The coder from the BoundedReader for coding each element - private final Coder<T> elemCoder; - // The nullable and serializable coder for the BoundedSource. - @SuppressWarnings("rawtypes") - private final Coder<BoundedSource> sourceCoder; - - CheckpointCoder(Coder<T> elemCoder) { - this.elemsCoder = NullableCoder.of( - ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); - this.elemCoder = elemCoder; - this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class)); - } - - @Override - public void encode(Checkpoint<T> value, OutputStream outStream, Context context) - throws CoderException, IOException { - elemsCoder.encode(value.residualElements, outStream, context.nested()); - sourceCoder.encode(value.residualSource, outStream, context); - } - - @SuppressWarnings("unchecked") - @Override - public Checkpoint<T> decode(InputStream inStream, Context context) - throws CoderException, IOException { - return new Checkpoint<>( - elemsCoder.decode(inStream, context.nested()), - sourceCoder.decode(inStream, context)); - } - - @Override - public List<Coder<?>> getCoderArguments() { - return Arrays.<Coder<?>>asList(elemCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "CheckpointCoder uses Java Serialization, which may be non-deterministic."); - } - } - - /** - * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into - * 
{@link ResidualElements} and {@link ResidualSource}. - * - * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains - * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will - * be split into {@link ResidualElements} and {@link ResidualSource}. - */ - @VisibleForTesting - class Reader extends UnboundedReader<T> { - private ResidualElements residualElements; - private @Nullable ResidualSource residualSource; - private final PipelineOptions options; - private boolean done; - - Reader( - @Nullable List<TimestampedValue<T>> residualElementsList, - @Nullable BoundedSource<T> residualSource, - PipelineOptions options) { - init(residualElementsList, residualSource, options); - this.options = checkNotNull(options, "options"); - this.done = false; - } - - private void init( - @Nullable List<TimestampedValue<T>> residualElementsList, - @Nullable BoundedSource<T> residualSource, - PipelineOptions options) { - this.residualElements = residualElementsList == null - ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList()) - : new ResidualElements(residualElementsList); - this.residualSource = - residualSource == null ? 
null : new ResidualSource(residualSource, options); - } - - @Override - public boolean start() throws IOException { - return advance(); - } - - @Override - public boolean advance() throws IOException { - if (residualElements.advance()) { - return true; - } else if (residualSource != null && residualSource.advance()) { - return true; - } else { - done = true; - return false; - } - } - - @Override - public void close() throws IOException { - if (residualSource != null) { - residualSource.close(); - } - } - - @Override - public T getCurrent() throws NoSuchElementException { - if (residualElements.hasCurrent()) { - return residualElements.getCurrent(); - } else if (residualSource != null) { - return residualSource.getCurrent(); - } else { - throw new NoSuchElementException(); - } - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - if (residualElements.hasCurrent()) { - return residualElements.getCurrentTimestamp(); - } else if (residualSource != null) { - return residualSource.getCurrentTimestamp(); - } else { - throw new NoSuchElementException(); - } - } - - @Override - public Instant getWatermark() { - return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - /** - * {@inheritDoc} - * - * <p>If only part of the {@link ResidualElements} is consumed, the new - * checkpoint will contain the remaining elements in {@link ResidualElements} and - * the {@link ResidualSource}. - * - * <p>If all {@link ResidualElements} and part of the - * {@link ResidualSource} are consumed, the new checkpoint is done by splitting - * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}. - * {@link ResidualSource} is the source split from the current source, - * and {@link ResidualElements} contains rest elements from the current source after - * the splitting. For unsplittable source, it will put all remaining elements into - * the {@link ResidualElements}. 
- */ - @Override - public Checkpoint<T> getCheckpointMark() { - Checkpoint<T> newCheckpoint; - if (!residualElements.done()) { - // Part of residualElements are consumed. - // Checkpoints the remaining elements and residualSource. - newCheckpoint = new Checkpoint<>( - residualElements.getRestElements(), - residualSource == null ? null : residualSource.getSource()); - } else if (residualSource != null) { - newCheckpoint = residualSource.getCheckpointMark(); - } else { - newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */); - } - // Re-initialize since the residualElements and the residualSource might be - // consumed or split by checkpointing. - init(newCheckpoint.residualElements, newCheckpoint.residualSource, options); - return newCheckpoint; - } - - @Override - public BoundedToUnboundedSourceAdapter<T> getCurrentSource() { - return BoundedToUnboundedSourceAdapter.this; - } - } - - private class ResidualElements { - private final List<TimestampedValue<T>> elementsList; - private @Nullable Iterator<TimestampedValue<T>> elementsIterator; - private @Nullable TimestampedValue<T> currentT; - private boolean hasCurrent; - private boolean done; - - ResidualElements(List<TimestampedValue<T>> residualElementsList) { - this.elementsList = checkNotNull(residualElementsList, "residualElementsList"); - this.elementsIterator = null; - this.currentT = null; - this.hasCurrent = false; - this.done = false; - } - - public boolean advance() { - if (elementsIterator == null) { - elementsIterator = elementsList.iterator(); - } - if (elementsIterator.hasNext()) { - currentT = elementsIterator.next(); - hasCurrent = true; - return true; - } else { - done = true; - hasCurrent = false; - return false; - } - } - - boolean hasCurrent() { - return hasCurrent; - } - - boolean done() { - return done; - } - - TimestampedValue<T> getCurrentTimestampedValue() { - if (!hasCurrent) { - throw new NoSuchElementException(); - } - return currentT; - } - - T 
getCurrent() { - return getCurrentTimestampedValue().getValue(); - } - - Instant getCurrentTimestamp() { - return getCurrentTimestampedValue().getTimestamp(); - } - - List<TimestampedValue<T>> getRestElements() { - if (elementsIterator == null) { - return elementsList; - } else { - List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); - while (elementsIterator.hasNext()) { - newResidualElements.add(elementsIterator.next()); - } - return newResidualElements; - } - } - } - - private class ResidualSource { - private BoundedSource<T> residualSource; - private PipelineOptions options; - private @Nullable BoundedReader<T> reader; - private boolean closed; - private boolean readerDone; - - public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) { - this.residualSource = checkNotNull(residualSource, "residualSource"); - this.options = checkNotNull(options, "options"); - this.reader = null; - this.closed = false; - this.readerDone = false; - } - - private boolean advance() throws IOException { - checkArgument(!closed, "advance() call on closed %s", getClass().getName()); - if (readerDone) { - return false; - } - if (reader == null) { - reader = residualSource.createReader(options); - readerDone = !reader.start(); - } else { - readerDone = !reader.advance(); - } - return !readerDone; - } - - T getCurrent() throws NoSuchElementException { - if (reader == null) { - throw new NoSuchElementException(); - } - return reader.getCurrent(); - } - - Instant getCurrentTimestamp() throws NoSuchElementException { - if (reader == null) { - throw new NoSuchElementException(); - } - return reader.getCurrentTimestamp(); - } - - void close() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - closed = true; - } - - BoundedSource<T> getSource() { - return residualSource; - } - - Checkpoint<T> getCheckpointMark() { - if (reader == null) { - // Reader hasn't started, checkpoint the residualSource. 
- return new Checkpoint<>(null /* residualElements */, residualSource); - } else { - // Part of residualSource are consumed. - // Splits the residualSource and tracks the new residualElements in current source. - BoundedSource<T> residualSplit = null; - Double fractionConsumed = reader.getFractionConsumed(); - if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { - double fractionRest = 1 - fractionConsumed; - int splitAttempts = 8; - for (int i = 0; i < 8 && residualSplit == null; ++i) { - double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts; - residualSplit = reader.splitAtFraction(fractionToSplit); - } - } - List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); - try { - while (advance()) { - newResidualElements.add( - TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); - } - } catch (IOException e) { - throw new RuntimeException("Failed to read elements from the bounded reader.", e); - } - return new Checkpoint<>(newResidualElements, residualSplit); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java deleted file mode 100644 index e1968cb..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java +++ /dev/null @@ -1,373 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Random; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.CountingSource; -import org.apache.beam.sdk.io.FileBasedSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; 
-import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Distinct; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Unit tests for {@link UnboundedReadFromBoundedSource}. - */ -@RunWith(JUnit4.class) -public class UnboundedReadFromBoundedSourceTest { - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - @Rule - public TestPipeline p = TestPipeline.create(); - - @Test - public void testCheckpointCoderNulls() throws Exception { - CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of()); - Checkpoint<String> emptyCheckpoint = new Checkpoint<>(null, null); - Checkpoint<String> decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray( - coder, - CoderUtils.encodeToByteArray(coder, emptyCheckpoint)); - assertNull(decodedEmptyCheckpoint.getResidualElements()); - assertNull(decodedEmptyCheckpoint.getResidualSource()); - } - - @Test - public void testCheckpointCoderIsSerializableWithWellKnownCoderType() throws Exception { - CoderProperties.coderSerializable(new CheckpointCoder<>(GlobalWindow.Coder.INSTANCE)); - } - - @Test - @Category(NeedsRunner.class) - public void testBoundedToUnboundedSourceAdapter() 
throws Exception { - long numElements = 100; - BoundedSource<Long> boundedSource = CountingSource.upTo(numElements); - UnboundedSource<Long, Checkpoint<Long>> unboundedSource = - new BoundedToUnboundedSourceAdapter<>(boundedSource); - - PCollection<Long> output = - p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements)); - - // Count == numElements - PAssert - .thatSingleton(output.apply("Count", Count.<Long>globally())) - .isEqualTo(numElements); - // Unique count == numElements - PAssert - .thatSingleton(output.apply(Distinct.<Long>create()) - .apply("UniqueCount", Count.<Long>globally())) - .isEqualTo(numElements); - // Min == 0 - PAssert - .thatSingleton(output.apply("Min", Min.<Long>globally())) - .isEqualTo(0L); - // Max == numElements-1 - PAssert - .thatSingleton(output.apply("Max", Max.<Long>globally())) - .isEqualTo(numElements - 1); - p.run(); - } - - @Test - public void testCountingSourceToUnboundedCheckpoint() throws Exception { - long numElements = 100; - BoundedSource<Long> countingSource = CountingSource.upTo(numElements); - List<Long> expected = Lists.newArrayList(); - for (long i = 0; i < numElements; ++i) { - expected.add(i); - } - testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected); - } - - @Test - public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception { - String baseName = "test-input"; - File compressedFile = tmpFolder.newFile(baseName + ".gz"); - byte[] input = generateInput(100); - writeFile(compressedFile, input); - - BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1); - List<Byte> expected = Lists.newArrayList(); - for (byte i : input) { - expected.add(i); - } - testBoundedToUnboundedSourceAdapterCheckpoint(source, expected); - } - - private <T> void testBoundedToUnboundedSourceAdapterCheckpoint( - BoundedSource<T> boundedSource, - List<T> expectedElements) throws Exception { - BoundedToUnboundedSourceAdapter<T> unboundedSource = - new 
BoundedToUnboundedSourceAdapter<>(boundedSource); - - PipelineOptions options = PipelineOptionsFactory.create(); - BoundedToUnboundedSourceAdapter<T>.Reader reader = - unboundedSource.createReader(options, null); - - List<T> actual = Lists.newArrayList(); - for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) { - actual.add(reader.getCurrent()); - // checkpoint every 9 elements - if (actual.size() % 9 == 0) { - Checkpoint<T> checkpoint = reader.getCheckpointMark(); - checkpoint.finalizeCheckpoint(); - } - } - Checkpoint<T> checkpointDone = reader.getCheckpointMark(); - assertTrue(checkpointDone.getResidualElements() == null - || checkpointDone.getResidualElements().isEmpty()); - - assertEquals(expectedElements.size(), actual.size()); - assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual)); - } - - @Test - public void testCountingSourceToUnboundedCheckpointRestart() throws Exception { - long numElements = 100; - BoundedSource<Long> countingSource = CountingSource.upTo(numElements); - List<Long> expected = Lists.newArrayList(); - for (long i = 0; i < numElements; ++i) { - expected.add(i); - } - testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected); - } - - @Test - public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception { - String baseName = "test-input"; - File compressedFile = tmpFolder.newFile(baseName + ".gz"); - byte[] input = generateInput(1000); - writeFile(compressedFile, input); - - BoundedSource<Byte> source = new UnsplittableSource(compressedFile.getPath(), 1); - List<Byte> expected = Lists.newArrayList(); - for (byte i : input) { - expected.add(i); - } - testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected); - } - - private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart( - BoundedSource<T> boundedSource, - List<T> expectedElements) throws Exception { - BoundedToUnboundedSourceAdapter<T> unboundedSource = - new 
BoundedToUnboundedSourceAdapter<>(boundedSource); - - PipelineOptions options = PipelineOptionsFactory.create(); - BoundedToUnboundedSourceAdapter<T>.Reader reader = - unboundedSource.createReader(options, null); - - List<T> actual = Lists.newArrayList(); - for (boolean hasNext = reader.start(); hasNext;) { - actual.add(reader.getCurrent()); - // checkpoint every 9 elements - if (actual.size() % 9 == 0) { - Checkpoint<T> checkpoint = reader.getCheckpointMark(); - Coder<Checkpoint<T>> checkpointCoder = unboundedSource.getCheckpointMarkCoder(); - Checkpoint<T> decodedCheckpoint = CoderUtils.decodeFromByteArray( - checkpointCoder, - CoderUtils.encodeToByteArray(checkpointCoder, checkpoint)); - reader.close(); - checkpoint.finalizeCheckpoint(); - - BoundedToUnboundedSourceAdapter<T>.Reader restarted = - unboundedSource.createReader(options, decodedCheckpoint); - reader = restarted; - hasNext = reader.start(); - } else { - hasNext = reader.advance(); - } - } - Checkpoint<T> checkpointDone = reader.getCheckpointMark(); - assertTrue(checkpointDone.getResidualElements() == null - || checkpointDone.getResidualElements().isEmpty()); - - assertEquals(expectedElements.size(), actual.size()); - assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual)); - } - - @Test - public void testReadBeforeStart() throws Exception { - thrown.expect(NoSuchElementException.class); - - BoundedSource<Long> countingSource = CountingSource.upTo(100); - BoundedToUnboundedSourceAdapter<Long> unboundedSource = - new BoundedToUnboundedSourceAdapter<>(countingSource); - PipelineOptions options = PipelineOptionsFactory.create(); - - unboundedSource.createReader(options, null).getCurrent(); - } - - @Test - public void testReadFromCheckpointBeforeStart() throws Exception { - thrown.expect(NoSuchElementException.class); - - BoundedSource<Long> countingSource = CountingSource.upTo(100); - BoundedToUnboundedSourceAdapter<Long> unboundedSource = - new 
BoundedToUnboundedSourceAdapter<>(countingSource); - PipelineOptions options = PipelineOptionsFactory.create(); - - List<TimestampedValue<Long>> elements = - ImmutableList.of(TimestampedValue.of(1L, new Instant(1L))); - Checkpoint<Long> checkpoint = new Checkpoint<>(elements, countingSource); - unboundedSource.createReader(options, checkpoint).getCurrent(); - } - - /** - * Generate byte array of given size. - */ - private static byte[] generateInput(int size) { - // Arbitrary but fixed seed - Random random = new Random(285930); - byte[] buff = new byte[size]; - random.nextBytes(buff); - return buff; - } - - /** - * Writes a single output file. - */ - private static void writeFile(File file, byte[] input) throws IOException { - try (OutputStream os = new FileOutputStream(file)) { - os.write(input); - } - } - - /** - * Unsplittable source for use in tests. - */ - private static class UnsplittableSource extends FileBasedSource<Byte> { - public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) { - super(fileOrPatternSpec, minBundleSize); - } - - public UnsplittableSource( - String fileName, long minBundleSize, long startOffset, long endOffset) { - super(fileName, minBundleSize, startOffset, endOffset); - } - - @Override - protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) { - return new UnsplittableSource(fileName, getMinBundleSize(), start, end); - } - - @Override - protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) { - return new UnsplittableReader(this); - } - - @Override - public Coder<Byte> getDefaultOutputCoder() { - return SerializableCoder.of(Byte.class); - } - - private static class UnsplittableReader extends FileBasedReader<Byte> { - ByteBuffer buff = ByteBuffer.allocate(1); - Byte current; - long offset; - ReadableByteChannel channel; - - public UnsplittableReader(UnsplittableSource source) { - super(source); - offset = source.getStartOffset() - 1; - } - - @Override - 
public Byte getCurrent() throws NoSuchElementException { - return current; - } - - @Override - public boolean allowsDynamicSplitting() { - return false; - } - - @Override - protected boolean isAtSplitPoint() { - return true; - } - - @Override - protected void startReading(ReadableByteChannel channel) throws IOException { - this.channel = channel; - } - - @Override - protected boolean readNextRecord() throws IOException { - buff.clear(); - if (channel.read(buff) != 1) { - return false; - } - current = buff.get(0); - offset += 1; - return true; - } - - @Override - protected long getCurrentOffset() { - return offset; - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index a2b715f..7212d4f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -64,6 +64,7 @@ import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory; import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; @@ 
-1193,7 +1194,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { public final PCollection<T> expand(PBegin input) { source.validate(); - return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source)) + return Pipeline.applyTransform(input, new UnboundedReadFromBoundedSource<>(source)) .setIsBoundedInternal(IsBounded.BOUNDED); } }
