Removes TransformingSource that is now unused

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c16947ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c16947ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c16947ec

Branch: refs/heads/master
Commit: c16947ec01d21ef99a5e2024d7aaead3c7a4399f
Parents: ebd0041
Author: Eugene Kirpichov <[email protected]>
Authored: Tue Jul 25 23:35:00 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Jul 28 10:25:07 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/TransformingSource.java | 136 -------------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  68 ----------
 2 files changed, 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c16947ec/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
deleted file mode 100644
index b8e6b39..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.joda.time.Instant;
-
-/**
- * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
- * and transforms elements to type {@code V}.
-*/
-@VisibleForTesting
-class TransformingSource<T, V> extends BoundedSource<V> {
-  private final BoundedSource<T> boundedSource;
-  private final SerializableFunction<T, V> function;
-  private final Coder<V> outputCoder;
-
-  TransformingSource(
-      BoundedSource<T> boundedSource,
-      SerializableFunction<T, V> function,
-      Coder<V> outputCoder) {
-    this.boundedSource = checkNotNull(boundedSource, "boundedSource");
-    this.function = checkNotNull(function, "function");
-    this.outputCoder = checkNotNull(outputCoder, "outputCoder");
-  }
-
-  @Override
-  public List<? extends BoundedSource<V>> split(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    return Lists.transform(
-        boundedSource.split(desiredBundleSizeBytes, options),
-        new Function<BoundedSource<T>, BoundedSource<V>>() {
-          @Override
-          public BoundedSource<V> apply(BoundedSource<T> input) {
-            return new TransformingSource<>(input, function, outputCoder);
-          }
-        });
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    return boundedSource.getEstimatedSizeBytes(options);
-  }
-
-  @Override
-  public BoundedReader<V> createReader(PipelineOptions options) throws 
IOException {
-    return new TransformingReader(boundedSource.createReader(options));
-  }
-
-  @Override
-  public void validate() {
-    boundedSource.validate();
-  }
-
-  @Override
-  public Coder<V> getDefaultOutputCoder() {
-    return outputCoder;
-  }
-
-  private class TransformingReader extends BoundedReader<V> {
-    private final BoundedReader<T> boundedReader;
-
-    private TransformingReader(BoundedReader<T> boundedReader) {
-      this.boundedReader = checkNotNull(boundedReader, "boundedReader");
-    }
-
-    @Override
-    public synchronized BoundedSource<V> getCurrentSource() {
-      return new TransformingSource<>(boundedReader.getCurrentSource(), 
function, outputCoder);
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return boundedReader.start();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      return boundedReader.advance();
-    }
-
-    @Override
-    public V getCurrent() throws NoSuchElementException {
-      T current = boundedReader.getCurrent();
-      return function.apply(current);
-    }
-
-    @Override
-    public void close() throws IOException {
-      boundedReader.close();
-    }
-
-    @Override
-    public synchronized BoundedSource<V> splitAtFraction(double fraction) {
-      BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
-      return split == null ? null : new TransformingSource<>(split, function, 
outputCoder);
-    }
-
-    @Override
-    public Double getFractionConsumed() {
-      return boundedReader.getFractionConsumed();
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return boundedReader.getCurrentTimestamp();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c16947ec/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3465b4e..8db4e94 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -86,7 +86,6 @@ import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -1510,8 +1509,6 @@ public class BigQueryIOTest implements Serializable {
     // Simulate a repeated call to split(), like a Dataflow worker will 
sometimes do.
     sources = bqSource.split(200, options);
     assertEquals(2, sources.size());
-    BoundedSource<TableRow> actual = sources.get(0);
-    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
 
     // A repeated call to split() should not have caused a duplicate extract 
job.
     assertEquals(1, fakeJobService.getNumExtractJobCalls());
@@ -1594,8 +1591,6 @@ public class BigQueryIOTest implements Serializable {
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, 
options);
     assertEquals(2, sources.size());
-    BoundedSource<TableRow> actual = sources.get(0);
-    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
   }
 
   @Test
@@ -1673,69 +1668,6 @@ public class BigQueryIOTest implements Serializable {
 
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, 
options);
     assertEquals(2, sources.size());
-    BoundedSource<TableRow> actual = sources.get(0);
-    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-  }
-
-  @Test
-  public void testTransformingSource() throws Exception {
-    int numElements = 10000;
-    @SuppressWarnings("deprecation")
-    BoundedSource<Long> longSource = CountingSource.upTo(numElements);
-    SerializableFunction<Long, String> toStringFn =
-        new SerializableFunction<Long, String>() {
-          @Override
-          public String apply(Long input) {
-            return input.toString();
-         }};
-    BoundedSource<String> stringSource = new TransformingSource<>(
-        longSource, toStringFn, StringUtf8Coder.of());
-
-    List<String> expected = Lists.newArrayList();
-    for (int i = 0; i < numElements; i++) {
-      expected.add(String.valueOf(i));
-    }
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(stringSource, options),
-        CoreMatchers.is(expected));
-    SourceTestUtils.assertSplitAtFractionBehavior(
-        stringSource, 100, 0.3, 
ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options);
-
-    SourceTestUtils.assertSourcesEqualReferenceSource(
-        stringSource, stringSource.split(100, options), options);
-  }
-
-  @Test
-  public void testTransformingSourceUnsplittable() throws Exception {
-    int numElements = 10000;
-    @SuppressWarnings("deprecation")
-    BoundedSource<Long> longSource =
-        SourceTestUtils.toUnsplittableSource(CountingSource.upTo(numElements));
-    SerializableFunction<Long, String> toStringFn =
-        new SerializableFunction<Long, String>() {
-          @Override
-          public String apply(Long input) {
-            return input.toString();
-          }
-        };
-    BoundedSource<String> stringSource =
-        new TransformingSource<>(longSource, toStringFn, StringUtf8Coder.of());
-
-    List<String> expected = Lists.newArrayList();
-    for (int i = 0; i < numElements; i++) {
-      expected.add(String.valueOf(i));
-    }
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(stringSource, options), 
CoreMatchers.is(expected));
-    SourceTestUtils.assertSplitAtFractionBehavior(
-        stringSource, 100, 0.3, 
ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-
-    SourceTestUtils.assertSourcesEqualReferenceSource(
-        stringSource, stringSource.split(100, options), options);
   }
 
   @Test

Reply via email to