http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java index 6c91088..932ccd6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java @@ -20,11 +20,17 @@ package org.apache.beam.runners.spark.util; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.InMemoryMultimapSideInputView; import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.Materializations.MultimapView; +import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -34,7 +40,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; /** - * A {@link SideInputReader} for thw SparkRunner. + * A {@link SideInputReader} for the SparkRunner. */ public class SparkSideInputReader implements SideInputReader { private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; @@ -60,26 +66,30 @@ public class SparkSideInputReader implements SideInputReader { //--- match the appropriate sideInput window. // a tag will point to all matching sideInputs, that is all windows. // now that we've obtained the appropriate sideInputWindow, all that's left is to filter by it. - Iterable<WindowedValue<?>> availableSideInputs = - (Iterable<WindowedValue<?>>) windowedBroadcastHelper.getValue().getValue(); - Iterable<WindowedValue<?>> sideInputForWindow = - Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() { - @Override - public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) { - if (sideInputCandidate == null) { - return false; - } - // first match of a sideInputWindow to the elementWindow is good enough. - for (BoundedWindow sideInputCandidateWindow: sideInputCandidate.getWindows()) { - if (sideInputCandidateWindow.equals(sideInputWindow)) { - return true; + Iterable<WindowedValue<KV<?, ?>>> availableSideInputs = + (Iterable<WindowedValue<KV<?, ?>>>) windowedBroadcastHelper.getValue().getValue(); + Iterable<KV<?, ?>> sideInputForWindow = + Iterables.transform( + Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() { + @Override + public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) { + if (sideInputCandidate == null) { + return false; + } + return Iterables.contains(sideInputCandidate.getWindows(), sideInputWindow); } - } - // no match found. - return false; - } - }); - return view.getViewFn().apply(sideInputForWindow); + }), + new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() { + @Override + public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) { + return windowedValue.getValue(); + } + }); + + ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); + Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder(); + return viewFn.apply( + InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) sideInputForWindow)); } @Override
http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 3c5b55b..f86e9cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; @@ -1274,14 +1275,16 @@ public class Combine { public PCollectionView<OutputT> expand(PCollection<InputT> input) { PCollection<OutputT> combined = input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout)); - PCollectionView<OutputT> view = - PCollectionViews.singletonView( - combined, + PCollection<KV<Void, OutputT>> materializationInput = + combined.apply(new VoidKeyToMultimapMaterialization<OutputT>()); + PCollectionView<OutputT> view = PCollectionViews.singletonView( + materializationInput, input.getWindowingStrategy(), insertDefault, insertDefault ? fn.defaultValue() : null, - combined.getCoder()); - combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view)); + combined.getCoder()); + materializationInput.apply( + CreatePCollectionView.<KV<Void, OutputT>, OutputT>of(view)); return view; } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 6168710..d71f0fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; @@ -602,7 +601,24 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { return windowValue; } } - return view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); + // Fallback to returning the default materialization if no data was supplied. + // This is really to support singleton views with default values. + + // TODO: Update this to supply a materialization dependent on actual URN of materialization. + // Currently the SDK only supports the multimap materialization and it expects a + // mapping function. + checkState(Materializations.MULTIMAP_MATERIALIZATION_URN.equals( + view.getViewFn().getMaterialization().getUrn()), + "Only materializations of type %s supported, received %s", + Materializations.MULTIMAP_MATERIALIZATION_URN, + view.getViewFn().getMaterialization().getUrn()); + return ((ViewFn<Materializations.MultimapView, T>) view.getViewFn()).apply( + new Materializations.MultimapView<Object, Object>() { + @Override + public Iterable<Object> get(Object o) { + return Collections.emptyList(); + } + }); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java index 6e4f83d..e606919 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Materializations.java @@ -21,7 +21,6 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.util.WindowedValue; /** * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> @@ -32,29 +31,37 @@ import org.apache.beam.sdk.util.WindowedValue; @Internal public class Materializations { /** - * The URN for a {@link Materialization} where the primitive view type is an iterable of fully + * The URN for a {@link Materialization} where the primitive view type is an multimap of fully * specified windowed values. */ @Experimental(Kind.CORE_RUNNERS_ONLY) - public static final String ITERABLE_MATERIALIZATION_URN = - "urn:beam:sideinput:materialization:iterable:0.1"; + public static final String MULTIMAP_MATERIALIZATION_URN = + "urn:beam:sideinput:materialization:multimap:0.1"; + + /** + * Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when it declares to + * use the {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap materialization}. + */ + public interface MultimapView<K, V> { + Iterable<V> get(K k); + } /** * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> * - * <p>A {@link Materialization} where the primitive view type is an iterable of fully specified - * windowed values. + * <p>A {@link Materialization} where the primitive view type is a multimap with fully + * specified windowed keys. */ @Internal - public static <T> Materialization<Iterable<WindowedValue<T>>> iterable() { - return new IterableMaterialization<>(); + public static <K, V> Materialization<MultimapView<K, V>> multimap() { + return new MultimapMaterialization<>(); } - private static class IterableMaterialization<T> - implements Materialization<Iterable<WindowedValue<T>>> { + private static class MultimapMaterialization<K, V> + implements Materialization<MultimapView<K, V>> { @Override public String getUrn() { - return ITERABLE_MATERIALIZATION_URN; + return MULTIMAP_MATERIALIZATION_URN; } } } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index eaa7925..ec8233e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -24,6 +24,8 @@ import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; @@ -258,9 +260,13 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - PCollectionView<List<T>> view = - PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder()); - input.apply(CreatePCollectionView.<T, List<T>>of(view)); + PCollection<KV<Void, T>> materializationInput = + input.apply(new VoidKeyToMultimapMaterialization<T>()); + PCollectionView<List<T>> view = PCollectionViews.listView( + materializationInput, + materializationInput.getWindowingStrategy()); + materializationInput.apply( + CreatePCollectionView.<KV<Void, T>, List<T>>of(view)); return view; } } @@ -285,9 +291,13 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder()); - input.apply(CreatePCollectionView.<T, Iterable<T>>of(view)); + PCollection<KV<Void, T>> materializationInput = + input.apply(new VoidKeyToMultimapMaterialization<T>()); + PCollectionView<Iterable<T>> view = PCollectionViews.iterableView( + materializationInput, + materializationInput.getWindowingStrategy()); + materializationInput.apply( + CreatePCollectionView.<KV<Void, T>, Iterable<T>>of(view)); return view; } } @@ -428,9 +438,13 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder()); - input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); + PCollection<KV<Void, KV<K, V>>> materializationInput = + input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>()); + PCollectionView<Map<K, Iterable<V>>> view = PCollectionViews.multimapView( + materializationInput, + materializationInput.getWindowingStrategy()); + materializationInput.apply( + CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, Iterable<V>>>of(view)); return view; } } @@ -463,9 +477,13 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder()); - input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view)); + PCollection<KV<Void, KV<K, V>>> materializationInput = + input.apply(new VoidKeyToMultimapMaterialization<KV<K, V>>()); + PCollectionView<Map<K, V>> view = PCollectionViews.mapView( + materializationInput, + materializationInput.getWindowingStrategy()); + materializationInput.apply( + CreatePCollectionView.<KV<Void, KV<K, V>>, Map<K, V>>of(view)); return view; } } @@ -474,6 +492,31 @@ public class View { // Internal details below /** + * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys. + * + * <p>TODO: Replace this materialization with specializations that optimize the various SDK + * requested views. + */ + @Internal + static class VoidKeyToMultimapMaterialization<T> + extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> { + + private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> { + @ProcessElement + public void processElement(ProcessContext ctxt) { + ctxt.output(KV.of((Void) null, ctxt.element())); + } + } + + @Override + public PCollection<KV<Void, T>> expand(PCollection<T> input) { + PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>())); + output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())); + return output; + } + } + + /** * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> * * <p>Creates a primitive {@link PCollectionView}. http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java index d51a917..9291bc6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ViewFn.java @@ -36,8 +36,8 @@ import org.apache.beam.sdk.values.PCollectionView; * {@link View#asIterable()}, and {@link View#asMap()} for more detail on specific views * available in the SDK. * - * @param <PrimitiveViewT> the type of the underlying primitive view, provided by the runner - * {@code <ViewT>} the type of the value(s) accessible via this {@link PCollectionView} + * @param <PrimitiveViewT> the type of the underlying primitive view required + * @param <ViewT> the type of the value(s) accessible via this {@link PCollectionView} */ @Internal public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable { @@ -49,5 +49,5 @@ public abstract class ViewFn<PrimitiveViewT, ViewT> implements Serializable { /** * A function to adapt a primitive view type to a desired view type. */ - public abstract ViewT apply(PrimitiveViewT contents); + public abstract ViewT apply(PrimitiveViewT primitiveViewT); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java index 7d87412..c212c34 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.util.WindowedValue; /** * A {@link PCollectionView PCollectionView<T>} is an immutable view of a {@link PCollection} @@ -72,7 +71,7 @@ public interface PCollectionView<T> extends PValue, Serializable { */ @Deprecated @Internal - TupleTag<Iterable<WindowedValue<?>>> getTagInternal(); + TupleTag<?> getTagInternal(); /** * <b>For internal use only.</b> @@ -83,7 +82,7 @@ public interface PCollectionView<T> extends PValue, Serializable { */ @Deprecated @Internal - ViewFn<Iterable<WindowedValue<?>>, T> getViewFn(); + ViewFn<?, T> getViewFn(); /** * <b>For internal use only.</b> @@ -116,5 +115,5 @@ public interface PCollectionView<T> extends PValue, Serializable { */ @Deprecated @Internal - Coder<Iterable<WindowedValue<?>>> getCoderInternal(); + Coder<?> getCoderInternal(); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java index ed8fb76..30277f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java @@ -17,14 +17,13 @@ */ package org.apache.beam.sdk.values; -import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -36,16 +35,15 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.transforms.Materialization; import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.Materializations.MultimapView; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.WindowedValue; /** * <b>For internal use only; no backwards compatibility guarantees.</b> @@ -56,88 +54,79 @@ import org.apache.beam.sdk.util.WindowedValue; public class PCollectionViews { /** - * Returns a {@code PCollectionView<T>} capable of processing elements encoded using the provided - * {@link Coder} and windowed using the provided * {@link WindowingStrategy}. + * Returns a {@code PCollectionView<T>} capable of processing elements windowed + * using the provided {@link WindowingStrategy}. * * <p>If {@code hasDefault} is {@code true}, then the view will take on the value * {@code defaultValue} for any empty windows. */ public static <T, W extends BoundedWindow> PCollectionView<T> singletonView( - PCollection<T> pCollection, + PCollection<KV<Void, T>> pCollection, WindowingStrategy<?, W> windowingStrategy, boolean hasDefault, @Nullable T defaultValue, - Coder<T> valueCoder) { + Coder<T> defaultValueCoder) { return new SimplePCollectionView<>( pCollection, - new SingletonViewFn<>(hasDefault, defaultValue, valueCoder), + new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder), windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); + windowingStrategy); } /** - * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements encoded using the - * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. + * Returns a {@code PCollectionView<Iterable<T>>} capable of processing elements windowed + * using the provided {@link WindowingStrategy}. */ public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableView( - PCollection<T> pCollection, - WindowingStrategy<?, W> windowingStrategy, - Coder<T> valueCoder) { + PCollection<KV<Void, T>> pCollection, + WindowingStrategy<?, W> windowingStrategy) { return new SimplePCollectionView<>( pCollection, new IterableViewFn<T>(), windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); + windowingStrategy); } /** - * Returns a {@code PCollectionView<List<T>>} capable of processing elements encoded using the - * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. + * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed + * using the provided {@link WindowingStrategy}. */ public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView( - PCollection<T> pCollection, - WindowingStrategy<?, W> windowingStrategy, - Coder<T> valueCoder) { + PCollection<KV<Void, T>> pCollection, + WindowingStrategy<?, W> windowingStrategy) { return new SimplePCollectionView<>( pCollection, new ListViewFn<T>(), windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); + windowingStrategy); } /** - * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements encoded using the - * provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. + * Returns a {@code PCollectionView<Map<K, V>>} capable of processing elements windowed + * using the provided {@link WindowingStrategy}. */ public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapView( - PCollection<KV<K, V>> pCollection, - WindowingStrategy<?, W> windowingStrategy, - Coder<KV<K, V>> valueCoder) { + PCollection<KV<Void, KV<K, V>>> pCollection, + WindowingStrategy<?, W> windowingStrategy) { return new SimplePCollectionView<>( pCollection, new MapViewFn<K, V>(), windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); + windowingStrategy); } /** - * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements encoded - * using the provided {@link Coder} and windowed using the provided {@link WindowingStrategy}. + * Returns a {@code PCollectionView<Map<K, Iterable<V>>>} capable of processing elements windowed + * using the provided {@link WindowingStrategy}. */ public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> multimapView( - PCollection<KV<K, V>> pCollection, - WindowingStrategy<?, W> windowingStrategy, - Coder<KV<K, V>> valueCoder) { + PCollection<KV<Void, KV<K, V>>> pCollection, + WindowingStrategy<?, W> windowingStrategy) { return new SimplePCollectionView<>( pCollection, new MultimapViewFn<K, V>(), windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - windowingStrategy, - valueCoder); + windowingStrategy); } /** @@ -153,18 +142,15 @@ public class PCollectionViews { } /** - * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}. + * Implementation which is able to adapt a multimap materialization to a {@code T}. * * <p>For internal use only. * * <p>Instantiate via {@link PCollectionViews#singletonView}. - * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. */ - @Deprecated @Experimental(Kind.CORE_RUNNERS_ONLY) - public static class SingletonViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, T> { + public static class SingletonViewFn<T> + extends ViewFn<MultimapView<Void, T>, T> { @Nullable private byte[] encodedDefaultValue; @Nullable private transient T defaultValue; @Nullable private Coder<T> valueCoder; @@ -204,9 +190,12 @@ public class PCollectionViews { } // Lazily decode the default value once synchronized (this) { - if (encodedDefaultValue != null && defaultValue == null) { + if (encodedDefaultValue != null) { try { defaultValue = CoderUtils.decodeFromByteArray(valueCoder, encodedDefaultValue); + // Clear the encoded default value to free the reference once we have the object + // version. Also, this will guarantee that the value will only be decoded once. + encodedDefaultValue = null; } catch (IOException e) { throw new RuntimeException("Unexpected IOException: ", e); } @@ -216,84 +205,67 @@ public class PCollectionViews { } @Override - public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { - return Materializations.iterable(); + public Materialization<MultimapView<Void, T>> getMaterialization() { + return Materializations.multimap(); } @Override - public T apply(Iterable<WindowedValue<T>> contents) { + public T apply(MultimapView<Void, T> primitiveViewT) { try { - return Iterables.getOnlyElement(contents).getValue(); + return Iterables.getOnlyElement(primitiveViewT.get(null)); } catch (NoSuchElementException exc) { return getDefaultValue(); } catch (IllegalArgumentException exc) { throw new IllegalArgumentException( - "PCollection with more than one element " - + "accessed as a singleton view."); + "PCollection with more than one element accessed as a singleton view."); } } } /** - * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code Iterable<T>}. + * Implementation which is able to adapt a multimap materialization to a {@code Iterable<T>}. * * <p>For internal use only. * * <p>Instantiate via {@link PCollectionViews#iterableView}. - * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. */ - @Deprecated @Experimental(Kind.CORE_RUNNERS_ONLY) public static class IterableViewFn<T> - extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> { + extends ViewFn<MultimapView<Void, T>, Iterable<T>> { + @Override - public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { - return Materializations.iterable(); + public Materialization<MultimapView<Void, T>> getMaterialization() { + return Materializations.multimap(); } @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> contents) { - return Iterables.unmodifiableIterable( - Iterables.transform(contents, new Function<WindowedValue<T>, T>() { - @SuppressWarnings("unchecked") - @Override - public T apply(WindowedValue<T> input) { - return input.getValue(); - } - })); + public Iterable<T> apply(MultimapView<Void, T> primitiveViewT) { + return Iterables.unmodifiableIterable(primitiveViewT.get(null)); } } /** - * Implementation of conversion {@code Iterable<WindowedValue<T>>} to {@code List<T>}. + * Implementation which is able to adapt a multimap materialization to a {@code List<T>}. * * <p>For internal use only. * * <p>Instantiate via {@link PCollectionViews#listView}. - * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. */ - @Deprecated @Experimental(Kind.CORE_RUNNERS_ONLY) - public static class ListViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, List<T>> { + public static class ListViewFn<T> + extends ViewFn<MultimapView<Void, T>, List<T>> { @Override - public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { - return Materializations.iterable(); + public Materialization<MultimapView<Void, T>> getMaterialization() { + return Materializations.multimap(); } @Override - public List<T> apply(Iterable<WindowedValue<T>> contents) { - return ImmutableList.copyOf( - Iterables.transform(contents, new Function<WindowedValue<T>, T>() { - @SuppressWarnings("unchecked") - @Override - public T apply(WindowedValue<T> input) { - return input.getValue(); + public List<T> apply(MultimapView<Void, T> primitiveViewT) { + List<T> list = new ArrayList<>(); + for (T t : primitiveViewT.get(null)) { + list.add(t); } - })); + return Collections.unmodifiableList(list); } @Override @@ -308,27 +280,29 @@ public class PCollectionViews { } /** - * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>>} - * to {@code Map<K, Iterable<V>>}. + * Implementation which is able to adapt a multimap materialization to a + * {@code Map<K, Iterable<V>>}. + * + * <p>For internal use only. * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. + * <p>Instantiate via {@link PCollectionViews#multimapView}. */ - @Deprecated @Experimental(Kind.CORE_RUNNERS_ONLY) public static class MultimapViewFn<K, V> - extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, Iterable<V>>> { + extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, Iterable<V>>> { @Override - public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() { - return Materializations.iterable(); + public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() { + return Materializations.multimap(); } @Override - public Map<K, Iterable<V>> apply(Iterable<WindowedValue<KV<K, V>>> elements) { + public Map<K, Iterable<V>> apply( + MultimapView<Void, KV<K, V>> primitiveViewT) { + // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are + // using structural value equality. Multimap<K, V> multimap = HashMultimap.create(); - for (WindowedValue<KV<K, V>> elem : elements) { - KV<K, V> kv = elem.getValue(); - multimap.put(kv.getKey(), kv.getValue()); + for (KV<K, V> elem : primitiveViewT.get(null)) { + multimap.put(elem.getKey(), elem.getValue()); } // Safe covariant cast that Java cannot express without rawtypes, even with unchecked casts @SuppressWarnings({"unchecked", "rawtypes"}) @@ -338,32 +312,31 @@ public class PCollectionViews { } /** - * Implementation of conversion {@code Iterable<WindowedValue<KV<K, V>>} with one value per key to - * {@code Map<K, V>}. + * Implementation which is able to adapt a multimap materialization to a {@code Map<K, V>}. + * + * <p>For internal use only. * - * @deprecated Beam views are migrating off of {@code Iterable<WindowedValue<T>>} as a primitive - * view type. + * <p>Instantiate via {@link PCollectionViews#mapView}. */ - @Deprecated @Experimental(Kind.CORE_RUNNERS_ONLY) - public static class MapViewFn<K, V> extends ViewFn<Iterable<WindowedValue<KV<K, V>>>, Map<K, V>> { + public static class MapViewFn<K, V> + extends ViewFn<MultimapView<Void, KV<K, V>>, Map<K, V>> { + @Override - public Materialization<Iterable<WindowedValue<KV<K, V>>>> getMaterialization() { - return Materializations.iterable(); + public Materialization<MultimapView<Void, KV<K, V>>> getMaterialization() { + return Materializations.multimap(); } - /** - * Input iterable must actually be {@code Iterable<WindowedValue<KV<K, V>>>}. - */ @Override - public Map<K, V> apply(Iterable<WindowedValue<KV<K, V>>> elements) { + public Map<K, V> apply(MultimapView<Void, KV<K, V>> primitiveViewT) { + // TODO: BEAM-3071 - fix this so that we aren't relying on Java equality and are + // using structural value equality. Map<K, V> map = new HashMap<>(); - for (WindowedValue<KV<K, V>> elem : elements) { - KV<K, V> kv = elem.getValue(); - if (map.containsKey(kv.getKey())) { - throw new IllegalArgumentException("Duplicate values for " + kv.getKey()); + for (KV<K, V> elem : primitiveViewT.get(null)) { + if (map.containsKey(elem.getKey())) { + throw new IllegalArgumentException("Duplicate values for " + elem.getKey()); } - map.put(kv.getKey(), kv.getValue()); + map.put(elem.getKey(), elem.getValue()); } return Collections.unmodifiableMap(map); } @@ -375,14 +348,14 @@ public class PCollectionViews { * * <p>For internal use only. */ - public static class SimplePCollectionView<ElemT, ViewT, W extends BoundedWindow> + public static class SimplePCollectionView<ElemT, PrimitiveViewT, ViewT, W extends BoundedWindow> extends PValueBase implements PCollectionView<ViewT> { /** The {@link PCollection} this view was originally created from. */ private transient PCollection<ElemT> pCollection; /** A unique tag for the view, typed according to the elements underlying the view. */ - private TupleTag<Iterable<WindowedValue<ElemT>>> tag; + private TupleTag<PrimitiveViewT> tag; private WindowMappingFn<W> windowMappingFn; @@ -390,12 +363,12 @@ public class PCollectionViews { private WindowingStrategy<?, W> windowingStrategy; /** The coder for the elements underlying the view. */ - private @Nullable Coder<Iterable<WindowedValue<ElemT>>> coder; + private @Nullable Coder<ElemT> coder; /** * The typed {@link ViewFn} for this view. */ - private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn; + private ViewFn<PrimitiveViewT, ViewT> viewFn; /** * Call this constructor to initialize the fields for which this base class provides @@ -403,11 +376,10 @@ public class PCollectionViews { */ private SimplePCollectionView( PCollection<ElemT> pCollection, - TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + TupleTag<PrimitiveViewT> tag, + ViewFn<PrimitiveViewT, ViewT> viewFn, WindowMappingFn<W> windowMappingFn, - WindowingStrategy<?, W> windowingStrategy, - Coder<ElemT> valueCoder) { + WindowingStrategy<?, W> windowingStrategy) { super(pCollection.getPipeline()); this.pCollection = pCollection; if (windowingStrategy.getWindowFn() instanceof InvalidWindows) { @@ -417,9 +389,7 @@ public class PCollectionViews { this.tag = tag; this.windowingStrategy = windowingStrategy; this.viewFn = viewFn; - this.coder = - IterableCoder.of(WindowedValue.getFullCoder( - valueCoder, windowingStrategy.getWindowFn().windowCoder())); + this.coder = pCollection.getCoder(); } /** @@ -428,27 +398,20 @@ public class PCollectionViews { */ private SimplePCollectionView( PCollection<ElemT> pCollection, - ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + ViewFn<PrimitiveViewT, ViewT> viewFn, WindowMappingFn<W> windowMappingFn, - WindowingStrategy<?, W> windowingStrategy, - Coder<ElemT> valueCoder) { + WindowingStrategy<?, W> windowingStrategy) { this( pCollection, - new TupleTag<Iterable<WindowedValue<ElemT>>>(), + new TupleTag<PrimitiveViewT>(), viewFn, windowMappingFn, - windowingStrategy, - valueCoder); + windowingStrategy); } @Override - public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() { - // Safe cast: it is required that the rest of the SDK maintain the invariant - // that a PCollectionView is only provided an iterable for the elements of an - // appropriately typed PCollection. - @SuppressWarnings({"rawtypes", "unchecked"}) - ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn; - return untypedViewFn; + public ViewFn<PrimitiveViewT, ViewT> getViewFn() { + return viewFn; } @Override @@ -467,13 +430,8 @@ public class PCollectionViews { * <p>For internal use only by runner implementors. */ @Override - public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() { - // Safe cast: It is required that the rest of the SDK maintain the invariant that - // this tag is only used to access the contents of an appropriately typed underlying - // PCollection - @SuppressWarnings({"rawtypes", "unchecked"}) - TupleTag<Iterable<WindowedValue<?>>> untypedTag = (TupleTag) tag; - return untypedTag; + public TupleTag<?> getTagInternal() { + return tag; } /** @@ -488,12 +446,8 @@ public class PCollectionViews { } @Override - public Coder<Iterable<WindowedValue<?>>> getCoderInternal() { - // Safe cast: It is required that the rest of the SDK only use this untyped coder - // for the elements of an appropriately typed underlying PCollection. - @SuppressWarnings({"rawtypes", "unchecked"}) - Coder<Iterable<WindowedValue<?>>> untypedCoder = (Coder) coder; - return untypedCoder; + public Coder<?> getCoderInternal() { + return coder; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java index aaf8b91..e7fd9b8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java @@ -18,344 +18,57 @@ package org.apache.beam.sdk.testing; -import com.google.common.base.Function; -import com.google.common.base.MoreObjects; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.Objects; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.transforms.Materialization; -import org.apache.beam.sdk.transforms.Materializations; -import org.apache.beam.sdk.transforms.ViewFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.PValueBase; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.joda.time.Duration; -import org.joda.time.Instant; /** - * Methods for creating and using {@link PCollectionView} instances. + * Methods for testing {@link PCollectionView}s. */ public final class PCollectionViewTesting { - - // Do not instantiate; static methods only - private PCollectionViewTesting() { } - - /** - * The length of the default window, which is an {@link IntervalWindow}, but kept encapsulated - * as it is not for the user to know what sort of window it is. - */ - private static final long DEFAULT_WINDOW_MSECS = 1000 * 60 * 60; - - /** - * A default windowing strategy. Tests that are not concerned with the windowing - * strategy should not specify it, and all views will use this. - */ - public static final WindowingStrategy<?, ?> DEFAULT_WINDOWING_STRATEGY = - WindowingStrategy.of(FixedWindows.of(new Duration(DEFAULT_WINDOW_MSECS))); - - /** - * A default window into which test elements will be placed, if the window is - * not explicitly overridden. - */ - public static final BoundedWindow DEFAULT_NONEMPTY_WINDOW = - new IntervalWindow(new Instant(0), new Instant(DEFAULT_WINDOW_MSECS)); - - /** - * A timestamp in the {@link #DEFAULT_NONEMPTY_WINDOW}. - */ - public static final Instant DEFAULT_TIMESTAMP = DEFAULT_NONEMPTY_WINDOW.maxTimestamp().minus(1); - - /** - * A window into which no element will be placed by methods in this class, unless explicitly - * requested. - */ - public static final BoundedWindow DEFAULT_EMPTY_WINDOW = new IntervalWindow( - DEFAULT_NONEMPTY_WINDOW.maxTimestamp(), - DEFAULT_NONEMPTY_WINDOW.maxTimestamp().plus(DEFAULT_WINDOW_MSECS)); - - /** - * A {@link ViewFn} that returns the provided contents as a fully lazy iterable. - */ - public static class IdentityViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Iterable<T>> { - @Override - public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { - return Materializations.iterable(); - } - - @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> contents) { - return Iterables.transform(contents, new Function<WindowedValue<T>, T>() { - @Override - public T apply(WindowedValue<T> windowedValue) { - return windowedValue.getValue(); - } - }); - } - } - - /** - * A {@link ViewFn} that traverses the whole iterable eagerly and returns the number of elements. - * - * <p>Only for use in testing scenarios with small collections. If there are more elements - * provided than {@code Integer.MAX_VALUE} then behavior is unpredictable. - */ - public static class LengthViewFn<T> extends ViewFn<Iterable<WindowedValue<T>>, Long> { - @Override - public Materialization<Iterable<WindowedValue<T>>> getMaterialization() { - return Materializations.iterable(); - } - - @Override - public Long apply(Iterable<WindowedValue<T>> contents) { - return (long) Iterables.size(contents); - } - } - - /** - * A {@link ViewFn} that always returns the value with which it is instantiated. - */ - public static class ConstantViewFn<ElemT, ViewT> - extends ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> { - private ViewT value; - - public ConstantViewFn(ViewT value) { - this.value = value; - } - - @Override - public Materialization<Iterable<WindowedValue<ElemT>>> getMaterialization() { - return Materializations.iterable(); - } - - @Override - public ViewT apply(Iterable<WindowedValue<ElemT>> contents) { - return value; - } - } - - /** - * A {@link PCollectionView} explicitly built from a {@link TupleTag} - * and conversion {@link ViewFn}, and an element coder, using the - * {@link #DEFAULT_WINDOWING_STRATEGY}. - * - * <p>This method is only recommended for use by runner implementors to test their - * implementations. It is very easy to construct a {@link PCollectionView} that does - * not respect the invariants required for proper functioning. - * - * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed - * values provided to the view during execution, results are unpredictable. It is recommended - * that the values be prepared via {@link #contentsInDefaultWindow}. - */ - public static <ElemT, ViewT> PCollectionView<ViewT> testingView( - TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, - Coder<ElemT> elemCoder, - WindowingStrategy<?, ?> windowingStrategy) { - return testingView(null, tag, viewFn, elemCoder, windowingStrategy); - } - - /** - * The default {@link Coder} used for windowed values, given an element {@link Coder}. - */ - public static <T> Coder<WindowedValue<T>> defaultWindowedValueCoder(Coder<T> elemCoder) { - return WindowedValue.getFullCoder( - elemCoder, DEFAULT_WINDOWING_STRATEGY.getWindowFn().windowCoder()); - } - - /** - * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link - * WindowingStrategy}, {@link Coder}, and conversion function. - * - * <p>This method is only recommended for use by runner implementors to test their - * implementations. It is very easy to construct a {@link PCollectionView} that does not respect - * the invariants required for proper functioning. - * - * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed - * values provided to the view during execution, results are unpredictable. - */ - public static <ElemT, ViewT> PCollectionView<ViewT> testingView( - PCollection<ElemT> pCollection, - TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, - Coder<ElemT> elemCoder, - WindowingStrategy<?, ?> windowingStrategy) { - return testingView( - pCollection, - tag, - viewFn, - windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), - elemCoder, - windowingStrategy); - } - - /** - * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link - * WindowingStrategy}, {@link Coder}, {@link ViewFn} and {@link WindowMappingFn}. - * - * <p>This method is only recommended for use by runner implementors to test their - * implementations. It is very easy to construct a {@link PCollectionView} that does not respect - * the invariants required for proper functioning. - * - * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed - * values provided to the view during execution, results are unpredictable. - */ - public static <ElemT, ViewT> PCollectionView<ViewT> testingView( - PCollection<ElemT> pCollection, - TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, - WindowMappingFn<?> windowMappingFn, - Coder<ElemT> elemCoder, - WindowingStrategy<?, ?> windowingStrategy) { - return new PCollectionViewFromParts<>( - pCollection, - tag, - viewFn, - windowMappingFn, - windowingStrategy, - IterableCoder.of( - WindowedValue.getFullCoder(elemCoder, windowingStrategy.getWindowFn().windowCoder()))); - } - - /** - * Places the given {@code value} in the {@link #DEFAULT_NONEMPTY_WINDOW}. - */ - public static <T> WindowedValue<T> valueInDefaultWindow(T value) { - return WindowedValue.of(value, DEFAULT_TIMESTAMP, DEFAULT_NONEMPTY_WINDOW, PaneInfo.NO_FIRING); - } - - /** - * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input. - */ - @SafeVarargs - public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(T... values) - throws Exception { - List<WindowedValue<T>> windowedValues = Lists.newArrayList(); - for (T value : values) { - windowedValues.add(valueInDefaultWindow(value)); - } - return windowedValues; - } - - /** - * Prepares {@code values} for reading as the contents of a {@link PCollectionView} side input. - */ - public static <T> Iterable<WindowedValue<T>> contentsInDefaultWindow(Iterable<T> values) - throws Exception { - List<WindowedValue<T>> windowedValues = Lists.newArrayList(); - for (T value : values) { - windowedValues.add(valueInDefaultWindow(value)); - } - return windowedValues; - } - - // Internal details below here - - /** - * A {@link PCollectionView} explicitly built from its {@link TupleTag}, - * {@link WindowingStrategy}, and conversion function. - * - * <p>Instantiate via {@link #testingView}. - */ - private static class PCollectionViewFromParts<ElemT, ViewT> - extends PValueBase - implements PCollectionView<ViewT> { - private PCollection<ElemT> pCollection; - private TupleTag<Iterable<WindowedValue<ElemT>>> tag; - private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn; - private WindowMappingFn<?> windowMappingFn; - private WindowingStrategy<?, ?> windowingStrategy; - private Coder<Iterable<WindowedValue<ElemT>>> coder; - - public PCollectionViewFromParts( - PCollection<ElemT> pCollection, - TupleTag<Iterable<WindowedValue<ElemT>>> tag, - ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, - WindowMappingFn<?> windowMappingFn, - WindowingStrategy<?, ?> windowingStrategy, - Coder<Iterable<WindowedValue<ElemT>>> coder) { - this.pCollection = pCollection; - this.tag = tag; - this.viewFn = viewFn; - this.windowMappingFn = windowMappingFn; - this.windowingStrategy = windowingStrategy; - this.coder = coder; - } - - @Override - public PCollection<?> getPCollection() { - return pCollection; - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() { - return (TupleTag) tag; - } - - @Override - public ViewFn<Iterable<WindowedValue<?>>, ViewT> getViewFn() { - // Safe cast; runners must maintain type safety - @SuppressWarnings({"unchecked", "rawtypes"}) - ViewFn<Iterable<WindowedValue<?>>, ViewT> untypedViewFn = (ViewFn) viewFn; - return untypedViewFn; - } - - @Override - public WindowMappingFn<?> getWindowMappingFn() { - return windowMappingFn; - } - - @Override - public WindowingStrategy<?, ?> getWindowingStrategyInternal() { - return windowingStrategy; - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public Coder<Iterable<WindowedValue<?>>> getCoderInternal() { - return (Coder) coder; - } - - @Override - public int hashCode() { - return Objects.hash(tag); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof PCollectionView)) { - return false; + public static List<Object> materializeValuesFor( + PTransform<?, ? extends PCollectionView<?>> viewTransformClass, Object ... values) { + List<Object> rval = new ArrayList<>(); + // Currently all view materializations are the same where the data is shared underneath + // the void/null key. Once this changes, these materializations will differ but test code + // should not worry about what these look like if they are relying on the ViewFn to "undo" + // the conversion. + if (View.AsSingleton.class.equals(viewTransformClass.getClass())) { + for (Object value : values) { + rval.add(KV.of(null, value)); } - @SuppressWarnings("unchecked") - PCollectionView<?> otherView = (PCollectionView<?>) other; - return tag.equals(otherView.getTagInternal()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("tag", tag) - .add("viewFn", viewFn) - .toString(); - } - - @Override - public Map<TupleTag<?>, PValue> expand() { - return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection); - } + } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) { + for (Object value : values) { + rval.add(KV.of(null, value)); + } + } else if (View.AsList.class.equals(viewTransformClass.getClass())) { + for (Object value : values) { + rval.add(KV.of(null, value)); + } + } else if (View.AsMap.class.equals(viewTransformClass.getClass())) { + for (Object value : values) { + rval.add(KV.of(null, value)); + } + } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) { + for (Object value : values) { + rval.add(KV.of(null, value)); + } + } else { + throw new IllegalArgumentException(String.format( + "Unknown type of view %s. Supported views are %s.", + viewTransformClass.getClass(), + ImmutableSet.of( + View.AsSingleton.class, + View.AsIterable.class, + View.AsList.class, + View.AsMap.class, + View.AsMultimap.class))); + } + return Collections.unmodifiableList(rval); } } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 5cb9e18..cff6b2d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -37,9 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; @@ -297,9 +295,8 @@ public class DoFnTesterTest { @Test public void fnWithSideInputDefault() throws Exception { PCollection<Integer> pCollection = p.apply(Create.empty(VarIntCoder.of())); - final PCollectionView<Integer> value = - PCollectionViews.singletonView( - pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); + final PCollectionView<Integer> value = pCollection.apply( + View.<Integer>asSingleton().withDefaultValue(0)); try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) { tester.processElement(1); @@ -313,9 +310,8 @@ public class DoFnTesterTest { @Test public void fnWithSideInputExplicit() throws Exception { PCollection<Integer> pCollection = p.apply(Create.of(-2)); - final PCollectionView<Integer> value = - PCollectionViews.singletonView( - pCollection, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); + final PCollectionView<Integer> value = pCollection.apply( + View.<Integer>asSingleton().withDefaultValue(0)); try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) { tester.setSideInput(value, GlobalWindow.INSTANCE, -2); http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 1ccd5d6..7d20532 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -209,7 +209,7 @@ class BatchLoads<DestinationT> checkArgument(numFileShards > 0); Pipeline p = input.getPipeline(); final PCollectionView<String> jobIdTokenView = createJobIdView(p); - final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView); + final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView); // The user-supplied triggeringDuration is often chosen to to control how many BigQuery load // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this // is set to a large value, currently we have to buffer all the data unti the trigger fires. @@ -295,7 +295,7 @@ class BatchLoads<DestinationT> public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) { Pipeline p = input.getPipeline(); final PCollectionView<String> jobIdTokenView = createJobIdView(p); - final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView); + final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(p, jobIdTokenView); PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow = input.apply( "rewindowIntoGlobal", @@ -364,8 +364,10 @@ class BatchLoads<DestinationT> } // Generate the temporary-file prefix. - private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) { - return ((PCollection<String>) jobIdView.getPCollection()) + private PCollectionView<String> createTempFilePrefixView( + Pipeline p, final PCollectionView<String> jobIdView) { + return p + .apply(Create.of("")) .apply( "GetTempFilePrefix", ParDo.of( @@ -382,13 +384,13 @@ class BatchLoads<DestinationT> resolveTempLocation( tempLocationRoot, "BigQueryWriteTemp", - c.element()); + c.sideInput(jobIdView)); LOG.info( "Writing BigQuery temporary files to {} before loading them.", tempLocation); c.output(tempLocation); } - })) + }).withSideInputs(jobIdView)) .apply("TempFilePrefixView", View.<String>asSingleton()); }
