Removes TransformingSource, which is now unused
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c16947ec Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c16947ec Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c16947ec Branch: refs/heads/master Commit: c16947ec01d21ef99a5e2024d7aaead3c7a4399f Parents: ebd0041 Author: Eugene Kirpichov <[email protected]> Authored: Tue Jul 25 23:35:00 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Jul 28 10:25:07 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/TransformingSource.java | 136 ------------------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 68 ---------- 2 files changed, 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c16947ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java deleted file mode 100644 index b8e6b39..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.util.List; -import java.util.NoSuchElementException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.joda.time.Instant; - -/** - * A {@link BoundedSource} that reads from {@code BoundedSource<T>} - * and transforms elements to type {@code V}. -*/ -@VisibleForTesting -class TransformingSource<T, V> extends BoundedSource<V> { - private final BoundedSource<T> boundedSource; - private final SerializableFunction<T, V> function; - private final Coder<V> outputCoder; - - TransformingSource( - BoundedSource<T> boundedSource, - SerializableFunction<T, V> function, - Coder<V> outputCoder) { - this.boundedSource = checkNotNull(boundedSource, "boundedSource"); - this.function = checkNotNull(function, "function"); - this.outputCoder = checkNotNull(outputCoder, "outputCoder"); - } - - @Override - public List<? 
extends BoundedSource<V>> split( - long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - return Lists.transform( - boundedSource.split(desiredBundleSizeBytes, options), - new Function<BoundedSource<T>, BoundedSource<V>>() { - @Override - public BoundedSource<V> apply(BoundedSource<T> input) { - return new TransformingSource<>(input, function, outputCoder); - } - }); - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return boundedSource.getEstimatedSizeBytes(options); - } - - @Override - public BoundedReader<V> createReader(PipelineOptions options) throws IOException { - return new TransformingReader(boundedSource.createReader(options)); - } - - @Override - public void validate() { - boundedSource.validate(); - } - - @Override - public Coder<V> getDefaultOutputCoder() { - return outputCoder; - } - - private class TransformingReader extends BoundedReader<V> { - private final BoundedReader<T> boundedReader; - - private TransformingReader(BoundedReader<T> boundedReader) { - this.boundedReader = checkNotNull(boundedReader, "boundedReader"); - } - - @Override - public synchronized BoundedSource<V> getCurrentSource() { - return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder); - } - - @Override - public boolean start() throws IOException { - return boundedReader.start(); - } - - @Override - public boolean advance() throws IOException { - return boundedReader.advance(); - } - - @Override - public V getCurrent() throws NoSuchElementException { - T current = boundedReader.getCurrent(); - return function.apply(current); - } - - @Override - public void close() throws IOException { - boundedReader.close(); - } - - @Override - public synchronized BoundedSource<V> splitAtFraction(double fraction) { - BoundedSource<T> split = boundedReader.splitAtFraction(fraction); - return split == null ? 
null : new TransformingSource<>(split, function, outputCoder); - } - - @Override - public Double getFractionConsumed() { - return boundedReader.getFractionConsumed(); - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return boundedReader.getCurrentTimestamp(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c16947ec/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 3465b4e..8db4e94 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -86,7 +86,6 @@ import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.fs.ResourceId; @@ -1510,8 +1509,6 @@ public class BigQueryIOTest implements Serializable { // Simulate a repeated call to split(), like a Dataflow worker will sometimes do. sources = bqSource.split(200, options); assertEquals(2, sources.size()); - BoundedSource<TableRow> actual = sources.get(0); - assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); // A repeated call to split() should not have caused a duplicate extract job. assertEquals(1, fakeJobService.getNumExtractJobCalls()); @@ -1594,8 +1591,6 @@ public class BigQueryIOTest implements Serializable { List<? 
extends BoundedSource<TableRow>> sources = bqSource.split(100, options); assertEquals(2, sources.size()); - BoundedSource<TableRow> actual = sources.get(0); - assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); } @Test @@ -1673,69 +1668,6 @@ public class BigQueryIOTest implements Serializable { List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options); assertEquals(2, sources.size()); - BoundedSource<TableRow> actual = sources.get(0); - assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); - } - - @Test - public void testTransformingSource() throws Exception { - int numElements = 10000; - @SuppressWarnings("deprecation") - BoundedSource<Long> longSource = CountingSource.upTo(numElements); - SerializableFunction<Long, String> toStringFn = - new SerializableFunction<Long, String>() { - @Override - public String apply(Long input) { - return input.toString(); - }}; - BoundedSource<String> stringSource = new TransformingSource<>( - longSource, toStringFn, StringUtf8Coder.of()); - - List<String> expected = Lists.newArrayList(); - for (int i = 0; i < numElements; i++) { - expected.add(String.valueOf(i)); - } - - PipelineOptions options = PipelineOptionsFactory.create(); - Assert.assertThat( - SourceTestUtils.readFromSource(stringSource, options), - CoreMatchers.is(expected)); - SourceTestUtils.assertSplitAtFractionBehavior( - stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options); - - SourceTestUtils.assertSourcesEqualReferenceSource( - stringSource, stringSource.split(100, options), options); - } - - @Test - public void testTransformingSourceUnsplittable() throws Exception { - int numElements = 10000; - @SuppressWarnings("deprecation") - BoundedSource<Long> longSource = - SourceTestUtils.toUnsplittableSource(CountingSource.upTo(numElements)); - SerializableFunction<Long, String> toStringFn = - new SerializableFunction<Long, String>() { - @Override - public String apply(Long input) { - 
return input.toString(); - } - }; - BoundedSource<String> stringSource = - new TransformingSource<>(longSource, toStringFn, StringUtf8Coder.of()); - - List<String> expected = Lists.newArrayList(); - for (int i = 0; i < numElements; i++) { - expected.add(String.valueOf(i)); - } - - PipelineOptions options = PipelineOptionsFactory.create(); - Assert.assertThat( - SourceTestUtils.readFromSource(stringSource, options), CoreMatchers.is(expected)); - SourceTestUtils.assertSplitAtFractionBehavior( - stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); - - SourceTestUtils.assertSourcesEqualReferenceSource( - stringSource, stringSource.split(100, options), options); } @Test
