More complicated window tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/989a189d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/989a189d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/989a189d Branch: refs/heads/python-sdk Commit: 989a189d6495163bb4f80e310834ef927dbefef5 Parents: 7386bcc Author: Robert Bradshaw <rober...@google.com> Authored: Thu Oct 13 18:02:12 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Tue Oct 18 12:17:16 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/common.py | 2 +- .../python/apache_beam/transforms/sideinputs.py | 2 +- .../apache_beam/transforms/sideinputs_test.py | 69 +++++++++++++++++++- 3 files changed, 69 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 86fd819..cd06879 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -150,7 +150,7 @@ class DoFnRunner(Receiver): if self.has_windowed_side_inputs and len(element.windows) > 1: for w in element.windows: self.context.set_element( - WindowedValue(element.value, element.timestamp, w)) + WindowedValue(element.value, element.timestamp, (w,))) self._process_outputs(element, self.dofn_process(self.context)) else: self.context.set_element(element) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/transforms/sideinputs.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 00c2852..2079c93 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -170,7 +170,7 @@ def default_window_mapping_fn(target_window_fn): else: def map_via_end(source_window): return list(target_window_fn.assign( - window.WindowFn.AssignContext(source_window.max_timestamp())))[0] + window.WindowFn.AssignContext(source_window.max_timestamp())))[-1] return map_via_end http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/989a189d/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 8b5b4e0..f84ff57 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -38,9 +38,13 @@ class SideInputsTest(unittest.TestCase): main = pcoll | 'WindowMain' >> beam.WindowInto(main_window_fn) side = pcoll | 'WindowSide' >> beam.WindowInto( side_window_fn or main_window_fn) + kw = {} if combine_fn is not None: - side |= beam.CombineGlobally(combine_fn) - res = main | beam.Map(lambda x, s: (x, s), side_input_type(side)) + side |= beam.CombineGlobally(combine_fn).without_defaults() + kw['default_value'] = 0 + elif side_input_type == beam.pvalue.AsDict: + side |= beam.Map(lambda x: ('k%s' % x, 'v%s' % x)) + res = main | beam.Map(lambda x, s: (x, s), side_input_type(side, **kw)) if side_input_type in (beam.pvalue.AsIter, beam.pvalue.AsList): res |= beam.Map(lambda (x, s): (x, sorted(s))) assert_that(res, equal_to(expected)) @@ -57,6 +61,67 @@ class SideInputsTest(unittest.TestCase): window.FixedWindows(10), expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])]) + def test_different_fixed_windows(self): + self.run_windowed_side_inputs( + [1, 2, 11, 21, 31], + window.FixedWindows(10), + window.FixedWindows(20), + expected=[(1, [1, 2, 11]), (2, [1, 2, 11]), (11, [1, 2, 11]), + (21, [21, 31]), (31, [21, 31])]) + + def test_fixed_global_window(self): + self.run_windowed_side_inputs( + [1, 2, 11], + window.FixedWindows(10), + window.GlobalWindows(), + expected=[(1, [1, 2, 11]), (2, [1, 2, 11]), (11, [1, 2, 11])]) + + def test_sliding_windows(self): + self.run_windowed_side_inputs( + [1, 2, 4], + window.SlidingWindows(size=6, period=2), + window.SlidingWindows(size=6, period=2), + expected=[ + # Element 1 falls in three windows + (1, [1]), # [-4, 2) + (1, [1, 2]), # [-2, 4) + (1, [1, 2, 4]), # [0, 6) + # as does 2, + (2, [1, 2]), # [-2, 4) + (2, [1, 2, 4]), # [0, 6) + (2, [2, 4]), # [2, 8) + # and 4. + (4, [1, 2, 4]), # [0, 6) + (4, [2, 4]), # [2, 8) + (4, [4]), # [4, 10) + ]) + + def test_windowed_iter(self): + self.run_windowed_side_inputs( + [1, 2, 11], + window.FixedWindows(10), + side_input_type=beam.pvalue.AsIter, + expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])]) + + def test_windowed_singleton(self): + self.run_windowed_side_inputs( + [1, 2, 11], + window.FixedWindows(10), + side_input_type=beam.pvalue.AsSingleton, + combine_fn=sum, + expected=[(1, 3), (2, 3), (11, 11)]) + + def test_windowed_dict(self): + self.run_windowed_side_inputs( + [1, 2, 11], + window.FixedWindows(10), + side_input_type=beam.pvalue.AsDict, + expected=[ + (1, {'k1': 'v1', 'k2': 'v2'}), + (2, {'k1': 'v1', 'k2': 'v2'}), + (11, {'k11': 'v11'}), + ]) + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG)