Repository: incubator-beam Updated Branches: refs/heads/master 82ae661c5 -> 7745b921f
Add UnboundedReadFromBoundedSource Adds one way to convert a BoundedSource to an UnboundedSource. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11d9ec5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11d9ec5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11d9ec5e Branch: refs/heads/master Commit: 11d9ec5ebff4820b36db4b6ea4df7a0f79115ddd Parents: 82ae661 Author: Pei He <[email protected]> Authored: Mon May 9 15:59:58 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu Jun 23 19:06:30 2016 -0700 ---------------------------------------------------------------------- runners/core-java/pom.xml | 25 + .../core/UnboundedReadFromBoundedSource.java | 543 +++++++++++++++++++ .../UnboundedReadFromBoundedSourceTest.java | 365 +++++++++++++ .../sdk/io/BoundedReadFromUnboundedSource.java | 3 +- .../org/apache/beam/sdk/io/UnboundedSource.java | 2 +- 5 files changed, 936 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/runners/core-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 1587a1a..98d80bb 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -195,6 +195,31 @@ <artifactId>beam-sdks-java-core</artifactId> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + 
<groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <!-- build dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java new file mode 100644 index 0000000..2b3d1c7 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java @@ -0,0 +1,543 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link PTransform} that converts a {@link BoundedSource} to an {@link UnboundedSource}.
+ *
+ * <p>The adapter may call {@link BoundedSource#splitIntoBundles} once, from
+ * {@link UnboundedSource#generateInitialSplits}, to create initial splits; each resulting
+ * {@link BoundedSource} is then read directly and element timestamps are propagated. While any
+ * elements remain, the watermark is the beginning of time
+ * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced the
+ * watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on the inner
+ * {@link BoundedReader}. Whatever the current reader still owns after that split attempt is read
+ * into memory, so sources that cannot be split are effectively buffered in memory at checkpoint
+ * time; this transform does not work well with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation to convert
+ * a Read.Bounded into a Read.Unbounded.
+ */
+public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadFromBoundedSource.class);
+
+  private final BoundedSource<T> source;
+
+  /**
+   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+   *
+   * @param source the bounded source to read; must not be null
+   */
+  public UnboundedReadFromBoundedSource(BoundedSource<T> source) {
+    // Fail fast here instead of on first use of the source.
+    this.source = checkNotNull(source, "source");
+  }
+
+  @Override
+  public PCollection<T> apply(PInput input) {
+    return input.getPipeline().apply(
+        Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
+  }
+
+  @Override
+  protected Coder<T> getDefaultOutputCoder() {
+    return source.getDefaultOutputCoder();
+  }
+
+  @Override
+  public String getKindString() {
+    return "Read(" + approximateSimpleName(source.getClass()) + ")";
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    // We explicitly do not register base-class data, instead we use the delegate inner source.
+    builder
+        .add(DisplayData.item("source", source.getClass()))
+        .include(source);
+  }
+
+  /**
+   * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+   */
+  @VisibleForTesting
+  public static class BoundedToUnboundedSourceAdapter<T>
+      extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> {
+
+    private final BoundedSource<T> boundedSource;
+
+    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+      this.boundedSource = boundedSource;
+    }
+
+    @Override
+    public void validate() {
+      boundedSource.validate();
+    }
+
+    /**
+     * Splits the wrapped {@link BoundedSource} into adapters of roughly
+     * {@code estimatedSize / desiredNumSplits} bytes each.
+     *
+     * <p>Splitting is best-effort: if the size cannot be estimated, if the source produces no
+     * splits, or if anything throws, a singleton list containing this adapter is returned.
+     */
+    @Override
+    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      try {
+        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+        if (desiredBundleSize <= 0) {
+          LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+              boundedSource);
+          return ImmutableList.of(this);
+        }
+        List<? extends BoundedSource<T>> splits =
+            boundedSource.splitIntoBundles(desiredBundleSize, options);
+        // An empty result is treated like "cannot split": reading this adapter directly yields
+        // the same elements and never hands the runner zero splits.
+        if (splits == null || splits.isEmpty()) {
+          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+          return ImmutableList.of(this);
+        }
+        // Copy eagerly: Lists.transform returns a lazy view that would construct a fresh
+        // adapter on every element access.
+        return ImmutableList.copyOf(Lists.transform(
+            splits,
+            new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() {
+              @Override
+              public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) {
+                return new BoundedToUnboundedSourceAdapter<>(input);
+              }
+            }));
+      } catch (Exception e) {
+        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e);
+        return ImmutableList.of(this);
+      }
+    }
+
+    /**
+     * Creates a reader over the whole bounded source when {@code checkpoint} is null, otherwise
+     * over the checkpoint's buffered elements followed by its residual source.
+     */
+    @Override
+    public Reader createReader(PipelineOptions options, @Nullable Checkpoint<T> checkpoint)
+        throws IOException {
+      if (checkpoint == null) {
+        return new Reader(
+            Collections.<TimestampedValue<T>>emptyList() /* residualElements */,
+            boundedSource,
+            options);
+      } else {
+        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
+      }
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return boundedSource.getDefaultOutputCoder();
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+    }
+
+    /**
+     * A checkpoint: elements already read into memory but not yet emitted, followed by the
+     * part of the bounded source that has not been read. Either component may be null, which
+     * readers treat as "nothing there".
+     */
+    @VisibleForTesting
+    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+      private final @Nullable List<TimestampedValue<T>> residualElements;
+      private final @Nullable BoundedSource<T> residualSource;
+
+      public Checkpoint(
+          @Nullable List<TimestampedValue<T>> residualElements,
+          @Nullable BoundedSource<T> residualSource) {
+        this.residualElements = residualElements;
+        this.residualSource = residualSource;
+      }
+
+      @Override
+      public void finalizeCheckpoint() {}
+
+      @VisibleForTesting
+      @Nullable List<TimestampedValue<T>> getResidualElements() {
+        return residualElements;
+      }
+
+      @VisibleForTesting
+      @Nullable BoundedSource<T> getResidualSource() {
+        return residualSource;
+      }
+    }
+
+    /**
+     * Coder for {@link Checkpoint}: a nullable list of timestamped elements followed by a
+     * nullable, Java-serialized {@link BoundedSource}.
+     */
+    @VisibleForTesting
+    static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
+
+      @JsonCreator
+      public static CheckpointCoder<?> of(
+          @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+          List<Coder<?>> components) {
+        checkArgument(components.size() == 1,
+            "Expecting 1 components, got %s", components.size());
+        return new CheckpointCoder<>(components.get(0));
+      }
+
+      // The coder for a list of residual elements and their timestamps
+      private final Coder<List<TimestampedValue<T>>> elemsCoder;
+      // The coder from the BoundedReader for coding each element
+      private final Coder<T> elemCoder;
+      // The nullable and serializable coder for the BoundedSource.
+      @SuppressWarnings("rawtypes")
+      private final Coder<BoundedSource> sourceCoder;
+
+      CheckpointCoder(Coder<T> elemCoder) {
+        this.elemsCoder = NullableCoder.of(
+            ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+        this.elemCoder = elemCoder;
+        this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+      }
+
+      @Override
+      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+          throws CoderException, IOException {
+        Context nested = context.nested();
+        elemsCoder.encode(value.residualElements, outStream, nested);
+        sourceCoder.encode(value.residualSource, outStream, nested);
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Checkpoint<T> decode(InputStream inStream, Context context)
+          throws CoderException, IOException {
+        Context nested = context.nested();
+        return new Checkpoint<>(
+            elemsCoder.decode(inStream, nested),
+            sourceCoder.decode(inStream, nested));
+      }
+
+      @Override
+      public List<Coder<?>> getCoderArguments() {
+        return Arrays.<Coder<?>>asList(elemCoder);
+      }
+
+      @Override
+      public void verifyDeterministic() throws NonDeterministicException {
+        throw new NonDeterministicException(this,
+            "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+      }
+    }
+
+    /**
+     * An {@code UnboundedReader<T>} that emits the in-memory {@link ResidualElements} first and
+     * then the elements of the {@link ResidualSource}.
+     *
+     * <p>In the initial state, {@link ResidualElements} is empty and {@link ResidualSource}
+     * wraps the whole {@code BoundedSource<T>}. Each checkpoint may split the remaining source
+     * into new {@link ResidualElements} and a new {@link ResidualSource}.
+     */
+    @VisibleForTesting
+    class Reader extends UnboundedReader<T> {
+      private ResidualElements residualElements;
+      private @Nullable ResidualSource residualSource;
+      private final PipelineOptions options;
+      // Set once advance() has found no more elements; drives the watermark.
+      private boolean done;
+
+      Reader(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        this.options = checkNotNull(options, "options");
+        init(residualElementsList, residualSource, this.options);
+        this.done = false;
+      }
+
+      /**
+       * (Re)initializes the element buffer and the residual source. A null element list means
+       * nothing is buffered; a null source means nothing remains to be read after the buffer.
+       */
+      private void init(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        // Checkpoints legitimately carry a null element list (see Checkpoint(null, ...)).
+        this.residualElements = new ResidualElements(
+            residualElementsList == null
+                ? Collections.<TimestampedValue<T>>emptyList()
+                : residualElementsList);
+        this.residualSource =
+            residualSource == null ? null : new ResidualSource(residualSource, options);
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        if (residualElements.advance()) {
+          return true;
+        } else if (residualSource != null && residualSource.advance()) {
+          return true;
+        } else {
+          done = true;
+          return false;
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (residualSource != null) {
+          residualSource.close();
+        }
+      }
+
+      @Override
+      public T getCurrent() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrent();
+        } else if (residualSource != null) {
+          return residualSource.getCurrent();
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrentTimestamp();
+        } else if (residualSource != null) {
+          return residualSource.getCurrentTimestamp();
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      @Override
+      public Instant getWatermark() {
+        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      /**
+       * {@inheritDoc}
+       *
+       * <p>If only part of the {@link ResidualElements} is consumed, the new
+       * checkpoint will contain the remaining elements in {@link ResidualElements} and
+       * the {@link ResidualSource}.
+       *
+       * <p>If all {@link ResidualElements} and part of the
+       * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
+       * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
+       * {@link ResidualSource} is the source split from the current source,
+       * and {@link ResidualElements} contains the remaining elements from the current source
+       * after the splitting. For an unsplittable source, all remaining elements are put into
+       * the {@link ResidualElements}.
+       *
+       * <p>If everything has been consumed, the checkpoint has neither elements nor a source.
+       */
+      @Override
+      public Checkpoint<T> getCheckpointMark() {
+        Checkpoint<T> newCheckpoint;
+        if (!residualElements.done()) {
+          // Part of residualElements are consumed.
+          // Checkpoints the remaining elements and residualSource.
+          newCheckpoint = new Checkpoint<>(
+              residualElements.getRestElements(),
+              residualSource == null ? null : residualSource.getSource());
+        } else if (residualSource != null) {
+          newCheckpoint = residualSource.getCheckpointMark();
+        } else {
+          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */);
+        }
+        // Re-initialize since the residualElements and the residualSource might be
+        // consumed or split by checkpointing. init() tolerates a null element list.
+        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+        return newCheckpoint;
+      }
+
+      @Override
+      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+        return BoundedToUnboundedSourceAdapter.this;
+      }
+    }
+
+    /** Iterates over an in-memory list of timestamped elements left over from a checkpoint. */
+    private class ResidualElements {
+      private final List<TimestampedValue<T>> elementsList;
+      private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
+      private @Nullable TimestampedValue<T> currentT;
+      private boolean hasCurrent;
+      private boolean done;
+
+      ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+        this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+        this.elementsIterator = null;
+        this.currentT = null;
+        this.hasCurrent = false;
+        this.done = false;
+      }
+
+      public boolean advance() {
+        if (elementsIterator == null) {
+          elementsIterator = elementsList.iterator();
+        }
+        if (elementsIterator.hasNext()) {
+          currentT = elementsIterator.next();
+          hasCurrent = true;
+          return true;
+        } else {
+          done = true;
+          hasCurrent = false;
+          return false;
+        }
+      }
+
+      boolean hasCurrent() {
+        return hasCurrent;
+      }
+
+      boolean done() {
+        return done;
+      }
+
+      TimestampedValue<T> getCurrentTimestampedValue() {
+        if (!hasCurrent) {
+          throw new NoSuchElementException();
+        }
+        return currentT;
+      }
+
+      T getCurrent() {
+        return getCurrentTimestampedValue().getValue();
+      }
+
+      Instant getCurrentTimestamp() {
+        return getCurrentTimestampedValue().getTimestamp();
+      }
+
+      /** Returns the elements not yet emitted (all of them if iteration has not started). */
+      List<TimestampedValue<T>> getRestElements() {
+        if (elementsIterator == null) {
+          return elementsList;
+        } else {
+          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+          while (elementsIterator.hasNext()) {
+            newResidualElements.add(elementsIterator.next());
+          }
+          return newResidualElements;
+        }
+      }
+    }
+
+    /** Lazily reads a {@link BoundedSource} and knows how to checkpoint a partial read of it. */
+    private class ResidualSource {
+      private final BoundedSource<T> residualSource;
+      private final PipelineOptions options;
+      private @Nullable BoundedReader<T> reader;
+      private boolean closed;
+      // Set once the underlying reader has reported that it has no more elements, so that it
+      // is never advanced or split again afterwards.
+      private boolean readerDone;
+
+      public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) {
+        this.residualSource = checkNotNull(residualSource, "residualSource");
+        this.options = checkNotNull(options, "options");
+        this.reader = null;
+        this.closed = false;
+        this.readerDone = false;
+      }
+
+      private boolean advance() throws IOException {
+        if (closed) {
+          throw new IllegalStateException("advance() called on a closed ResidualSource");
+        }
+        if (readerDone) {
+          return false;
+        }
+        if (reader == null) {
+          reader = residualSource.createReader(options);
+          readerDone = !reader.start();
+        } else {
+          readerDone = !reader.advance();
+        }
+        return !readerDone;
+      }
+
+      T getCurrent() throws NoSuchElementException {
+        if (reader == null || readerDone) {
+          throw new NoSuchElementException();
+        }
+        return reader.getCurrent();
+      }
+
+      Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (reader == null || readerDone) {
+          throw new NoSuchElementException();
+        }
+        return reader.getCurrentTimestamp();
+      }
+
+      void close() throws IOException {
+        if (reader != null) {
+          reader.close();
+          reader = null;
+        }
+        closed = true;
+      }
+
+      BoundedSource<T> getSource() {
+        return residualSource;
+      }
+
+      /**
+       * Checkpoints this source. If reading has started, the unread remainder is split off as a
+       * new source when possible, the elements still owned by the current reader are read into
+       * memory, and the reader is closed (the caller replaces this object afterwards).
+       */
+      Checkpoint<T> getCheckpointMark() {
+        if (reader == null) {
+          // Reader hasn't started, checkpoint the residualSource.
+          return new Checkpoint<>(null /* residualElements */, residualSource);
+        }
+        // Part of residualSource is consumed.
+        // Splits the residualSource and tracks the new residualElements in current source.
+        BoundedSource<T> residualSplit = null;
+        if (!readerDone) {
+          Double fractionConsumed = reader.getFractionConsumed();
+          if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) {
+            double fractionRest = 1 - fractionConsumed;
+            int splitAttempts = 8;
+            // Try split points from the current position towards the end until one succeeds.
+            for (int i = 0; i < splitAttempts && residualSplit == null; ++i) {
+              double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+              residualSplit = reader.splitAtFraction(fractionToSplit);
+            }
+          }
+        }
+        List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+        try {
+          while (advance()) {
+            newResidualElements.add(
+                TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to read elements from the bounded reader.", e);
+        }
+        // The reader is now exhausted and this ResidualSource is about to be replaced by the
+        // caller, so release the underlying reader instead of leaking it.
+        try {
+          close();
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to close the bounded reader.", e);
+        }
+        return new Checkpoint<>(newResidualElements, residualSplit);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
new file mode 100644
index 0000000..afd0927
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+/**
+ * Unit tests for {@link UnboundedReadFromBoundedSource}.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedReadFromBoundedSourceTest {
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testCheckpointCoderNulls() throws Exception {
+    CheckpointCoder<String> coder = new CheckpointCoder<>(StringUtf8Coder.of());
+    Checkpoint<String> emptyCheckpoint = new Checkpoint<>(null, null);
+    Checkpoint<String> decodedEmptyCheckpoint = CoderUtils.decodeFromByteArray(
+        coder,
+        CoderUtils.encodeToByteArray(coder, emptyCheckpoint));
+    assertNull(decodedEmptyCheckpoint.getResidualElements());
+    assertNull(decodedEmptyCheckpoint.getResidualSource());
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testBoundedToUnboundedSourceAdapter() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> boundedSource = CountingSource.upTo(numElements);
+    UnboundedSource<Long, Checkpoint<Long>> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<Long> output =
+        p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
+
+    // Count == numElements
+    PAssert
+        .thatSingleton(output.apply("Count", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Unique count == numElements
+    PAssert
+        .thatSingleton(output.apply(RemoveDuplicates.<Long>create())
+            .apply("UniqueCount", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Min == 0
+    PAssert
+        .thatSingleton(output.apply("Min", Min.<Long>globally()))
+        .isEqualTo(0L);
+    // Max == numElements-1
+    PAssert
+        .thatSingleton(output.apply("Max", Max.<Long>globally()))
+        .isEqualTo(numElements - 1);
+    p.run();
+  }
+
+  @Test
+  public void testCountingSourceToUnboundedCheckpoint() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
+    List<Long> expected = Lists.newArrayList();
+    for (long i = 0; i < numElements; ++i) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpoint(countingSource, expected);
+  }
+
+  @Test
+  public void testUnsplittableSourceToUnboundedCheckpoint() throws Exception {
+    String baseName = "test-input";
+    File inputFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(100);
+    writeFile(inputFile, input);
+
+    BoundedSource<Byte> source = new UnsplittableSource(inputFile.getPath(), 1);
+    List<Byte> expected = Lists.newArrayList();
+    for (byte i : input) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpoint(source, expected);
+  }
+
+  /**
+   * Reads {@code boundedSource} through the adapter with a single reader, taking (and
+   * finalizing) a checkpoint every 9 elements, and checks that all elements are produced.
+   */
+  private <T> void testBoundedToUnboundedSourceAdapterCheckpoint(
+      BoundedSource<T> boundedSource,
+      List<T> expectedElements) throws Exception {
+    BoundedToUnboundedSourceAdapter<T> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedToUnboundedSourceAdapter<T>.Reader reader =
+        unboundedSource.createReader(options, null);
+
+    List<T> actual = Lists.newArrayList();
+    try {
+      for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
+        actual.add(reader.getCurrent());
+        // checkpoint every 9 elements
+        if (actual.size() % 9 == 0) {
+          Checkpoint<T> checkpoint = reader.getCheckpointMark();
+          checkpoint.finalizeCheckpoint();
+        }
+      }
+    } finally {
+      // Release the underlying bounded reader even if an assertion or read fails.
+      reader.close();
+    }
+    assertEquals(expectedElements.size(), actual.size());
+    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
+  }
+
+  @Test
+  public void testCountingSourceToUnboundedCheckpointRestart() throws Exception {
+    long numElements = 100;
+    BoundedSource<Long> countingSource = CountingSource.upTo(numElements);
+    List<Long> expected = Lists.newArrayList();
+    for (long i = 0; i < numElements; ++i) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpointRestart(countingSource, expected);
+  }
+
+  @Test
+  public void testUnsplittableSourceToUnboundedCheckpointRestart() throws Exception {
+    String baseName = "test-input";
+    File inputFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(1000);
+    writeFile(inputFile, input);
+
+    BoundedSource<Byte> source = new UnsplittableSource(inputFile.getPath(), 1);
+    List<Byte> expected = Lists.newArrayList();
+    for (byte i : input) {
+      expected.add(i);
+    }
+    testBoundedToUnboundedSourceAdapterCheckpointRestart(source, expected);
+  }
+
+  /**
+   * Reads {@code boundedSource} through the adapter, and every 9 elements encodes/decodes a
+   * checkpoint, closes the reader and resumes from the decoded checkpoint with a new reader.
+   */
+  private <T> void testBoundedToUnboundedSourceAdapterCheckpointRestart(
+      BoundedSource<T> boundedSource,
+      List<T> expectedElements) throws Exception {
+    BoundedToUnboundedSourceAdapter<T> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(boundedSource);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedToUnboundedSourceAdapter<T>.Reader reader =
+        unboundedSource.createReader(options, null);
+
+    List<T> actual = Lists.newArrayList();
+    try {
+      for (boolean hasNext = reader.start(); hasNext;) {
+        actual.add(reader.getCurrent());
+        // checkpoint every 9 elements
+        if (actual.size() % 9 == 0) {
+          Checkpoint<T> checkpoint = reader.getCheckpointMark();
+          Coder<Checkpoint<T>> checkpointCoder = unboundedSource.getCheckpointMarkCoder();
+          Checkpoint<T> decodedCheckpoint = CoderUtils.decodeFromByteArray(
+              checkpointCoder,
+              CoderUtils.encodeToByteArray(checkpointCoder, checkpoint));
+          reader.close();
+          checkpoint.finalizeCheckpoint();
+
+          BoundedToUnboundedSourceAdapter<T>.Reader restarted =
+              unboundedSource.createReader(options, decodedCheckpoint);
+          reader = restarted;
+          hasNext = reader.start();
+        } else {
+          hasNext = reader.advance();
+        }
+      }
+    } finally {
+      // Close the last (possibly restarted) reader; closing an already-closed one is harmless.
+      reader.close();
+    }
+    assertEquals(expectedElements.size(), actual.size());
+    assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
+  }
+
+  @Test
+  public void testReadBeforeStart() throws Exception {
+    thrown.expect(NoSuchElementException.class);
+
+    BoundedSource<Long> countingSource = CountingSource.upTo(100);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(countingSource);
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    unboundedSource.createReader(options, null).getCurrent();
+  }
+
+  @Test
+  public void testReadFromCheckpointBeforeStart() throws Exception {
+    thrown.expect(NoSuchElementException.class);
+
+    BoundedSource<Long> countingSource = CountingSource.upTo(100);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(countingSource);
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    List<TimestampedValue<Long>> elements =
+        ImmutableList.of(TimestampedValue.of(1L, new Instant(1L)));
+    Checkpoint<Long> checkpoint = new Checkpoint<>(elements, countingSource);
+    unboundedSource.createReader(options, checkpoint).getCurrent();
+  }
+
+  /**
+   * Generate byte array of given size.
+   */
+  private static byte[] generateInput(int size) {
+    // Arbitrary but fixed seed
+    Random random = new Random(285930);
+    byte[] buff = new byte[size];
+    random.nextBytes(buff);
+    return buff;
+  }
+
+  /**
+   * Writes a single output file.
+   */
+  private static void writeFile(File file, byte[] input) throws IOException {
+    try (OutputStream os = new FileOutputStream(file)) {
+      os.write(input);
+    }
+  }
+
+  /**
+   * Unsplittable source for use in tests. It emits the file's bytes one at a time and its
+   * reader refuses dynamic splitting.
+   */
+  private static class UnsplittableSource extends FileBasedSource<Byte> {
+    public UnsplittableSource(String fileOrPatternSpec, long minBundleSize) {
+      super(fileOrPatternSpec, minBundleSize);
+    }
+
+    public UnsplittableSource(
+        String fileName, long minBundleSize, long startOffset, long endOffset) {
+      super(fileName, minBundleSize, startOffset, endOffset);
+    }
+
+    @Override
+    protected FileBasedSource<Byte> createForSubrangeOfFile(String fileName, long start, long end) {
+      return new UnsplittableSource(fileName, getMinBundleSize(), start, end);
+    }
+
+    @Override
+    protected FileBasedReader<Byte> createSingleFileReader(PipelineOptions options) {
+      return new UnsplittableReader(this);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return false;
+    }
+
+    @Override
+    public Coder<Byte> getDefaultOutputCoder() {
+      return SerializableCoder.of(Byte.class);
+    }
+
+    /** Reads one byte per record; every record is a split point but splitting is disabled. */
+    private static class UnsplittableReader extends FileBasedReader<Byte> {
+      ByteBuffer buff = ByteBuffer.allocate(1);
+      Byte current;
+      long offset;
+      ReadableByteChannel channel;
+
+      public UnsplittableReader(UnsplittableSource source) {
+        super(source);
+        offset = source.getStartOffset() - 1;
+      }
+
+      @Override
+      public Byte getCurrent() throws NoSuchElementException {
+        return current;
+      }
+
+      @Override
+      public boolean allowsDynamicSplitting() {
+        return false;
+      }
+
+      @Override
+      protected boolean isAtSplitPoint() {
+        return true;
+      }
+
+      @Override
+      protected void startReading(ReadableByteChannel channel) throws IOException {
+        this.channel = channel;
+      }
+
+      @Override
+      protected boolean readNextRecord() throws IOException {
+        buff.clear();
+        if (channel.read(buff) != 1) {
+          return false;
+        }
+        // Byte.valueOf uses the cached instances; the Byte(byte) constructor is deprecated.
+        current = Byte.valueOf(buff.get(0));
+        offset += 1;
+        return true;
+      }
+
+      @Override
+      protected long getCurrentOffset() {
+        return offset;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index 49b2ad4..ba13f9d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -197,7 +197,8 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T } @Override - public BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions options) { + public BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions options) + throws IOException { return new Reader(source.createReader(options, null)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11d9ec5e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java index 2c4a325..ea3004e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java @@ -76,7 +76,7 @@ public abstract class UnboundedSource< * checkpoint if present. */ public abstract UnboundedReader<OutputT> createReader( - PipelineOptions options, @Nullable CheckpointMarkT checkpointMark); + PipelineOptions options, @Nullable CheckpointMarkT checkpointMark) throws IOException; /** * Returns a {@link Coder} for encoding and decoding the checkpoints for this source, or
