Include the creating PCollection in PCollectionView. This is available on the client that created the view, but may not be available elsewhere.
Update signatures and callers to match. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4bcc6413 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4bcc6413 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4bcc6413 Branch: refs/heads/master Commit: 4bcc64130fa842acdfd4ebb7168bea73b2d8313d Parents: d9b053d Author: Thomas Groh <[email protected]> Authored: Thu Mar 30 10:47:12 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri Mar 31 09:47:55 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 6 +-- .../beam/runners/core/SideInputHandlerTest.java | 22 ++++++----- .../runners/direct/SideInputContainerTest.java | 19 ++-------- .../direct/ViewEvaluatorFactoryTest.java | 3 +- .../direct/WriteWithShardingFactoryTest.java | 5 ++- .../flink/FlinkStreamingViewOverrides.java | 10 ++--- .../flink/streaming/DoFnOperatorTest.java | 23 +++++------ .../runners/dataflow/BatchViewOverrides.java | 10 ++--- .../org/apache/beam/sdk/transforms/Combine.java | 2 +- .../org/apache/beam/sdk/transforms/View.java | 8 ++-- .../apache/beam/sdk/util/PCollectionViews.java | 40 ++++++++++++-------- .../apache/beam/sdk/values/PCollectionView.java | 9 +++++ .../sdk/testing/PCollectionViewTesting.java | 20 +++++++--- .../beam/sdk/transforms/DoFnTesterTest.java | 7 +++- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 16 +++++--- 15 files changed, 114 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 79a2dd7..dfc8f63 100644 
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -242,7 +242,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { .apply(Combine.globally(transform.getCombineFn()) .withoutDefaults().withFanout(transform.getFanout())); - PCollectionView<OutputT> view = PCollectionViews.singletonView(combined.getPipeline(), + PCollectionView<OutputT> view = PCollectionViews.singletonView(combined, combined.getWindowingStrategy(), transform.getInsertDefault(), transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null, combined.getCoder()); @@ -338,8 +338,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { @Override public PCollectionView<Iterable<T>> expand(PCollection<T> input) { - PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(input.getPipeline(), - input.getWindowingStrategy(), input.getCoder()); + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder()); return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) .apply(CreateApexPCollectionView.<T, Iterable<T>> of(view)); http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java index 3a5d346..335aede 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java @@ -51,20 +51,22 @@ public class SideInputHandlerTest { private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 = 
WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1))); - private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView( - new TupleTag<Iterable<WindowedValue<String>>>() {}, - new PCollectionViewTesting.IdentityViewFn<String>(), - StringUtf8Coder.of(), - windowingStrategy1); + private PCollectionView<Iterable<String>> view1 = + PCollectionViewTesting.testingView( + new TupleTag<Iterable<WindowedValue<String>>>() {}, + new PCollectionViewTesting.IdentityViewFn<String>(), + StringUtf8Coder.of(), + windowingStrategy1); private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 = WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2))); - private PCollectionView<Iterable<String>> view2 = PCollectionViewTesting.testingView( - new TupleTag<Iterable<WindowedValue<String>>>() {}, - new PCollectionViewTesting.IdentityViewFn<String>(), - StringUtf8Coder.of(), - windowingStrategy2); + private PCollectionView<Iterable<String>> view2 = + PCollectionViewTesting.testingView( + new TupleTag<Iterable<WindowedValue<String>>>() {}, + new PCollectionViewTesting.IdentityViewFn<String>(), + StringUtf8Coder.of(), + windowingStrategy2); @Test public void testIsEmpty() { http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java index 183decd..f4de883 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; @@ -204,24 +205,12 @@ public class SideInputContainerTest { } @Test - public void withPCollectionViewsErrorsForContainsNotInViews() { - PCollectionView<Map<String, Iterable<String>>> newView = - PCollectionViews.multimapView( - pipeline, - WindowingStrategy.globalDefault(), - KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString()); - - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView)); - } - - @Test public void withViewsForViewNotInContainerFails() { + PCollection<KV<String, String>> input = + pipeline.apply(Create.empty(new TypeDescriptor<KV<String, String>>() {})); PCollectionView<Map<String, Iterable<String>>> newView = PCollectionViews.multimapView( - pipeline, + input, WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index b094d17..b56bd74 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -63,7 +63,8 @@ public class ViewEvaluatorFactoryTest { PCollection<String> input = p.apply(Create.of("foo", "bar")); CreatePCollectionView<String, Iterable<String>> createView 
= CreatePCollectionView.of( - PCollectionViews.iterableView(p, input.getWindowingStrategy(), StringUtf8Coder.of())); + PCollectionViews.iterableView( + input, input.getWindowingStrategy(), StringUtf8Coder.of())); PCollection<Iterable<String>> concat = input.apply(WithKeys.<Void, String>of((Void) null)) .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 16b6312..8720fd1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -170,13 +170,14 @@ public class WriteWithShardingFactoryTest { @Test public void keyBasedOnCountFnFewElementsExtraShards() throws Exception { + long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3; + PCollection<Long> inputCount = p.apply(Create.of(countValue)); PCollectionView<Long> elementCountView = PCollectionViews.singletonView( - p, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); + inputCount, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); CalculateShardsFn fn = new CalculateShardsFn(3); DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn); - long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3; fnTester.setSideInput(elementCountView, GlobalWindow.INSTANCE, countValue); List<Integer> kvs = fnTester.processBundle(10L); 
http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java index 0ff6367..f955f2a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java @@ -59,7 +59,7 @@ class FlinkStreamingViewOverrides { public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { PCollectionView<Map<K, V>> view = PCollectionViews.mapView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()); @@ -104,7 +104,7 @@ class FlinkStreamingViewOverrides { public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()); @@ -144,7 +144,7 @@ class FlinkStreamingViewOverrides { public PCollectionView<List<T>> expand(PCollection<T> input) { PCollectionView<List<T>> view = PCollectionViews.listView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()); @@ -175,7 +175,7 @@ class FlinkStreamingViewOverrides { public PCollectionView<Iterable<T>> expand(PCollection<T> input) { PCollectionView<Iterable<T>> view = PCollectionViews.iterableView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()); @@ -272,7 +272,7 @@ class FlinkStreamingViewOverrides { .withFanout(transform.getFanout())); PCollectionView<OutputT> view = PCollectionViews.singletonView( - combined.getPipeline(), + combined, combined.getWindowingStrategy(), 
transform.getInsertDefault(), transform.getInsertDefault() http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 25154fa..c1fdea3 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -58,7 +58,6 @@ import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; - import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -85,20 +84,22 @@ public class DoFnOperatorTest { private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 = WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1))); - private PCollectionView<Iterable<String>> view1 = PCollectionViewTesting.testingView( - new TupleTag<Iterable<WindowedValue<String>>>() {}, - new PCollectionViewTesting.IdentityViewFn<String>(), - StringUtf8Coder.of(), - windowingStrategy1); + private PCollectionView<Iterable<String>> view1 = + PCollectionViewTesting.testingView( + new TupleTag<Iterable<WindowedValue<String>>>() {}, + new PCollectionViewTesting.IdentityViewFn<String>(), + StringUtf8Coder.of(), + windowingStrategy1); private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 = WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2))); - private PCollectionView<Iterable<String>> view2 = 
PCollectionViewTesting.testingView( - new TupleTag<Iterable<WindowedValue<String>>>() {}, - new PCollectionViewTesting.IdentityViewFn<String>(), - StringUtf8Coder.of(), - windowingStrategy2); + private PCollectionView<Iterable<String>> view2 = + PCollectionViewTesting.testingView( + new TupleTag<Iterable<WindowedValue<String>>>() {}, + new PCollectionViewTesting.IdentityViewFn<String>(), + StringUtf8Coder.of(), + windowingStrategy2); @Test @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 3689d3d..af96403 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -214,7 +214,7 @@ class BatchViewOverrides { KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); try { PCollectionView<Map<K, V>> view = PCollectionViews.mapView( - input.getPipeline(), input.getWindowingStrategy(), inputCoder); + input, input.getWindowingStrategy(), inputCoder); return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */); } catch (NonDeterministicException e) { runner.recordViewUsesNonDeterministicKeyCoder(this); @@ -701,7 +701,7 @@ class BatchViewOverrides { KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); try { PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView( - input.getPipeline(), input.getWindowingStrategy(), inputCoder); + input, input.getWindowingStrategy(), inputCoder); return applyForMapLike(runner, input, 
view, false /* unique keys not expected */); } catch (NonDeterministicException e) { @@ -959,7 +959,7 @@ class BatchViewOverrides { @SuppressWarnings({"rawtypes", "unchecked"}) PCollectionView<ViewT> view = (PCollectionView<ViewT>) PCollectionViews.<FinalT, W>singletonView( - input.getPipeline(), + (PCollection) input, (WindowingStrategy) input.getWindowingStrategy(), hasDefault, defaultValue, @@ -1092,7 +1092,7 @@ class BatchViewOverrides { @Override public PCollectionView<List<T>> expand(PCollection<T> input) { PCollectionView<List<T>> view = PCollectionViews.listView( - input.getPipeline(), input.getWindowingStrategy(), input.getCoder()); + input, input.getWindowingStrategy(), input.getCoder()); return applyForIterableLike(runner, input, view); } @@ -1177,7 +1177,7 @@ class BatchViewOverrides { @Override public PCollectionView<Iterable<T>> expand(PCollection<T> input) { PCollectionView<Iterable<T>> view = PCollectionViews.iterableView( - input.getPipeline(), input.getWindowingStrategy(), input.getCoder()); + input, input.getWindowingStrategy(), input.getCoder()); return BatchViewAsList.applyForIterableLike(runner, input, view); } } http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index b403691..8fe4831 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1601,7 +1601,7 @@ public class Combine { return combined.apply( CreatePCollectionView.<OutputT, OutputT>of( PCollectionViews.singletonView( - input.getPipeline(), + combined, input.getWindowingStrategy(), insertDefault, insertDefault ? 
fn.defaultValue() : null, http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 67a41e4..14035b0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -257,7 +257,7 @@ public class View { @Override public PCollectionView<List<T>> expand(PCollection<T> input) { return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView( - input.getPipeline(), input.getWindowingStrategy(), input.getCoder()))); + input, input.getWindowingStrategy(), input.getCoder()))); } } @@ -283,7 +283,7 @@ public class View { @Override public PCollectionView<Iterable<T>> expand(PCollection<T> input) { return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView( - input.getPipeline(), input.getWindowingStrategy(), input.getCoder()))); + input, input.getWindowingStrategy(), input.getCoder()))); } } @@ -427,7 +427,7 @@ public class View { public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of( PCollectionViews.multimapView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()))); } @@ -464,7 +464,7 @@ public class View { public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of( PCollectionViews.mapView( - input.getPipeline(), + input, input.getWindowingStrategy(), input.getCoder()))); } http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java index 0794703..c2e3153 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; @@ -60,13 +59,13 @@ public class PCollectionViews { * {@code defaultValue} for any empty windows. */ public static <T, W extends BoundedWindow> PCollectionView<T> singletonView( - Pipeline pipeline, + PCollection<T> pCollection, WindowingStrategy<?, W> windowingStrategy, boolean hasDefault, @Nullable T defaultValue, Coder<T> valueCoder) { return new SimplePCollectionView<>( - pipeline, + pCollection, new SingletonViewFn<>(hasDefault, defaultValue, valueCoder), windowingStrategy, valueCoder); @@ -77,11 +76,11 @@ public class PCollectionViews { * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. */ public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView( - Pipeline pipeline, + PCollection<T> pCollection, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) { return new SimplePCollectionView<>( - pipeline, new IterableViewFn<T>(), windowingStrategy, valueCoder); + pCollection, new IterableViewFn<T>(), windowingStrategy, valueCoder); } /** @@ -89,11 +88,11 @@ public class PCollectionViews { * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. 
*/ public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView( - Pipeline pipeline, + PCollection<T> pCollection, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) { return new SimplePCollectionView<>( - pipeline, new ListViewFn<T>(), windowingStrategy, valueCoder); + pCollection, new ListViewFn<T>(), windowingStrategy, valueCoder); } /** @@ -101,9 +100,11 @@ public class PCollectionViews { * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. */ public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView( - Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) { + PCollection<KV<K, V>> pCollection, + WindowingStrategy<?, W> windowingStrategy, + Coder<KV<K, V>> valueCoder) { return new SimplePCollectionView<>( - pipeline, new MapViewFn<K, V>(), windowingStrategy, valueCoder); + pCollection, new MapViewFn<K, V>(), windowingStrategy, valueCoder); } /** @@ -111,11 +112,11 @@ public class PCollectionViews { * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. */ public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView( - Pipeline pipeline, + PCollection<KV<K, V>> pCollection, WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) { return new SimplePCollectionView<>( - pipeline, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder); + pCollection, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder); } /** @@ -301,6 +302,9 @@ public class PCollectionViews { private static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow> extends PValueBase implements PCollectionView<ViewT> { + /** The {@link PCollection} this view was originally created from. */ + private transient PCollection<ElemT> pCollection; + /** A unique tag for the view, typed according to the elements underlying the view. 
*/ private TupleTag<Iterable<WindowedValue<ElemT>>> tag; @@ -320,12 +324,13 @@ public class PCollectionViews { * boilerplate accessors. */ private SimplePCollectionView( - Pipeline pipeline, + PCollection<ElemT> pCollection, TupleTag<Iterable<WindowedValue<ElemT>>> tag, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT> valueCoder) { - super(pipeline); + super(pCollection.getPipeline()); + this.pCollection = pCollection; if (windowingStrategy.getWindowFn() instanceof InvalidWindows) { throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows"); } @@ -342,12 +347,12 @@ public class PCollectionViews { * boilerplate accessors, with an auto-generated tag. */ private SimplePCollectionView( - Pipeline pipeline, + PCollection<ElemT> pCollection, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT> valueCoder) { this( - pipeline, + pCollection, new TupleTag<Iterable<WindowedValue<ElemT>>>(), viewFn, windowingStrategy, @@ -372,6 +377,11 @@ public class PCollectionViews { return untypedViewFn; } + @Override + public PCollection<?> getPCollection() { + return pCollection; + } + /** * Returns a unique {@link TupleTag} identifying this {@link PCollectionView}. 
* http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java index a351723..f2ddf55 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.values; import java.io.Serializable; +import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -44,6 +45,14 @@ import org.apache.beam.sdk.util.WindowingStrategy; */ public interface PCollectionView<T> extends PValue, Serializable { /** + * Gets the {@link PCollection} this {@link PCollectionView} was created from. + * + * <p>The {@link PCollection} may not be available in all contexts. + */ + @Nullable + PCollection<?> getPCollection(); + + /** * @deprecated this method will be removed entirely. The {@link PCollection} underlying a side * input, is part of the side input's specification with a {@link ParDo} transform, which will * obtain that information via a package-private channel. 
http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java index 99fb1fb..b544812 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValueBase; import org.apache.beam.sdk.values.TupleTag; @@ -140,12 +141,9 @@ public final class PCollectionViewTesting { public static <ElemT, ViewT> PCollectionView<ViewT> testingView( TupleTag<Iterable<WindowedValue<ElemT>>> tag, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, - Coder<ElemT> elemCoder) { - return testingView( - tag, - viewFn, - elemCoder, - DEFAULT_WINDOWING_STRATEGY); + Coder<ElemT> elemCoder, + WindowingStrategy<?, ?> windowingStrategy) { + return testingView(null, tag, viewFn, elemCoder, windowingStrategy); } /** @@ -168,11 +166,13 @@ public final class PCollectionViewTesting { * values provided to the view during execution, results are unpredictable. 
*/ public static <ElemT, ViewT> PCollectionView<ViewT> testingView( + PCollection<ElemT> pCollection, TupleTag<Iterable<WindowedValue<ElemT>>> tag, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, Coder<ElemT> elemCoder, WindowingStrategy<?, ?> windowingStrategy) { return new PCollectionViewFromParts<>( + pCollection, tag, viewFn, windowingStrategy, @@ -223,22 +223,30 @@ public final class PCollectionViewTesting { private static class PCollectionViewFromParts<ElemT, ViewT> extends PValueBase implements PCollectionView<ViewT> { + private PCollection<ElemT> pCollection; private TupleTag<Iterable<WindowedValue<ElemT>>> tag; private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn; private WindowingStrategy<?, ?> windowingStrategy; private Coder<Iterable<WindowedValue<ElemT>>> coder; public PCollectionViewFromParts( + PCollection<ElemT> pCollection, TupleTag<Iterable<WindowedValue<ElemT>>> tag, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, WindowingStrategy<?, ?> windowingStrategy, Coder<Iterable<WindowedValue<ElemT>>> coder) { + this.pCollection = pCollection; this.tag = tag; this.viewFn = viewFn; this.windowingStrategy = windowingStrategy; this.coder = coder; } + @Override + public PCollection<?> getPCollection() { + return pCollection; + } + @SuppressWarnings({"unchecked", "rawtypes"}) @Override public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() { http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 699687f..3b6fbfb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -36,6 +36,7 @@ 
import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.Matchers; @@ -324,9 +325,10 @@ public class DoFnTesterTest { @Test public void fnWithSideInputDefault() throws Exception { + PCollection<Integer> pCollection = p.apply(Create.empty(VarIntCoder.of())); final PCollectionView<Integer> value = PCollectionViews.singletonView( - p, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); + pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) { tester.processElement(1); @@ -339,9 +341,10 @@ public class DoFnTesterTest { @Test public void fnWithSideInputExplicit() throws Exception { + PCollection<Integer> pCollection = p.apply(Create.of(-2)); final PCollectionView<Integer> value = PCollectionViews.singletonView( - p, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); + pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) { tester.setSideInput(value, GlobalWindow.INSTANCE, -2); http://git-wip-us.apache.org/repos/asf/beam/blob/4bcc6413/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index e7db8a2..5ef8b2c 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -153,6 +153,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; @@ -2108,8 +2109,10 @@ public class BigQueryIOTest implements Serializable { TupleTag<KV<Long, List<String>>> singlePartitionTag = new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {}; + PCollection<KV<String, Long>> filesPCollection = + p.apply(Create.of(files).withType(new TypeDescriptor<KV<String, Long>>() {})); PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView( - p, + filesPCollection, WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())); @@ -2173,8 +2176,9 @@ public class BigQueryIOTest implements Serializable { expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); } + PCollection<String> expectedTempTablesPCollection = p.apply(Create.of(expectedTempTables)); PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView( - p, + expectedTempTablesPCollection, WindowingStrategy.globalDefault(), StringUtf8Coder.of()); PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId")); @@ -2251,10 +2255,10 @@ public class BigQueryIOTest implements Serializable { tempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i)); } - PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView( - p, - WindowingStrategy.globalDefault(), - StringUtf8Coder.of()); + PCollection<String> tempTablesPCollection = 
p.apply(Create.of(tempTables)); + PCollectionView<Iterable<String>> tempTablesView = + PCollectionViews.iterableView( + tempTablesPCollection, WindowingStrategy.globalDefault(), StringUtf8Coder.of()); PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId")); PCollectionView<String> jobIdTokenView = jobIdTokenCollection.apply(View.<String>asSingleton());
