Repository: beam
Updated Branches:
  refs/heads/master 9d3b0db2d -> 3a8ae530c


Add Windowing snippets for Python SDK.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c0cabb0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c0cabb0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c0cabb0

Branch: refs/heads/master
Commit: 5c0cabb070d2d013dd2b5156e930a578d5790c4a
Parents: 9d3b0db
Author: Charles Chen <[email protected]>
Authored: Thu Feb 16 16:47:04 2017 -0800
Committer: Ahmet Altay <[email protected]>
Committed: Thu Feb 23 14:33:07 2017 -0800

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          | 105 +++++++++++++++++++
 1 file changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5c0cabb0/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index efb400d..9635c7a 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -793,6 +793,111 @@ class CombineTest(unittest.TestCase):
     perkey_counts = occurrences | snippets.Count()
     self.assertEqual({('cat', 3), ('dog', 2)}, set(perkey_counts))
 
+  def test_setting_fixed_windows(self):
+    """Exercises the fixed-window snippet and checks the per-window sums."""
+    pipeline = TestPipeline()
+    # Each value is keyed under 'k' and doubles as its own event timestamp.
+    items = (pipeline
+             | beam.Create([22, 33, 55, 100, 115, 120])
+             | 'key' >> beam.Map(
+                 lambda v: beam.window.TimestampedValue(('k', v), v)))
+    # [START setting_fixed_windows]
+    from apache_beam import window
+    fixed_windowed_items = (
+        items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
+    # [END setting_fixed_windows]
+    window_sums = (fixed_windowed_items
+                   | 'group' >> beam.GroupByKey()
+                   | 'combine' >> beam.CombineValues(sum)
+                   | 'unkey' >> beam.Map(lambda kv: kv[1]))
+    # Expected grouping: 22 + 33 + 55 = 110, 100 + 115 = 215, and 120 alone.
+    beam.assert_that(window_sums, beam.equal_to([110, 215, 120]))
+    pipeline.run()
+
+  def test_setting_sliding_windows(self):
+    """Exercises the sliding-window snippet and checks the per-window sums."""
+    pipeline = TestPipeline()
+    # Each value is keyed under 'k' and doubles as its own event timestamp.
+    items = (pipeline
+             | beam.Create([2, 16, 23])
+             | 'key' >> beam.Map(
+                 lambda v: beam.window.TimestampedValue(('k', v), v)))
+    # [START setting_sliding_windows]
+    from apache_beam import window
+    sliding_windowed_items = (
+        items | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))
+    # [END setting_sliding_windows]
+    window_sums = (sliding_windowed_items
+                   | 'group' >> beam.GroupByKey()
+                   | 'combine' >> beam.CombineValues(sum)
+                   | 'unkey' >> beam.Map(lambda kv: kv[1]))
+    # The windows overlap, so each input contributes to several of them; that
+    # is why three inputs produce ten sums.
+    expected_sums = [2, 2, 2, 18, 23, 39, 39, 39, 41, 41]
+    beam.assert_that(window_sums, beam.equal_to(expected_sums))
+    pipeline.run()
+
+  def test_setting_session_windows(self):
+    """Exercises the session-window snippet and checks the per-session sums."""
+    pipeline = TestPipeline()
+    # Each value is keyed under 'k' and doubles as its own event timestamp.
+    items = (pipeline
+             | beam.Create([2, 11, 16, 27])
+             | 'key' >> beam.Map(
+                 lambda v: beam.window.TimestampedValue(('k', v), v)))
+    # [START setting_session_windows]
+    from apache_beam import window
+    session_windowed_items = (
+        items | 'window' >> beam.WindowInto(window.Sessions(10)))
+    # [END setting_session_windows]
+    session_sums = (session_windowed_items
+                    | 'group' >> beam.GroupByKey()
+                    | 'combine' >> beam.CombineValues(sum)
+                    | 'unkey' >> beam.Map(lambda kv: kv[1]))
+    # Expected grouping: 2 + 11 + 16 = 29 in one session, and 27 on its own.
+    beam.assert_that(session_sums, beam.equal_to([29, 27]))
+    pipeline.run()
+
+  def test_setting_global_window(self):
+    """Exercises the global-window snippet and checks the single total."""
+    p = TestPipeline()
+    unkeyed_items = p | beam.Create([2, 11, 16, 27])
+    # Each value is keyed under 'k' and doubles as its own event timestamp.
+    items = (unkeyed_items
+             | 'key' >> beam.Map(
+                 lambda x: beam.window.TimestampedValue(('k', x), x)))
+    # [START setting_global_window]
+    from apache_beam import window
+    global_windowed_items = (
+        items | 'window' >> beam.WindowInto(window.GlobalWindows()))
+    # [END setting_global_window]
+    summed = (global_windowed_items
+              | 'group' >> beam.GroupByKey()
+              | 'combine' >> beam.CombineValues(sum))
+    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+    # All four values are summed together: 2 + 11 + 16 + 27 = 56.
+    beam.assert_that(unkeyed, beam.equal_to([56]))
+    p.run()
+
+  def test_setting_timestamp(self):
+    """Exercises the timestamp-assignment snippet via fixed windows.
+
+    Elements start out as plain ('k', value) pairs; the snippet's DoFn wraps
+    each one in a TimestampedValue, and the sums per fixed window are then
+    checked.
+    """
+    p = TestPipeline()
+    unkeyed_items = p | beam.Create([12, 30, 60, 61, 66])
+    items = (unkeyed_items | 'key' >> beam.Map(lambda x: ('k', x)))
+
+    def extract_timestamp_from_log_entry(entry):
+      # Test stand-in: the value half of the ('k', value) pair is used as the
+      # timestamp.
+      return entry[1]
+
+    # [START setting_timestamp]
+    class AddTimestampDoFn(beam.DoFn):
+
+      def process(self, element):
+        # Extract the numeric Unix seconds-since-epoch timestamp to be
+        # associated with the current log entry.
+        unix_timestamp = extract_timestamp_from_log_entry(element)
+        # Wrap and emit the current entry and new timestamp in a
+        # TimestampedValue.
+        yield beam.window.TimestampedValue(element, unix_timestamp)
+
+    timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
+    # [END setting_timestamp]
+    fixed_windowed_items = (
+        timestamped_items | 'window' >> beam.WindowInto(
+            beam.window.FixedWindows(60)))
+    summed = (fixed_windowed_items
+              | 'group' >> beam.GroupByKey()
+              | 'combine' >> beam.CombineValues(sum))
+    unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1])
+    # Expected grouping: 12 + 30 = 42 and 60 + 61 + 66 = 187.
+    beam.assert_that(unkeyed, beam.equal_to([42, 187]))
+    p.run()
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

Reply via email to