More complicated window tests.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/989a189d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/989a189d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/989a189d

Branch: refs/heads/python-sdk
Commit: 989a189d6495163bb4f80e310834ef927dbefef5
Parents: 7386bcc
Author: Robert Bradshaw <rober...@google.com>
Authored: Thu Oct 13 18:02:12 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Tue Oct 18 12:17:16 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.py       |  2 +-
 .../python/apache_beam/transforms/sideinputs.py |  2 +-
 .../apache_beam/transforms/sideinputs_test.py   | 69 +++++++++++++++++++-
 3 files changed, 69 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index 86fd819..cd06879 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -150,7 +150,7 @@ class DoFnRunner(Receiver):
       if self.has_windowed_side_inputs and len(element.windows) > 1:
         for w in element.windows:
           self.context.set_element(
-              WindowedValue(element.value, element.timestamp, w))
+              WindowedValue(element.value, element.timestamp, (w,)))
           self._process_outputs(element, self.dofn_process(self.context))
       else:
         self.context.set_element(element)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py 
b/sdks/python/apache_beam/transforms/sideinputs.py
index 00c2852..2079c93 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -170,7 +170,7 @@ def default_window_mapping_fn(target_window_fn):
   else:
     def map_via_end(source_window):
       return list(target_window_fn.assign(
-        window.WindowFn.AssignContext(source_window.max_timestamp())))[0]
+        window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
     return map_via_end
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py 
b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 8b5b4e0..f84ff57 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -38,9 +38,13 @@ class SideInputsTest(unittest.TestCase):
       main = pcoll | 'WindowMain' >> beam.WindowInto(main_window_fn)
       side = pcoll | 'WindowSide' >> beam.WindowInto(
           side_window_fn or main_window_fn)
+      kw = {}
       if combine_fn is not None:
-        side |= beam.CombineGlobally(combine_fn)
-      res = main | beam.Map(lambda x, s: (x, s), side_input_type(side))
+        side |= beam.CombineGlobally(combine_fn).without_defaults()
+        kw['default_value'] = 0
+      elif side_input_type == beam.pvalue.AsDict:
+        side |= beam.Map(lambda x: ('k%s' % x, 'v%s' % x))
+      res = main | beam.Map(lambda x, s: (x, s), side_input_type(side, **kw))
       if side_input_type in (beam.pvalue.AsIter, beam.pvalue.AsList):
         res |= beam.Map(lambda (x, s): (x, sorted(s)))
       assert_that(res, equal_to(expected))
@@ -57,6 +61,67 @@ class SideInputsTest(unittest.TestCase):
         window.FixedWindows(10),
         expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])
 
+  def test_different_fixed_windows(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 11, 21, 31],
+        window.FixedWindows(10),
+        window.FixedWindows(20),
+        expected=[(1, [1, 2, 11]), (2, [1, 2, 11]), (11, [1, 2, 11]),
+                  (21, [21, 31]), (31, [21, 31])])
+
+  def test_fixed_global_window(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 11],
+        window.FixedWindows(10),
+        window.GlobalWindows(),
+        expected=[(1, [1, 2, 11]), (2, [1, 2, 11]), (11, [1, 2, 11])])
+
+  def test_sliding_windows(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 4],
+        window.SlidingWindows(size=6, period=2),
+        window.SlidingWindows(size=6, period=2),
+        expected=[
+            # Element 1 falls in three windows
+            (1, [1]),        # [-4, 2)
+            (1, [1, 2]),     # [-2, 4)
+            (1, [1, 2, 4]),  # [0, 6)
+            # as does 2,
+            (2, [1, 2]),     # [-2, 4)
+            (2, [1, 2, 4]),  # [0, 6)
+            (2, [2, 4]),     # [2, 8)
+            # and 4.
+            (4, [1, 2, 4]),  # [0, 6)
+            (4, [2, 4]),     # [2, 8)
+            (4, [4]),        # [4, 10)
+        ])
+
+  def test_windowed_iter(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 11],
+        window.FixedWindows(10),
+        side_input_type=beam.pvalue.AsIter,
+        expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])    
+
+  def test_windowed_singleton(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 11],
+        window.FixedWindows(10),
+        side_input_type=beam.pvalue.AsSingleton,
+        combine_fn=sum,
+        expected=[(1, 3), (2, 3), (11, 11)])
+
+  def test_windowed_dict(self):
+    self.run_windowed_side_inputs(
+        [1, 2, 11],
+        window.FixedWindows(10),
+        side_input_type=beam.pvalue.AsDict,
+        expected=[
+            (1, {'k1': 'v1', 'k2': 'v2'}),
+            (2, {'k1': 'v1', 'k2': 'v2'}),
+            (11, {'k11': 'v11'}),
+        ])
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)

Reply via email to