Correctly forward dependsOnlyOnWindow(), etc., in WindowingStrategy Previously, WindowingStrategy contained an implementation of OutputTimeFn that did not properly forward these important methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d94a6f15 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d94a6f15 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d94a6f15 Branch: refs/heads/master Commit: d94a6f15cf9f606495f5dfb1723333cfba5217f9 Parents: a20786d Author: Kenneth Knowles <[email protected]> Authored: Tue May 24 13:10:38 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 24 13:10:38 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/util/WindowingStrategy.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d94a6f15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java index d98793f..19b11cd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java @@ -287,7 +287,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab * </ul> */ private static class CombineWindowFnOutputTimes<W extends BoundedWindow> - extends OutputTimeFn.Defaults<W> { + extends OutputTimeFn<W> { private final OutputTimeFn<? super W> outputTimeFn; private final WindowFn<?, W> windowFn; @@ -313,5 +313,15 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab public Instant merge(W newWindow, Iterable<? 
extends Instant> timestamps) { return outputTimeFn.merge(newWindow, timestamps); } + + @Override + public final boolean dependsOnlyOnWindow() { + return outputTimeFn.dependsOnlyOnWindow(); + } + + @Override + public boolean dependsOnlyOnEarliestInputTimestamp() { + return outputTimeFn.dependsOnlyOnEarliestInputTimestamp(); + } } }
