Require deterministic window coders.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d98294c2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d98294c2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d98294c2 Branch: refs/heads/master Commit: d98294c2bd13b45522ea584485bd62e900144c88 Parents: 72f5020 Author: Robert Bradshaw <[email protected]> Authored: Tue Apr 11 10:49:13 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Thu Apr 20 08:55:03 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coders.py | 16 ---------------- .../apache_beam/coders/coders_test_common.py | 1 - sdks/python/apache_beam/transforms/core.py | 4 ++++ sdks/python/apache_beam/transforms/window.py | 18 +++++++++++++++++- 4 files changed, 21 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 8ef0a46..4f75182 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -688,22 +688,6 @@ class IterableCoder(FastCoder): return hash((type(self), self._elem_coder)) -class WindowCoder(PickleCoder): - """Coder for windows in windowed values.""" - - def _create_impl(self): - return coder_impl.CallbackCoderImpl(pickle.dumps, pickle.loads) - - def is_deterministic(self): - # Note that WindowCoder as implemented is not deterministic because the - # implementation simply pickles windows. See the corresponding comments - # on PickleCoder for more details. - return False - - def as_cloud_object(self): - return super(WindowCoder, self).as_cloud_object(is_pair_like=False) - - class GlobalWindowCoder(SingletonCoder): """Coder for global windows.""" http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 6491ea8..da0bde3 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -62,7 +62,6 @@ class CodersTest(unittest.TestCase): coders.FastCoder, coders.ProtoCoder, coders.ToStringCoder, - coders.WindowCoder, coders.IntervalWindowCoder]) assert not standard - cls.seen, standard - cls.seen assert not standard - cls.seen_nested, standard - cls.seen_nested http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 2d28eec..9f66c39 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1185,6 +1185,10 @@ class Windowing(object): else: raise ValueError( 'accumulation_mode must be provided for non-trivial triggers') + if not windowfn.get_window_coder().is_deterministic(): + raise ValueError( + 'window fn (%s) does not have a determanistic coder (%s)' % ( + window_fn, windowfn.get_window_coder())) self.windowfn = windowfn self.triggerfn = triggerfn self.accumulation_mode = accumulation_mode http://git-wip-us.apache.org/repos/asf/beam/blob/d98294c2/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 931a17d..643cb99 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -49,6 +49,8 @@ WindowFn. from __future__ import absolute_import +import abc + from google.protobuf import struct_pb2 from google.protobuf import wrappers_pb2 @@ -93,6 +95,8 @@ class OutputTimeFn(object): class WindowFn(object): """An abstract windowing function defining a basic assign and merge.""" + __metaclass__ = abc.ABCMeta + class AssignContext(object): """Context passed to WindowFn.assign().""" @@ -100,6 +104,7 @@ class WindowFn(object): self.timestamp = Timestamp.of(timestamp) self.element = element + @abc.abstractmethod def assign(self, assign_context): """Associates a timestamp to an element.""" raise NotImplementedError @@ -113,6 +118,7 @@ class WindowFn(object): def merge(self, to_be_merged, merge_result): raise NotImplementedError + @abc.abstractmethod def merge(self, merge_context): """Returns a window that is the result of merging a set of windows.""" raise NotImplementedError @@ -121,8 +127,9 @@ class WindowFn(object): """Returns whether this WindowFn merges windows.""" return True + @abc.abstractmethod def get_window_coder(self): - return coders.WindowCoder() + raise NotImplementedError def get_transformed_output_time(self, window, input_timestamp): # pylint: disable=unused-argument """Given input time and output window, returns output time for window. @@ -344,6 +351,9 @@ class FixedWindows(NonMergingWindowFn): start = timestamp - (timestamp - self.offset) % self.size return [IntervalWindow(start, start + self.size)] + def get_window_coder(self): + return coders.IntervalWindowCoder() + def __eq__(self, other): if type(self) == type(other) == FixedWindows: return self.size == other.size and self.offset == other.offset @@ -398,6 +408,9 @@ class SlidingWindows(NonMergingWindowFn): for s in range(start.micros, timestamp.micros - self.size.micros, -self.period.micros)] + def get_window_coder(self): + return coders.IntervalWindowCoder() + def __eq__(self, other): if type(self) == type(other) == SlidingWindows: return (self.size == other.size @@ -443,6 +456,9 @@ class Sessions(WindowFn): timestamp = context.timestamp return [IntervalWindow(timestamp, timestamp + self.gap_size)] + def get_window_coder(self): + return coders.IntervalWindowCoder() + def merge(self, merge_context): to_merge = [] end = timeutil.MIN_TIMESTAMP
