Repository: beam
Updated Branches:
  refs/heads/master 9d3b0db2d -> 3a8ae530c
Add Windowing snippets for Python SDK.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c0cabb0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c0cabb0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c0cabb0

Branch: refs/heads/master
Commit: 5c0cabb070d2d013dd2b5156e930a578d5790c4a
Parents: 9d3b0db
Author: Charles Chen <[email protected]>
Authored: Thu Feb 16 16:47:04 2017 -0800
Committer: Ahmet Altay <[email protected]>
Committed: Thu Feb 23 14:33:07 2017 -0800

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py | 110 +++++++++++++++++++
 1 file changed, 110 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5c0cabb0/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index efb400d..9635c7a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -793,6 +793,116 @@ class CombineTest(unittest.TestCase):
     perkey_counts = occurrences | snippets.Count()
     self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts))
 
+  def test_setting_fixed_windows(self):
+    p = TestPipeline()
+    unkeyed_items = p | beam.Create([22, 33, 55, 100, 115, 120])
+    items = (unkeyed_items
+             | 'key' >> beam.Map(
+                 lambda x: beam.window.TimestampedValue(('k', x), x)))
+    # [START setting_fixed_windows]
+    from apache_beam import window
+    fixed_windowed_items = (
+        items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
+    # [END setting_fixed_windows]
+    summed = (fixed_windowed_items
+              | 'group' >> beam.GroupByKey()
+              | 'combine' >> beam.CombineValues(sum))
+    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+    # Windows [0, 60), [60, 120) and [120, 180) sum to 110, 215 and 120.
+    beam.assert_that(unkeyed, beam.equal_to([110, 215, 120]))
+    p.run()
+
+  def test_setting_sliding_windows(self):
+    p = TestPipeline()
+    unkeyed_items = p | beam.Create([2, 16, 23])
+    items = (unkeyed_items
+             | 'key' >> beam.Map(
+                 lambda x: beam.window.TimestampedValue(('k', x), x)))
+    # [START setting_sliding_windows]
+    from apache_beam import window
+    sliding_windowed_items = (
+        items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))
+    # [END setting_sliding_windows]
+    summed = (sliding_windowed_items
+              | 'group' >> beam.GroupByKey()
+              | 'combine' >> beam.CombineValues(sum))
+    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+    # Size 30, period 5: every element falls into 30 / 5 = 6 windows.
+    beam.assert_that(unkeyed,
+                     beam.equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41]))
+    p.run()
+
+  def test_setting_session_windows(self):
+    p = TestPipeline()
+    unkeyed_items = p | beam.Create([2, 11, 16, 27])
+    items = (unkeyed_items
+             | 'key' >> beam.Map(
+                 lambda x: beam.window.TimestampedValue(('k', x), x)))
+    # [START setting_session_windows]
+    from apache_beam import window
+    session_windowed_items = (
+        items | 'window' >> beam.WindowInto(window.Sessions(10)))
+    # [END setting_session_windows]
+    summed = (session_windowed_items
+              | 'group' >> beam.GroupByKey()
+              | 'combine' >> beam.CombineValues(sum))
+    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+    # 2, 11 and 16 chain within the gap of 10; 27 starts a new session.
+    beam.assert_that(unkeyed,
+                     beam.equal_to([29, 27]))
+    p.run()
+
+  def test_setting_global_window(self):
+    p = TestPipeline()
+    unkeyed_items = p | beam.Create([2, 11, 16, 27])
+    items = (unkeyed_items
+             | 'key' >> beam.Map(
+                 lambda x: beam.window.TimestampedValue(('k', x), x)))
+    # [START setting_global_window]
+    from apache_beam import window
+    global_windowed_items = (
+        items | 'window' >> beam.WindowInto(window.GlobalWindows()))
+    # [END setting_global_window]
+    summed = (global_windowed_items
+              | 'group' >> beam.GroupByKey()
+              | 'combine' >> beam.CombineValues(sum))
+    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+    # All elements share the single global window: 2 + 11 + 16 + 27 = 56.
+    beam.assert_that(unkeyed, beam.equal_to([56]))
+    p.run()
+
+  def test_setting_timestamp(self):
+    p = TestPipeline()
+    unkeyed_items = p | beam.Create([12, 30, 60, 61, 66])
+    items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x)))
+
+    def extract_timestamp_from_log_entry(entry):
+      return entry[1]
+
+    # [START setting_timestamp]
+    class AddTimestampDoFn(beam.DoFn):
+
+      def process(self, element):
+        # Extract the numeric Unix seconds-since-epoch timestamp to be
+        # associated with the current log entry.
+        unix_timestamp = extract_timestamp_from_log_entry(element)
+        # Wrap and emit the current entry and new timestamp in a
+        # TimestampedValue.
+        yield beam.window.TimestampedValue(element, unix_timestamp)
+
+    timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
+    # [END setting_timestamp]
+    fixed_windowed_items = (
+        timestamped_items | 'window' >> beam.WindowInto(
+            beam.window.FixedWindows(60)))
+    summed = (fixed_windowed_items
+              | 'group' >> beam.GroupByKey()
+              | 'combine' >> beam.CombineValues(sum))
+    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+    # Windows [0, 60) and [60, 120) sum to 12 + 30 and 60 + 61 + 66.
+    beam.assert_that(unkeyed, beam.equal_to([42, 187]))
+    p.run()
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
