Repository: beam
Updated Branches:
  refs/heads/master 00f5fefc8 -> 591118e66
WindowFn assign should not access the window set Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6e8ed51 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6e8ed51 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6e8ed51 Branch: refs/heads/master Commit: e6e8ed511c7a77b7b56b3dbe81e03af14c7eb33f Parents: 00f5fef Author: Sourabh Bajaj <[email protected]> Authored: Tue Feb 7 16:01:29 2017 -0800 Committer: Ahmet Altay <[email protected]> Committed: Wed Feb 8 17:20:34 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.py | 3 +-- sdks/python/apache_beam/transforms/core.py | 6 ++--- sdks/python/apache_beam/transforms/window.py | 5 ++-- .../apache_beam/transforms/window_test.py | 26 ++++++++++---------- 4 files changed, 18 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e6e8ed51/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index aa6c2dd..50ccf22 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -306,8 +306,7 @@ class DoFnRunner(Receiver): windowed_value = WindowedValue( value, timestamp, self.window_fn.assign(assign_context)) elif isinstance(result, TimestampedValue): - assign_context = WindowFn.AssignContext( - result.timestamp, result.value, element.windows) + assign_context = WindowFn.AssignContext(result.timestamp, result.value) windowed_value = WindowedValue( result.value, result.timestamp, self.window_fn.assign(assign_context)) http://git-wip-us.apache.org/repos/asf/beam/blob/e6e8ed51/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 7cbca2f..eb89344 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1236,10 +1236,8 @@ class WindowInto(ParDo): def __init__(self, windowing): self.windowing = windowing - def process(self, element, context=DoFn.ContextParam): - context = WindowFn.AssignContext(context.timestamp, - element=element, - existing_windows=context.windows) + def process(self, element, timestamp=DoFn.TimestampParam): + context = WindowFn.AssignContext(timestamp, element=element) new_windows = self.windowing.windowfn.assign(context) yield WindowedValue(element, context.timestamp, new_windows) http://git-wip-us.apache.org/repos/asf/beam/blob/e6e8ed51/sdks/python/apache_beam/transforms/window.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 70759e0..67ca83c 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -88,13 +88,12 @@ class WindowFn(object): class AssignContext(object): """Context passed to WindowFn.assign().""" - def __init__(self, timestamp, element=None, existing_windows=None): + def __init__(self, timestamp, element=None): self.timestamp = Timestamp.of(timestamp) self.element = element - self.existing_windows = existing_windows def assign(self, assign_context): - """Associates a timestamp and set of windows to an element.""" + """Associates a timestamp to an element.""" raise NotImplementedError class MergeContext(object): http://git-wip-us.apache.org/repos/asf/beam/blob/e6e8ed51/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 
9375d25..1a21709 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -40,8 +40,8 @@ from apache_beam.transforms.window import WindowedValue from apache_beam.transforms.window import WindowFn -def context(element, timestamp, windows): - return WindowFn.AssignContext(timestamp, element, windows) +def context(element, timestamp): + return WindowFn.AssignContext(timestamp, element) sort_values = Map(lambda (k, vs): (k, sorted(vs))) @@ -67,40 +67,40 @@ class WindowTest(unittest.TestCase): # Test windows with offset: 2, 7, 12, 17, ... windowfn = FixedWindows(size=5, offset=2) self.assertEqual([IntervalWindow(7, 12)], - windowfn.assign(context('v', 7, []))) + windowfn.assign(context('v', 7))) self.assertEqual([IntervalWindow(7, 12)], - windowfn.assign(context('v', 11, []))) + windowfn.assign(context('v', 11))) self.assertEqual([IntervalWindow(12, 17)], - windowfn.assign(context('v', 12, []))) + windowfn.assign(context('v', 12))) # Test windows without offset: 0, 5, 10, 15, ... windowfn = FixedWindows(size=5) self.assertEqual([IntervalWindow(5, 10)], - windowfn.assign(context('v', 5, []))) + windowfn.assign(context('v', 5))) self.assertEqual([IntervalWindow(5, 10)], - windowfn.assign(context('v', 9, []))) + windowfn.assign(context('v', 9))) self.assertEqual([IntervalWindow(10, 15)], - windowfn.assign(context('v', 10, []))) + windowfn.assign(context('v', 10))) # Test windows with offset out of range. 
windowfn = FixedWindows(size=5, offset=12) self.assertEqual([IntervalWindow(7, 12)], - windowfn.assign(context('v', 11, []))) + windowfn.assign(context('v', 11))) def test_sliding_windows_assignment(self): windowfn = SlidingWindows(size=15, period=5, offset=2) expected = [IntervalWindow(7, 22), IntervalWindow(2, 17), IntervalWindow(-3, 12)] - self.assertEqual(expected, windowfn.assign(context('v', 7, []))) - self.assertEqual(expected, windowfn.assign(context('v', 8, []))) - self.assertEqual(expected, windowfn.assign(context('v', 11, []))) + self.assertEqual(expected, windowfn.assign(context('v', 7))) + self.assertEqual(expected, windowfn.assign(context('v', 8))) + self.assertEqual(expected, windowfn.assign(context('v', 11))) def test_sessions_merging(self): windowfn = Sessions(10) def merge(*timestamps): - windows = [windowfn.assign(context(None, t, [])) for t in timestamps] + windows = [windowfn.assign(context(None, t)) for t in timestamps] running = set() class TestMergeContext(WindowFn.MergeContext):
