http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java index 6c385d7..1853248 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java @@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PCollectionView; class StreamingViewOverrides { static class StreamingCreatePCollectionViewFactory<ElemT, ViewT> extends SingleInputOutputOverrideFactory< - PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> { + PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> { @Override - public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>> + public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform( AppliedPTransform< - PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> + PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> transform) { StreamingCreatePCollectionView<ElemT, ViewT> streamingView = new StreamingCreatePCollectionView<>(transform.getTransform().getView()); @@ -56,7 +56,7 @@ class StreamingViewOverrides { } private static class StreamingCreatePCollectionView<ElemT, ViewT> - extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> { + extends PTransform<PCollection<ElemT>, PCollection<ElemT>> { private final PCollectionView<ViewT> view; private StreamingCreatePCollectionView(PCollectionView<ViewT> view) { @@ -64,7 +64,7 @@ class StreamingViewOverrides { } @Override - public PCollectionView<ViewT> expand(PCollection<ElemT> input) { + public PCollection<ElemT> expand(PCollection<ElemT> input) { return input .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults()) .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 89dc2d5..53215f6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -920,15 +920,15 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); - assertEquals(5, steps.size()); + assertEquals(9, steps.size()); @SuppressWarnings("unchecked") List<Map<String, Object>> toIsmRecordOutputs = - (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO); + (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO); assertTrue( Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format")); - Step collectionToSingletonStep = steps.get(4); + Step collectionToSingletonStep = steps.get(8); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); } http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 64aa35a..ac5e0cd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -508,50 +508,6 @@ public final class TransformTranslator { }; } - private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() { - return new TransformEvaluator<View.AsSingleton<T>>() { - @Override - public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) { - Iterable<? extends WindowedValue<?>> iter = - context.getWindowedValues(context.getInput(transform)); - PCollectionView<T> output = context.getOutput(transform); - Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal(); - - @SuppressWarnings("unchecked") - Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter; - - context.putPView(output, iterCast, coderInternal); - } - - @Override - public String toNativeString() { - return "collect()"; - } - }; - } - - private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() { - return new TransformEvaluator<View.AsIterable<T>>() { - @Override - public void evaluate(View.AsIterable<T> transform, EvaluationContext context) { - Iterable<? extends WindowedValue<?>> iter = - context.getWindowedValues(context.getInput(transform)); - PCollectionView<Iterable<T>> output = context.getOutput(transform); - Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal(); - - @SuppressWarnings("unchecked") - Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter; - - context.putPView(output, iterCast, coderInternal); - } - - @Override - public String toNativeString() { - return "collect()"; - } - }; - } - private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>> createPCollView() { return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() { @@ -560,7 +516,7 @@ public final class TransformTranslator { EvaluationContext context) { Iterable<? extends WindowedValue<?>> iter = context.getWindowedValues(context.getInput(transform)); - PCollectionView<WriteT> output = context.getOutput(transform); + PCollectionView<WriteT> output = transform.getView(); Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal(); @SuppressWarnings("unchecked") @@ -645,8 +601,8 @@ public final class TransformTranslator { EVALUATORS.put(Combine.PerKey.class, combinePerKey()); EVALUATORS.put(Flatten.PCollections.class, flattenPColl()); EVALUATORS.put(Create.Values.class, create()); - EVALUATORS.put(View.AsSingleton.class, viewAsSingleton()); - EVALUATORS.put(View.AsIterable.class, viewAsIter()); +// EVALUATORS.put(View.AsSingleton.class, viewAsSingleton()); +// EVALUATORS.put(View.AsIterable.class, viewAsIter()); EVALUATORS.put(View.CreatePCollectionView.class, createPCollView()); EVALUATORS.put(Window.Assign.class, window()); EVALUATORS.put(Reshuffle.class, reshuffle()); http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 2f0e8ef..ee1ce7b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -24,10 +24,12 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -39,6 +41,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; @@ -165,7 +168,7 @@ public class TransformHierarchy { * nodes. */ public void setOutput(POutput output) { - for (PValue value : output.expand().values()) { + for (PCollection<?> value : fullyExpand(output).values()) { if (!producers.containsKey(value)) { producers.put(value, current); value.finishSpecifyingOutput( @@ -226,6 +229,47 @@ public class TransformHierarchy { return current; } + private Map<TupleTag<?>, PCollection<?>> fullyExpand(POutput output) { + Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>(); + for (Map.Entry<TupleTag<?>, PValue> value : output.expand().entrySet()) { + if (value.getValue() instanceof PCollection) { + PCollection<?> previous = result.put(value.getKey(), (PCollection<?>) value.getValue()); + checkArgument( + previous == null, + "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s", + output, + TupleTag.class.getSimpleName(), + value.getKey(), + previous, + value.getValue()); + } else { + if (value.getValue().expand().size() == 1 + && Iterables.getOnlyElement(value.getValue().expand().values()) + .equals(value.getValue())) { + throw new IllegalStateException( + String.format( + "Non %s %s that expands into itself %s", + PCollection.class.getSimpleName(), + PValue.class.getSimpleName(), + value.getValue())); + } + for (Map.Entry<TupleTag<?>, PCollection<?>> valueComponent : + fullyExpand(value.getValue()).entrySet()) { + PCollection<?> previous = result.put(valueComponent.getKey(), valueComponent.getValue()); + checkArgument( + previous == null, + "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s", + output, + TupleTag.class.getSimpleName(), + valueComponent.getKey(), + previous, + valueComponent.getValue()); + } + } + } + return result; + } + /** * Provides internal tracking of transform relationships with helper methods * for initialization and ordered visitation. http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 9e1cc71..6a90bcf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1277,14 +1277,15 @@ public class Combine { public PCollectionView<OutputT> expand(PCollection<InputT> input) { PCollection<OutputT> combined = input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout)); - return combined.apply( - CreatePCollectionView.<OutputT, OutputT>of( - PCollectionViews.singletonView( - combined, - input.getWindowingStrategy(), - insertDefault, - insertDefault ? fn.defaultValue() : null, - combined.getCoder()))); + PCollectionView<OutputT> view = + PCollectionViews.singletonView( + combined, + input.getWindowingStrategy(), + insertDefault, + insertDefault ? fn.defaultValue() : null, + combined.getCoder()); + combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view)); + return view; } public int getFanout() { http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 073c750..331b143 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -257,8 +257,10 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView( - input, input.getWindowingStrategy(), input.getCoder()))); + PCollectionView<List<T>> view = + PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder()); + input.apply(CreatePCollectionView.<T, List<T>>of(view)); + return view; } } @@ -282,8 +284,10 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView( - input, input.getWindowingStrategy(), input.getCoder()))); + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder()); + input.apply(CreatePCollectionView.<T, Iterable<T>>of(view)); + return view; } } @@ -423,11 +427,10 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of( - PCollectionViews.multimapView( - input, - input.getWindowingStrategy(), - input.getCoder()))); + PCollectionView<Map<K, Iterable<V>>> view = + PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder()); + input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); + return view; } } @@ -459,11 +462,10 @@ public class View { throw new IllegalStateException("Unable to create a side-input view from input", e); } - return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of( - PCollectionViews.mapView( - input, - input.getWindowingStrategy(), - input.getCoder()))); + PCollectionView<Map<K, V>> view = + PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder()); + input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view)); + return view; } } @@ -480,7 +482,7 @@ public class View { */ @Internal public static class CreatePCollectionView<ElemT, ViewT> - extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> { + extends PTransform<PCollection<ElemT>, PCollection<ElemT>> { private PCollectionView<ViewT> view; private CreatePCollectionView(PCollectionView<ViewT> view) { @@ -506,8 +508,10 @@ public class View { } @Override - public PCollectionView<ViewT> expand(PCollection<ElemT> input) { - return view; + public PCollection<ElemT> expand(PCollection<ElemT> input) { + return PCollection.<ElemT>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) + .setCoder(input.getCoder()); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java index f210fd8..4063d11 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java @@ -20,6 +20,8 @@ package org.apache.beam.sdk.values; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import java.util.Collections; +import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Internal; @@ -226,6 +228,11 @@ public class PCollection<T> extends PValueBase implements PValue { return super.getName(); } + @Override + public final Map<TupleTag<?>, PValue> expand() { + return Collections.<TupleTag<?>, PValue>singletonMap(tag, this); + } + /** * Sets the name of this {@link PCollection}. Returns {@code this}. * @@ -314,6 +321,11 @@ public class PCollection<T> extends PValueBase implements PValue { private IsBounded isBounded; + /** + * A local {@link TupleTag} used in the expansion of this {@link PValueBase}. + */ + private final TupleTag<?> tag = new TupleTag<>(); + private PCollection(Pipeline p) { super(p); } http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java index 74887c7..5e2e2c3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java @@ -170,6 +170,15 @@ public class PCollectionViews { } /** + * Returns if a default value was specified. + */ + @Deprecated + @Internal + public boolean hasDefault() { + return hasDefault; + } + + /** * Returns the default value that was specified. * * <p>For internal use only. @@ -491,5 +500,10 @@ public class PCollectionViews { public String toString() { return MoreObjects.toStringHelper(this).add("tag", tag).toString(); } + + @Override + public Map<TupleTag<?>, PValue> expand() { + return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java index 6f638d7..f312eac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java @@ -19,8 +19,6 @@ package org.apache.beam.sdk.values; import static com.google.common.base.Preconditions.checkState; -import java.util.Collections; -import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.PTransform; @@ -87,11 +85,6 @@ public abstract class PValueBase implements PValue { private String name; /** - * A local {@link TupleTag} used in the expansion of this {@link PValueBase}. - */ - private TupleTag<?> tag = new TupleTag<>(); - - /** * Whether this {@link PValueBase} has been finalized, and its core * properties, e.g., name, can no longer be changed. */ @@ -108,11 +101,6 @@ public abstract class PValueBase implements PValue { } @Override - public final Map<TupleTag<?>, PValue> expand() { - return Collections.<TupleTag<?>, PValue>singletonMap(tag, this); - } - - @Override public void finishSpecifying(PInput input, PTransform<?, ?> transform) { finishedSpecifying = true; } http://git-wip-us.apache.org/repos/asf/beam/blob/ccf73448/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java index adf27f8..aaf8b91 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java @@ -22,7 +22,9 @@ import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -37,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.PValueBase; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -349,5 +352,10 @@ public final class PCollectionViewTesting { .add("viewFn", viewFn) .toString(); } + + @Override + public Map<TupleTag<?>, PValue> expand() { + return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection); + } } }
