Repository: beam Updated Branches: refs/heads/master b6b1c8b7c -> eeb043299
Reflect #assignsToOneWindow in WindowingStrategy Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6f9fdea Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6f9fdea Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6f9fdea Branch: refs/heads/master Commit: c6f9fdeadaeda68be86e454377f8c665c22a7c0f Parents: 9f904dc Author: Thomas Groh <[email protected]> Authored: Tue Jun 27 15:03:11 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon Jul 10 14:54:44 2017 -0700 ---------------------------------------------------------------------- .../runners/core/construction/WindowingStrategyTranslation.java | 1 + .../core/construction/WindowingStrategyTranslationTest.java | 3 +++ sdks/common/runner-api/src/main/proto/beam_runner_api.proto | 5 +++++ 3 files changed, 9 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c6f9fdea/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 88ebc01..1456a3f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -307,6 +307,7 @@ public class WindowingStrategyTranslation implements Serializable { .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) .setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger())) .setWindowFn(windowFnSpec) + .setAssignsToOneWindow(windowingStrategy.getWindowFn().assignsToOneWindow()) .setWindowCoderId( components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); http://git-wip-us.apache.org/repos/asf/beam/blob/c6f9fdea/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java index e406545..7a57fd7 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java @@ -116,5 +116,8 @@ public class WindowingStrategyTranslationTest { protoComponents.getCodersOrThrow( components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); + assertThat( + proto.getAssignsToOneWindow(), + equalTo(windowingStrategy.getWindowFn().assignsToOneWindow())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c6f9fdea/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 24e907a..93fea44 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -436,6 +436,11 @@ message WindowingStrategy { // (Required) Indicate whether empty on-time panes should be omitted. OnTimeBehavior OnTimeBehavior = 9; + + // (Required) Whether or not the window fn assigns inputs to exactly one window + // + // This knowledge is required for some optimizations + bool assigns_to_one_window = 10; } // Whether or not a PCollection's WindowFn is non-merging, merging, or
