Repository: beam
Updated Branches:
  refs/heads/master 00f5fefc8 -> 591118e66


WindowFn assign should not access the window set


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e6e8ed51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e6e8ed51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e6e8ed51

Branch: refs/heads/master
Commit: e6e8ed511c7a77b7b56b3dbe81e03af14c7eb33f
Parents: 00f5fef
Author: Sourabh Bajaj <[email protected]>
Authored: Tue Feb 7 16:01:29 2017 -0800
Committer: Ahmet Altay <[email protected]>
Committed: Wed Feb 8 17:20:34 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.py       |  3 +--
 sdks/python/apache_beam/transforms/core.py      |  6 ++---
 sdks/python/apache_beam/transforms/window.py    |  5 ++--
 .../apache_beam/transforms/window_test.py       | 26 ++++++++++----------
 4 files changed, 18 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e6e8ed51/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index aa6c2dd..50ccf22 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -306,8 +306,7 @@ class DoFnRunner(Receiver):
         windowed_value = WindowedValue(
             value, timestamp, self.window_fn.assign(assign_context))
       elif isinstance(result, TimestampedValue):
-        assign_context = WindowFn.AssignContext(
-            result.timestamp, result.value, element.windows)
+        assign_context = WindowFn.AssignContext(result.timestamp, result.value)
         windowed_value = WindowedValue(
             result.value, result.timestamp,
             self.window_fn.assign(assign_context))

http://git-wip-us.apache.org/repos/asf/beam/blob/e6e8ed51/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 7cbca2f..eb89344 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1236,10 +1236,8 @@ class WindowInto(ParDo):
     def __init__(self, windowing):
       self.windowing = windowing
 
-    def process(self, element, context=DoFn.ContextParam):
-      context = WindowFn.AssignContext(context.timestamp,
-                                       element=element,
-                                       existing_windows=context.windows)
+    def process(self, element, timestamp=DoFn.TimestampParam):
+      context = WindowFn.AssignContext(timestamp, element=element)
       new_windows = self.windowing.windowfn.assign(context)
       yield WindowedValue(element, context.timestamp, new_windows)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e6e8ed51/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 70759e0..67ca83c 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -88,13 +88,12 @@ class WindowFn(object):
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""
 
-    def __init__(self, timestamp, element=None, existing_windows=None):
+    def __init__(self, timestamp, element=None):
       self.timestamp = Timestamp.of(timestamp)
       self.element = element
-      self.existing_windows = existing_windows
 
   def assign(self, assign_context):
-    """Associates a timestamp and set of windows to an element."""
+    """Associates a timestamp to an element."""
     raise NotImplementedError
 
   class MergeContext(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/e6e8ed51/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 9375d25..1a21709 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -40,8 +40,8 @@ from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
 
 
-def context(element, timestamp, windows):
-  return WindowFn.AssignContext(timestamp, element, windows)
+def context(element, timestamp):
+  return WindowFn.AssignContext(timestamp, element)
 
 
 sort_values = Map(lambda (k, vs): (k, sorted(vs)))
@@ -67,40 +67,40 @@ class WindowTest(unittest.TestCase):
     # Test windows with offset: 2, 7, 12, 17, ...
     windowfn = FixedWindows(size=5, offset=2)
     self.assertEqual([IntervalWindow(7, 12)],
-                     windowfn.assign(context('v', 7, [])))
+                     windowfn.assign(context('v', 7)))
     self.assertEqual([IntervalWindow(7, 12)],
-                     windowfn.assign(context('v', 11, [])))
+                     windowfn.assign(context('v', 11)))
     self.assertEqual([IntervalWindow(12, 17)],
-                     windowfn.assign(context('v', 12, [])))
+                     windowfn.assign(context('v', 12)))
 
     # Test windows without offset: 0, 5, 10, 15, ...
     windowfn = FixedWindows(size=5)
     self.assertEqual([IntervalWindow(5, 10)],
-                     windowfn.assign(context('v', 5, [])))
+                     windowfn.assign(context('v', 5)))
     self.assertEqual([IntervalWindow(5, 10)],
-                     windowfn.assign(context('v', 9, [])))
+                     windowfn.assign(context('v', 9)))
     self.assertEqual([IntervalWindow(10, 15)],
-                     windowfn.assign(context('v', 10, [])))
+                     windowfn.assign(context('v', 10)))
 
     # Test windows with offset out of range.
     windowfn = FixedWindows(size=5, offset=12)
     self.assertEqual([IntervalWindow(7, 12)],
-                     windowfn.assign(context('v', 11, [])))
+                     windowfn.assign(context('v', 11)))
 
   def test_sliding_windows_assignment(self):
     windowfn = SlidingWindows(size=15, period=5, offset=2)
     expected = [IntervalWindow(7, 22),
                 IntervalWindow(2, 17),
                 IntervalWindow(-3, 12)]
-    self.assertEqual(expected, windowfn.assign(context('v', 7, [])))
-    self.assertEqual(expected, windowfn.assign(context('v', 8, [])))
-    self.assertEqual(expected, windowfn.assign(context('v', 11, [])))
+    self.assertEqual(expected, windowfn.assign(context('v', 7)))
+    self.assertEqual(expected, windowfn.assign(context('v', 8)))
+    self.assertEqual(expected, windowfn.assign(context('v', 11)))
 
   def test_sessions_merging(self):
     windowfn = Sessions(10)
 
     def merge(*timestamps):
-      windows = [windowfn.assign(context(None, t, [])) for t in timestamps]
+      windows = [windowfn.assign(context(None, t)) for t in timestamps]
       running = set()
 
       class TestMergeContext(WindowFn.MergeContext):

Reply via email to