Repository: incubator-beam Updated Branches: refs/heads/master d60a0a0e4 -> 5049011a2
Add ViewFn and port SDK to use it. This is a preliminary step towards the architecture described at https://s.apache.org/beam-side-input-1-pager. It separates the ViewFn part of each PCollectionView class/transform out into its own class, as a step toward eliminating extraneous public subclasses of PCollectionView and PTransform. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c376b45c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c376b45c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c376b45c Branch: refs/heads/master Commit: c376b45cac8568d7242d29725f4a9a701673df75 Parents: 2b5c6bc Author: Kenneth Knowles <[email protected]> Authored: Wed Jun 22 08:39:33 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Aug 8 12:31:11 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/SideInputContainer.java | 2 +- .../runners/direct/ViewEvaluatorFactory.java | 5 +- .../functions/FlinkProcessContext.java | 2 +- .../functions/SideInputInitializer.java | 2 +- .../spark/translation/SparkProcessContext.java | 2 +- .../apache/beam/sdk/transforms/DoFnTester.java | 2 +- .../org/apache/beam/sdk/transforms/ViewFn.java | 45 ++++ .../beam/sdk/util/DirectSideInputReader.java | 4 +- .../apache/beam/sdk/util/PCollectionViews.java | 228 +++++++++++++------ .../apache/beam/sdk/values/PCollectionView.java | 29 ++- .../sdk/testing/PCollectionViewTesting.java | 35 +-- 11 files changed, 262 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java index 7a19ed9..6458215 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java @@ -247,7 +247,7 @@ class SideInputContainer { @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view, window)).get(); - return view.fromIterableInternal(values); + return view.getViewFn().apply(values); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 7a0b0f7..362e903 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -133,8 +133,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { /** * An in-process implementation of the {@link CreatePCollectionView} primitive. * - * This implementation requires the input {@link PCollection} to be an iterable, which is provided - * to {@link PCollectionView#fromIterableInternal(Iterable)}. + * This implementation requires the input {@link PCollection} to be an iterable + * of {@code WindowedValue<ElemT>}, which is provided + * to {@link PCollectionView#getViewFn()} for conversion to {@link ViewT}. 
*/ public static final class WriteView<ElemT, ViewT> extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java index 3954d1f..64b93c8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java @@ -249,7 +249,7 @@ class FlinkProcessContext<InputT, OutputT> view.getTagInternal().getId(), new SideInputInitializer<>(view)); ViewT result = sideInputs.get(sideInputWindow); if (result == null) { - result = view.fromIterableInternal(Collections.<WindowedValue<?>>emptyList()); + result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java index 451b31b..a577b68 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java @@ -67,7 +67,7 @@ public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow> Iterable<WindowedValue<?>> elementsIterable = (List<WindowedValue<?>>) (List<?>) elements.getValue(); - resultMap.put(elements.getKey(), view.fromIterableInternal(elementsIterable)); + resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable)); } return resultMap; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 58ac03c..2f06a1c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -84,7 +84,7 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> BroadcastHelper<Iterable<WindowedValue<?>>> broadcastHelper = (BroadcastHelper<Iterable<WindowedValue<?>>>) mSideInputs.get(view.getTagInternal()); Iterable<WindowedValue<?>> contents = broadcastHelper.getValue(); - return view.fromIterableInternal(contents); + return view.getViewFn().apply(contents); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 8de1066..e2764eb 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -543,7 +543,7 @@ public class DoFnTester<InputT, OutputT> { return windowValue; } } - return view.fromIterableInternal(Collections.<WindowedValue<?>>emptyList()); + return view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java new file mode 100644 index 0000000..aa3cb0d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +import java.io.Serializable; + +/** + * A function to adapt a primitive "view" of a {@link PCollection} - some materialization + * specified in the Beam model and implemented by the runner - to a user-facing view type + * for side input. + * + * <p>Both the underlying primitive view and the user-facing view are immutable. + * + * <p>The most common case is using the {@link View} transforms to prepare a {@link PCollection} + * for use as a side input to {@link ParDo}. See {@link View#asSingleton()}, + * {@link View#asIterable()}, and {@link View#asMap()} for more detail on specific views + * available in the SDK. + * + * @param <PrimitiveViewT> the type of the underlying primitive view, provided by the runner + * <ViewT> the type of the value(s) accessible via this {@link PCollectionView} + */ +public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable { + /** + * A function to adapt a primitive view type to a desired view type. 
+ */ + public abstract ViewT apply(PrimitiveViewT contents); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java index f44c06e..c8d360c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectSideInputReader.java @@ -60,9 +60,9 @@ public class DirectSideInputReader implements SideInputReader { } if (view.getWindowingStrategyInternal().getWindowFn() instanceof GlobalWindows) { - return view.fromIterableInternal(sideInputValues.get(tag)); + return view.getViewFn().apply(sideInputValues.get(tag)); } else { - return view.fromIterableInternal( + return view.getViewFn().apply( Iterables.filter(sideInputValues.get(tag), new Predicate<WindowedValue<?>>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java index d63fb96..581a98a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.transforms.windowing.InvalidWindows; import org.apache.beam.sdk.values.KV; @@ -63,8 +64,14 @@ public class PCollectionViews { Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, boolean hasDefault, - T defaultValue, + @Nullable T defaultValue, Coder<T> valueCoder) { + // TODO: as soon as runners are ported off the indicator classes, + // return new SimplePCollectionView<>( + // pipeline, + // new SingletonViewFn<K, V>(hasDefault, defaultValue, valueCoder), + // windowingStrategy, + // valueCoder); return new SingletonPCollectionView<>( pipeline, windowingStrategy, hasDefault, defaultValue, valueCoder); } @@ -77,6 +84,9 @@ public class PCollectionViews { Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) { + // TODO: as soon as runners are ported off the indicator classes, + // return new SimplePCollectionView<>( + // pipeline, new IterableViewFn<T>(), windowingStrategy, valueCoder); return new IterablePCollectionView<>(pipeline, windowingStrategy, valueCoder); } @@ -88,6 +98,9 @@ public class PCollectionViews { Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) { + // TODO: as soon as runners are ported off the indicator classes, + // return new SimplePCollectionView<>( + // pipeline, new ListViewFn<T>(), windowingStrategy, valueCoder); return new ListPCollectionView<>(pipeline, windowingStrategy, valueCoder); } @@ -99,8 +112,10 @@ public class PCollectionViews { Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) { - - return new MapPCollectionView<K, V, W>(pipeline, windowingStrategy, valueCoder); + // TODO: as soon as runners are ported off the indicator classes, + // return new SimplePCollectionView<>( + // pipeline, new MapViewFn<K, V>(), windowingStrategy, valueCoder); + return new MapPCollectionView<>(pipeline, windowingStrategy, valueCoder); } /** @@ -111,27 +126,119 @@ public class PCollectionViews { Pipeline pipeline, 
WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) { - return new MultimapPCollectionView<K, V, W>(pipeline, windowingStrategy, valueCoder); + // TODO: as soon as runners are ported off the indicator classes, + // return new SimplePCollectionView<>( + // pipeline, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder); + return new MultimapPCollectionView<>(pipeline, windowingStrategy, valueCoder); + } + + /** + * A public indicator class that this view is a singleton view. + * + * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is an + * implementation detail. To specialize a side input, a runner should inspect the + * language-independent metadata of the {@link ViewFn}. + */ + @Deprecated + public static class SingletonPCollectionView<T, W extends BoundedWindow> + extends SimplePCollectionView<T, T, W> { + public SingletonPCollectionView( + Pipeline pipeline, + WindowingStrategy<?, W> windowingStrategy, + boolean hasDefault, + T defaultValue, + Coder<T> valueCoder) { + super( + pipeline, + new SingletonViewFn<>(hasDefault, defaultValue, valueCoder), + windowingStrategy, + valueCoder); + } + + public T getDefaultValue() { + return ((SingletonViewFn<T>) viewFn).getDefaultValue(); + } + } + + /** + * A public indicator class that this view is an iterable view. + * + * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is an + * implementation detail. To specialize a side input, a runner should inspect the + * language-independent metadata of the {@link ViewFn}. 
+ */ + @Deprecated + public static class IterablePCollectionView<ElemT, W extends BoundedWindow> + extends SimplePCollectionView<ElemT, Iterable<ElemT>, W> { + public IterablePCollectionView( + Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT> valueCoder) { + super(pipeline, new IterableViewFn<ElemT>(), windowingStrategy, valueCoder); + } + } + + /** + * A public indicator class that this view is a list view. + * + * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is an + * implementation detail. To specialize a side input, a runner should inspect the + * language-independent metadata of the {@link ViewFn}. + */ + @Deprecated + public static class ListPCollectionView<ElemT, W extends BoundedWindow> + extends SimplePCollectionView<ElemT, List<ElemT>, W> { + public ListPCollectionView( + Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT> valueCoder) { + super(pipeline, new ListViewFn<ElemT>(), windowingStrategy, valueCoder); + } + } + + /** + * A public indicator class that this view is a map view. + * + * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is an + * implementation detail. To specialize a side input, a runner should inspect the + * language-independent metadata of the {@link ViewFn}. + */ + @Deprecated + public static class MapPCollectionView<K, V, W extends BoundedWindow> + extends SimplePCollectionView<KV<K, V>, Map<K, V>, W> { + public MapPCollectionView( + Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) { + super(pipeline, new MapViewFn<K, V>(), windowingStrategy, valueCoder); + } } /** + * A public indicator class that this view is a multimap view. + * + * @deprecated Runners should not inspect the {@link PCollectionView} subclass, as it is an + * implementation detail. To specialize a side input, a runner should inspect the + * language-independent metadata of the {@link ViewFn}. 
+ */ + @Deprecated + public static class MultimapPCollectionView<K, V, W extends BoundedWindow> + extends SimplePCollectionView<KV<K, V>, Map<K, Iterable<V>>, W> { + public MultimapPCollectionView( + Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) { + super(pipeline, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder); + } + } + + + /** * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}. * * <p>For internal use only. * * <p>Instantiate via {@link PCollectionViews#singletonView}. */ - public static class SingletonPCollectionView<T, W extends BoundedWindow> - extends PCollectionViewBase<T, T, W> { + private static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> { @Nullable private byte[] encodedDefaultValue; @Nullable private transient T defaultValue; @Nullable private Coder<T> valueCoder; private boolean hasDefault; - private SingletonPCollectionView( - Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, - boolean hasDefault, T defaultValue, Coder<T> valueCoder) { - super(pipeline, windowingStrategy, valueCoder); + private SingletonViewFn(boolean hasDefault, T defaultValue, Coder<T> valueCoder) { this.hasDefault = hasDefault; this.defaultValue = defaultValue; this.valueCoder = valueCoder; @@ -170,7 +277,7 @@ public class PCollectionViews { } @Override - protected T fromElements(Iterable<WindowedValue<T>> contents) { + public T apply(Iterable<WindowedValue<T>> contents) { try { return Iterables.getOnlyElement(contents).getValue(); } catch (NoSuchElementException exc) { @@ -178,7 +285,7 @@ public class PCollectionViews { } catch (IllegalArgumentException exc) { throw new IllegalArgumentException( "PCollection with more than one element " - + "accessed as a singleton view."); + + "accessed as a singleton view."); } } } @@ -190,15 +297,11 @@ public class PCollectionViews { * * <p>Instantiate via {@link PCollectionViews#iterableView}. 
*/ - public static class IterablePCollectionView<T, W extends BoundedWindow> - extends PCollectionViewBase<T, Iterable<T>, W> { - private IterablePCollectionView( - Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) { - super(pipeline, windowingStrategy, valueCoder); - } + private static class IterableViewFn<T> + extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> { @Override - protected Iterable<T> fromElements(Iterable<WindowedValue<T>> contents) { + public Iterable<T> apply(Iterable<WindowedValue<T>> contents) { return Iterables.unmodifiableIterable( Iterables.transform(contents, new Function<WindowedValue<T>, T>() { @SuppressWarnings("unchecked") @@ -217,15 +320,9 @@ public class PCollectionViews { * * <p>Instantiate via {@link PCollectionViews#listView}. */ - public static class ListPCollectionView<T, W extends BoundedWindow> - extends PCollectionViewBase<T, List<T>, W> { - private ListPCollectionView( - Pipeline pipeline, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) { - super(pipeline, windowingStrategy, valueCoder); - } - + private static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> { @Override - protected List<T> fromElements(Iterable<WindowedValue<T>> contents) { + public List<T> apply(Iterable<WindowedValue<T>> contents) { return ImmutableList.copyOf( Iterables.transform(contents, new Function<WindowedValue<T>, T>() { @SuppressWarnings("unchecked") @@ -240,20 +337,12 @@ public class PCollectionViews { /** * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>} * to {@code Map<K, Iterable<V>>}. - * - * <p>For internal use only. 
*/ - public static class MultimapPCollectionView<K, V, W extends BoundedWindow> - extends PCollectionViewBase<KV<K, V>, Map<K, Iterable<V>>, W> { - private MultimapPCollectionView( - Pipeline pipeline, - WindowingStrategy<?, W> windowingStrategy, - Coder<KV<K, V>> valueCoder) { - super(pipeline, windowingStrategy, valueCoder); - } + private static class MultimapViewFn<K, V> + extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> { @Override - protected Map<K, Iterable<V>> fromElements(Iterable<WindowedValue<KV<K, V>>> elements) { + public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) { Multimap<K, V> multimap = HashMultimap.create(); for (WindowedValue<KV<K, V>> elem : elements) { KV<K, V> kv = elem.getValue(); @@ -267,25 +356,16 @@ public class PCollectionViews { } /** - * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with - * one value per key to {@code Map<K, V>}. - * - * <p>For internal use only. + * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key + * to {@code Map<K, V>}. */ - public static class MapPCollectionView<K, V, W extends BoundedWindow> - extends PCollectionViewBase<KV<K, V>, Map<K, V>, W> { - private MapPCollectionView( - Pipeline pipeline, - WindowingStrategy<?, W> windowingStrategy, - Coder<KV<K, V>> valueCoder) { - super(pipeline, windowingStrategy, valueCoder); - } - + private static class MapViewFn<K, V> + extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> { /** * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}. 
*/ @Override - protected Map<K, V> fromElements(Iterable<WindowedValue<KV<K, V>>> elements) { + public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) { Map<K, V> map = new HashMap<>(); for (WindowedValue<KV<K, V>> elem : elements) { KV<K, V> kv = elem.getValue(); @@ -302,7 +382,7 @@ public class PCollectionViews { * A base class for {@link PCollectionView} implementations, with additional type parameters * that are not visible at pipeline assembly time when the view is used as a side input. */ - private abstract static class PCollectionViewBase<ElemT, ViewT, W extends BoundedWindow> + private static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow> extends PValueBase implements PCollectionView<ViewT> { /** A unique tag for the view, typed according to the elements underlying the view. */ @@ -315,18 +395,23 @@ public class PCollectionViews { private Coder<Iterable<WindowedValue<ElemT>>> coder; /** - * Implement this to complete the implementation. It is a conversion function from - * all of the elements of the underlying {@link PCollection} to the value of the view. + * The typed {@link ViewFn} for this view. + * + * @deprecated Access to this variable from subclasses is temporary, for migrating away + * from language-specific inspections. */ - protected abstract ViewT fromElements(Iterable<WindowedValue<ElemT>> elements); + @Deprecated + protected ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn; /** * Call this constructor to initialize the fields for which this base class provides * boilerplate accessors. 
*/ - protected PCollectionViewBase( + // TODO: make private as soon as runners are ported off indicator subclasses + protected SimplePCollectionView( Pipeline pipeline, TupleTag<Iterable<WindowedValue<ElemT>>> tag, + ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT> valueCoder) { super(pipeline); @@ -335,6 +420,7 @@ public class PCollectionViews { } this.tag = tag; this.windowingStrategy = windowingStrategy; + this.viewFn = viewFn; this.coder = IterableCoder.of(WindowedValue.getFullCoder( valueCoder, windowingStrategy.getWindowFn().windowCoder())); @@ -344,30 +430,42 @@ public class PCollectionViews { * Call this constructor to initialize the fields for which this base class provides * boilerplate accessors, with an auto-generated tag. */ - protected PCollectionViewBase( + // TODO: make private as soon as runners are ported off indicator subclasses + protected SimplePCollectionView( Pipeline pipeline, + ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT> valueCoder) { - this(pipeline, new TupleTag<Iterable<WindowedValue<ElemT>>>(), windowingStrategy, valueCoder); + this( + pipeline, + new TupleTag<Iterable<WindowedValue<ElemT>>>(), + viewFn, + windowingStrategy, + valueCoder); } /** - * For serialization only. Do not use directly. Subclasses should call from their own - * protected no-argument constructor. + * For serialization only. Do not use directly. */ @SuppressWarnings("unused") // used for serialization - protected PCollectionViewBase() { + protected SimplePCollectionView() { super(); } @Override - public ViewT fromIterableInternal(Iterable<WindowedValue<?>> elements) { + public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() { // Safe cast: it is required that the rest of the SDK maintain the invariant // that a PCollectionView is only provided an iterable for the elements of an // appropriately typed PCollection. 
@SuppressWarnings({"rawtypes", "unchecked"}) - Iterable<WindowedValue<ElemT>> typedElements = (Iterable) elements; - return fromElements(typedElements); + ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn; + return untypedViewFn; + } + + @Override + @Deprecated + public ViewT fromIterableInternal(Iterable<WindowedValue<?>> elements) { + return getViewFn().apply(elements); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java index fb3bfab..20f1071 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.values; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -44,22 +45,42 @@ import java.io.Serializable; */ public interface PCollectionView<T> extends PValue, Serializable { /** - * A unique identifier, for internal use. + * @deprecated this method will be removed entirely. The {@link PCollection} underlying a side + * input, is part of the side input's specification with a {@link ParDo} transform, which will + * obtain that information via a package-private channel. */ + @Deprecated public TupleTag<Iterable<WindowedValue<?>>> getTagInternal(); /** - * For internal use only. 
+ * @deprecated use {@link #getViewFn()} for now, but eventually get the needed information via the + * side input specification on the {@link ParDo} transform. */ + @Deprecated public T fromIterableInternal(Iterable<WindowedValue<?>> contents); /** - * For internal use only. + * @deprecated this method will be removed entirely. The {@link ViewFn} for a side input is an + * attribute of the side input's specification with a {@link ParDo} transform, which will + * obtain this specification via a package-private channel. */ + @Deprecated + public ViewFn<Iterable<WindowedValue<?>>, T> getViewFn(); + + /** + * @deprecated this method will be removed entirely. The {@link PCollection} underlying a side + * input, including its {@link WindowingStrategy}, is part of the side input's specification + * with a {@link ParDo} transform, which will obtain that information via a package-private + * channel. + */ + @Deprecated public WindowingStrategy<?, ?> getWindowingStrategyInternal(); /** - * For internal use only. + * @deprecated this method will be removed entirely. The {@link PCollection} underlying a side + * input, including its {@link Coder}, is part of the side input's specification with a {@link + * ParDo} transform, which will obtain that information via a package-private channel. 
*/ + @Deprecated public Coder<Iterable<WindowedValue<?>>> getCoderInternal(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java index 427f2da..517ed68 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.testing; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; @@ -83,16 +83,9 @@ public final class PCollectionViewTesting { DEFAULT_NONEMPTY_WINDOW.maxTimestamp().plus(DEFAULT_WINDOW_MSECS)); /** - * A specialization of {@link SerializableFunction} just for putting together - * {@link PCollectionView} instances. - */ - public static interface ViewFn<ElemT, ViewT> - extends SerializableFunction<Iterable<WindowedValue<ElemT>>, ViewT> { } - - /** * A {@link ViewFn} that returns the provided contents as a fully lazy iterable. 
*/ - public static class IdentityViewFn<T> implements ViewFn<T, Iterable<T>> { + public static class IdentityViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> { @Override public Iterable<T> apply(Iterable<WindowedValue<T>> contents) { return Iterables.transform(contents, new Function<WindowedValue<T>, T>() { @@ -110,7 +103,7 @@ public final class PCollectionViewTesting { * <p>Only for use in testing scenarios with small collections. If there are more elements * provided than {@code Integer.MAX_VALUE} then behavior is unpredictable. */ - public static class LengthViewFn<T> implements ViewFn<T, Long> { + public static class LengthViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Long> { @Override public Long apply(Iterable<WindowedValue<T>> contents) { return (long) Iterables.size(contents); @@ -120,7 +113,8 @@ public final class PCollectionViewTesting { /** * A {@link ViewFn} that always returns the value with which it is instantiated. */ - public static class ConstantViewFn<ElemT, ViewT> implements ViewFn<ElemT, ViewT> { + public static class ConstantViewFn<ElemT, ViewT> + extends ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> { private ViewT value; public ConstantViewFn(ViewT value) { @@ -148,7 +142,7 @@ public final class PCollectionViewTesting { */ public static <ElemT, ViewT> PCollectionView<ViewT> testingView( TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<ElemT, ViewT> viewFn, + ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, Coder<ElemT> elemCoder) { return testingView( tag, @@ -178,7 +172,7 @@ public final class PCollectionViewTesting { */ public static <ElemT, ViewT> PCollectionView<ViewT> testingView( TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<ElemT, ViewT> viewFn, + ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, Coder<ElemT> elemCoder, WindowingStrategy<?, ?> windowingStrategy) { return new PCollectionViewFromParts<>( @@ -233,13 +227,13 @@ public final class PCollectionViewTesting { extends PValueBase 
implements PCollectionView<ViewT> { private TupleTag<Iterable<WindowedValue<ElemT>>> tag; - private ViewFn<ElemT, ViewT> viewFn; + private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn; private WindowingStrategy<?, ?> windowingStrategy; private Coder<Iterable<WindowedValue<ElemT>>> coder; public PCollectionViewFromParts( TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<ElemT, ViewT> viewFn, + ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, WindowingStrategy<?, ?> windowingStrategy, Coder<Iterable<WindowedValue<ElemT>>> coder) { this.tag = tag; @@ -256,8 +250,17 @@ public final class PCollectionViewTesting { @SuppressWarnings({"unchecked", "rawtypes"}) @Override + @Deprecated public ViewT fromIterableInternal(Iterable<WindowedValue<?>> contents) { - return (ViewT) viewFn.apply((Iterable) contents); + return getViewFn().apply(contents); + } + + @Override + public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() { + // Safe cast; runners must maintain type safety + @SuppressWarnings({"unchecked", "rawtypes"}) + ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn; + return untypedViewFn; } @Override
