Repository: incubator-beam Updated Branches: refs/heads/master a43f9b820 -> a32a26208
Move expansion of Window.Bound into DirectPipelineRunner In the Beam model, windowing is a primitive concept. The expansion provided by the SDK is not implementable except via access to privileged methods not intended for Beam pipeline authors. This change is a precursor to eliminating these privileged methods entirely. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42969cb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42969cb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42969cb6 Branch: refs/heads/master Commit: 42969cb62222744c41debe575857fb7d093ce527 Parents: 5f24cef Author: Kenneth Knowles <[email protected]> Authored: Thu Apr 7 17:34:19 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Apr 8 15:03:00 2016 -0700 ---------------------------------------------------------------------- .../sdk/runners/DirectPipelineRunner.java | 47 ++++++++++++++++++++ .../inprocess/WindowEvaluatorFactory.java | 13 +++--- .../sdk/transforms/windowing/Window.java | 21 ++------- .../cloud/dataflow/sdk/util/AssignWindows.java | 46 +++++++++++++++++++ 4 files changed, 104 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java index 35e392b..57e6116 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java @@ -47,7 +47,10 @@ 
import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.Partition; import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; import com.google.cloud.dataflow.sdk.util.AppliedCombineFn; +import com.google.cloud.dataflow.sdk.util.AssignWindows; import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly; import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; @@ -57,6 +60,7 @@ import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners; import com.google.cloud.dataflow.sdk.util.SerializableUtils; import com.google.cloud.dataflow.sdk.util.TestCredential; import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.util.common.Counter; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.KV; @@ -255,6 +259,9 @@ public class DirectPipelineRunner } else if (transform instanceof GroupByKey) { return (OutputT) ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); + } else if (transform instanceof Window.Bound) { + return (OutputT) + ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform)); } else { return super.apply(transform, input); } @@ -400,6 +407,46 @@ public class DirectPipelineRunner } } + private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow> + extends PTransform<PCollection<T>, PCollection<T>> { + + private final Window.Bound<T> wrapped; + + public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) { + this.wrapped = wrapped; + } + + @Override + public PCollection<T> 
apply(PCollection<T> input) { + WindowingStrategy<?, ?> outputStrategy = + wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); + + WindowFn<T, BoundedWindow> windowFn = + (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn(); + + // If the Window.Bound transform only changed parts other than the WindowFn, then + // we skip AssignWindows even though it should be harmless in a perfect world. + // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly + // crash if another GBK is performed without explicitly setting the WindowFn. So we skip + // AssignWindows in this case. + if (wrapped.getWindowFn() == null) { + return input.apply("Identity", ParDo.of(new IdentityFn<T>())) + .setWindowingStrategyInternal(outputStrategy); + } else { + return input + .apply("AssignWindows", new AssignWindows<T, BoundedWindow>(windowFn)) + .setWindowingStrategyInternal(outputStrategy); + } + } + } + + private static class IdentityFn<T> extends DoFn<T, T> { + @Override + public void processElement(ProcessContext c) { + c.output(c.element()); + } + } + /** * Apply the override for AvroIO.Write.Bound if the user requested sharding controls * greater than one. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java index 0bdfac9..e553dbb 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java @@ -61,23 +61,24 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { if (fn == null) { return PassthroughTransformEvaluator.create(transform, outputBundle); } - return new WindowIntoEvaluator<>(fn, evaluationContext, outputBundle); + return new WindowIntoEvaluator<>(transform, fn, outputBundle); } private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> { + private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> + transform; private final WindowFn<InputT, ?> windowFn; - private final InProcessEvaluationContext context; private final UncommittedBundle<InputT> outputBundle; @SuppressWarnings("unchecked") public WindowIntoEvaluator( + AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform, WindowFn<? 
super InputT, ?> windowFn, - InProcessEvaluationContext context, UncommittedBundle<InputT> outputBundle) { + this.outputBundle = outputBundle; + this.transform = transform; // Safe contravariant cast this.windowFn = (WindowFn<InputT, ?>) windowFn; - this.context = context; - this.outputBundle = outputBundle; } @Override @@ -98,7 +99,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { @Override public InProcessTransformResult finishBundle() throws Exception { - return StepTransformResult.withoutHold(null).addOutput(outputBundle).build(); + return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java index 1e7282d..20b3ed5 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Window.java @@ -25,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -360,6 +359,7 @@ public class Window { */ public static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> { + @Nullable private final WindowFn<? 
super T, ?> windowFn; @Nullable private final Trigger<?> trigger; @Nullable private final AccumulationMode mode; @@ -587,21 +587,8 @@ public class Window { public PCollection<T> apply(PCollection<T> input) { WindowingStrategy<?, ?> outputStrategy = getOutputStrategyInternal(input.getWindowingStrategy()); - PCollection<T> output; - if (windowFn != null) { - // If the windowFn changed, we create a primitive, and run the AssignWindows operation here. - output = assignWindows(input, windowFn); - } else { - // If the windowFn didn't change, we just run a pass-through transform and then set the - // new windowing strategy. - output = input.apply(Window.<T>identity()); - } - return output.setWindowingStrategyInternal(outputStrategy); - } - - private <T, W extends BoundedWindow> PCollection<T> assignWindows( - PCollection<T> input, WindowFn<? super T, W> windowFn) { - return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<T, W>(windowFn))); + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), outputStrategy, input.isBounded()); } @Override @@ -639,7 +626,7 @@ public class Window { * windows to be merged again as part of the next * {@link com.google.cloud.dataflow.sdk.transforms.GroupByKey}. 
*/ - public static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> { + private static class Remerge<T> extends PTransform<PCollection<T>, PCollection<T>> { @Override public PCollection<T> apply(PCollection<T> input) { WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42969cb6/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java new file mode 100644 index 0000000..57f489d --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/AssignWindows.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; +import com.google.cloud.dataflow.sdk.values.PCollection; + +/** + * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a + * {@link PCollection} to windows according to the provided {@link WindowFn}. + * + * @param <T> Type of elements being windowed + * @param <W> Window type + */ +public class AssignWindows<T, W extends BoundedWindow> + extends PTransform<PCollection<T>, PCollection<T>> { + + private WindowFn<? super T, W> fn; + + public AssignWindows(WindowFn<? super T, W> fn) { + this.fn = fn; + } + + @Override + public PCollection<T> apply(PCollection<T> input) { + return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn))); + } +}
