Fix NPE in BigQueryIO.TransformingReader when the wrapped reader's splitAtFraction returns null
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c8a6546 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c8a6546 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c8a6546 Branch: refs/heads/master Commit: 2c8a6546af2adb1f7694f29a092338898f851d16 Parents: 7ac8d6d Author: Pei He <[email protected]> Authored: Mon Aug 15 17:23:20 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Aug 17 17:50:07 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/testing/SourceTestUtils.java | 132 +++++++++++++++++++ .../beam/sdk/testing/SourceTestUtilsTest.java | 66 ++++++++++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 31 +++++ 4 files changed, 235 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java index e0b8890..9ce9c5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.testing; +import static com.google.common.base.Preconditions.checkNotNull; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; @@ -27,10 +29,15 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; +import 
org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +45,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -45,6 +53,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import javax.annotation.Nullable; + /** * Helper functions and test harnesses for checking correctness of {@link Source} * implementations. @@ -673,4 +683,126 @@ public class SourceTestUtils { numItemsToReadBeforeSplitting, fraction, options); return (res.numResidualItems > 0); } + + /** + * Returns an equivalent unsplittable {@code BoundedSource<T>}. + * + * <p>It forwards most methods to the given {@code boundedSource}, except: + * <ol> + * <li> {@link BoundedSource#splitIntoBundles} rejects initial splitting + * by returning itself in a list. + * <li> {@link BoundedReader#splitAtFraction} rejects dynamic splitting by returning null. 
+ * </ol> + */ + public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource) { + return new UnsplittableSource<>(boundedSource); + } + + private static class UnsplittableSource<T> extends BoundedSource<T> { + + private final BoundedSource<T> boundedSource; + + private UnsplittableSource(BoundedSource<T> boundedSource) { + this.boundedSource = checkNotNull(boundedSource, "boundedSource"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + this.boundedSource.populateDisplayData(builder); + } + + @Override + public List<? extends BoundedSource<T>> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + return ImmutableList.of(this); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return boundedSource.getEstimatedSizeBytes(options); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return boundedSource.producesSortedKeys(options); + } + + @Override + public BoundedReader<T> createReader(PipelineOptions options) throws IOException { + return new UnsplittableReader<>(boundedSource, boundedSource.createReader(options)); + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return boundedSource.getDefaultOutputCoder(); + } + + private static class UnsplittableReader<T> extends BoundedReader<T> { + + private final BoundedSource<T> boundedSource; + private final BoundedReader<T> boundedReader; + + private UnsplittableReader(BoundedSource<T> boundedSource, BoundedReader<T> boundedReader) { + this.boundedSource = checkNotNull(boundedSource, "boundedSource"); + this.boundedReader = checkNotNull(boundedReader, "boundedReader"); + } + + @Override + public BoundedSource<T> getCurrentSource() { + return boundedSource; + } + + @Override + public boolean start() throws IOException { + return 
boundedReader.start(); + } + + @Override + public boolean advance() throws IOException { + return boundedReader.advance(); + } + + @Override + public T getCurrent() throws NoSuchElementException { + return boundedReader.getCurrent(); + } + + @Override + public void close() throws IOException { + boundedReader.close(); + } + + @Override + @Nullable + public BoundedSource<T> splitAtFraction(double fraction) { + return null; + } + + @Override + @Nullable + public Double getFractionConsumed() { + return boundedReader.getFractionConsumed(); + } + + @Override + public long getSplitPointsConsumed() { + return boundedReader.getSplitPointsConsumed(); + } + + @Override + public long getSplitPointsRemaining() { + return boundedReader.getSplitPointsRemaining(); + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return boundedReader.getCurrentTimestamp(); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java new file mode 100644 index 0000000..f2b332b --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +import com.google.common.collect.Sets; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.List; +import java.util.Set; + +/** + * Tests for {@link SourceTestUtils}. 
+ */ +@RunWith(JUnit4.class) +public class SourceTestUtilsTest { + + @Test + public void testToUnsplittableSource() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + BoundedSource<Long> baseSource = CountingSource.upTo(100); + BoundedSource<Long> unsplittableSource = SourceTestUtils.toUnsplittableSource(baseSource); + List<?> splits = unsplittableSource.splitIntoBundles(1, options); + assertEquals(splits.size(), 1); + assertEquals(splits.get(0), unsplittableSource); + + BoundedReader<Long> unsplittableReader = unsplittableSource.createReader(options); + assertEquals(0, unsplittableReader.getFractionConsumed(), 1e-15); + + Set<Long> expected = Sets.newHashSet(SourceTestUtils.readFromSource(baseSource, options)); + Set<Long> actual = Sets.newHashSet(); + actual.addAll(SourceTestUtils.readNItemsFromUnstartedReader(unsplittableReader, 40)); + assertNull(unsplittableReader.splitAtFraction(0.5)); + actual.addAll(SourceTestUtils.readRemainingFromReader(unsplittableReader, true /* started */)); + assertEquals(1, unsplittableReader.getFractionConsumed(), 1e-15); + + assertEquals(100, actual.size()); + assertEquals(Sets.newHashSet(expected), Sets.newHashSet(actual)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index ce04467..e61dcca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1123,9 +1123,9 @@ public class BigQueryIO { BoundedSource<T> boundedSource, 
SerializableFunction<T, V> function, Coder<V> outputCoder) { - this.boundedSource = boundedSource; - this.function = function; - this.outputCoder = outputCoder; + this.boundedSource = checkNotNull(boundedSource, "boundedSource"); + this.function = checkNotNull(function, "function"); + this.outputCoder = checkNotNull(outputCoder, "outputCoder"); } @Override @@ -1170,7 +1170,7 @@ public class BigQueryIO { private final BoundedReader<T> boundedReader; private TransformingReader(BoundedReader<T> boundedReader) { - this.boundedReader = boundedReader; + this.boundedReader = checkNotNull(boundedReader, "boundedReader"); } @Override @@ -1201,8 +1201,8 @@ public class BigQueryIO { @Override public synchronized BoundedSource<V> splitAtFraction(double fraction) { - return new TransformingSource<>( - boundedReader.splitAtFraction(fraction), function, outputCoder); + BoundedSource<T> split = boundedReader.splitAtFraction(fraction); + return split == null ? null : new TransformingSource<>(split, function, outputCoder); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8a6546/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index fcaa054..ca60696 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1216,6 +1216,37 @@ public class BigQueryIOTest implements Serializable { } @Test + public void testTransformingSourceUnsplittable() throws Exception { + int numElements = 10000; + @SuppressWarnings("deprecation") + BoundedSource<Long> 
longSource = + SourceTestUtils.toUnsplittableSource(CountingSource.upTo(numElements)); + SerializableFunction<Long, String> toStringFn = + new SerializableFunction<Long, String>() { + @Override + public String apply(Long input) { + return input.toString(); + } + }; + BoundedSource<String> stringSource = + new TransformingSource<>(longSource, toStringFn, StringUtf8Coder.of()); + + List<String> expected = Lists.newArrayList(); + for (int i = 0; i < numElements; i++) { + expected.add(String.valueOf(i)); + } + + PipelineOptions options = PipelineOptionsFactory.create(); + Assert.assertThat( + SourceTestUtils.readFromSource(stringSource, options), CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + + SourceTestUtils.assertSourcesEqualReferenceSource( + stringSource, stringSource.splitIntoBundles(100, options), options); + } + + @Test @Category(RunnableOnService.class) public void testPassThroughThenCleanup() throws Exception { Pipeline p = TestPipeline.create();
