This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c1acb0  Add Python snippet for WithTimestamps transform
     new 4169307  Merge pull request #8907 from davidcavazos/element-wise-with-timestamps
3c1acb0 is described below

commit 3c1acb01770b3714566e14b08469c6d2d46d916b
Author: David Cavazos <[email protected]>
AuthorDate: Mon Jun 10 16:26:18 2019 -0700

    Add Python snippet for WithTimestamps transform
---
 .../transforms/element_wise/with_timestamps.py     | 106 +++++++++++++++++++++
 .../element_wise/with_timestamps_test.py           |  95 ++++++++++++++++++
 2 files changed, 201 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
new file mode 100644
index 0000000..d45bb1d
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
@@ -0,0 +1,106 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def event_time(test=None):
+  # [START event_time]
+  import apache_beam as beam
+
+  class GetTimestamp(beam.DoFn):
+    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
+      yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])
+
+  with beam.Pipeline() as pipeline:
+    plant_timestamps = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            {'name': 'Strawberry', 'season': 1585699200}, # April, 2020
+            {'name': 'Carrot', 'season': 1590969600},     # June, 2020
+            {'name': 'Artichoke', 'season': 1583020800},  # March, 2020
+            {'name': 'Tomato', 'season': 1588291200},     # May, 2020
+            {'name': 'Potato', 'season': 1598918400},     # September, 2020
+        ])
+        | 'With timestamps' >> beam.Map(
+            lambda plant: beam.window.TimestampedValue(plant, plant['season']))
+        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
+        | beam.Map(print)
+    )
+    # [END event_time]
+    if test:
+      test(plant_timestamps)
+
+
+def logical_clock(test=None):
+  # [START logical_clock]
+  import apache_beam as beam
+
+  class GetTimestamp(beam.DoFn):
+    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
+      event_id = int(timestamp.micros / 1e6)  # equivalent to seconds
+      yield '{} - {}'.format(event_id, plant['name'])
+
+  with beam.Pipeline() as pipeline:
+    plant_events = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            {'name': 'Strawberry', 'event_id': 1},
+            {'name': 'Carrot', 'event_id': 4},
+            {'name': 'Artichoke', 'event_id': 2},
+            {'name': 'Tomato', 'event_id': 3},
+            {'name': 'Potato', 'event_id': 5},
+        ])
+        | 'With timestamps' >> beam.Map(lambda plant: \
+            beam.window.TimestampedValue(plant, plant['event_id']))
+        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
+        | beam.Map(print)
+    )
+    # [END logical_clock]
+    if test:
+      test(plant_events)
+
+
+def processing_time(test=None):
+  # [START processing_time]
+  import apache_beam as beam
+  import time
+
+  class GetTimestamp(beam.DoFn):
+    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
+      yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])
+
+  with beam.Pipeline() as pipeline:
+    plant_processing_times = (
+        pipeline
+        | 'Garden plants' >> beam.Create([
+            {'name': 'Strawberry'},
+            {'name': 'Carrot'},
+            {'name': 'Artichoke'},
+            {'name': 'Tomato'},
+            {'name': 'Potato'},
+        ])
+        | 'With timestamps' >> beam.Map(lambda plant: \
+            beam.window.TimestampedValue(plant, time.time()))
+        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
+        | beam.Map(print)
+    )
+    # [END processing_time]
+    if test:
+      test(plant_processing_times)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
new file mode 100644
index 0000000..41c25d3
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
@@ -0,0 +1,95 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+# pylint: disable=line-too-long
+from apache_beam.examples.snippets.transforms.element_wise.with_timestamps import *
+# pylint: enable=line-too-long
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
[email protected]('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
[email protected]('apache_beam.examples.snippets.transforms.element_wise.with_timestamps.print',
 lambda elem: elem)
+# pylint: enable=line-too-long
+class WithTimestampsTest(unittest.TestCase):
+  def __init__(self, methodName):
+    super(WithTimestampsTest, self).__init__(methodName)
+    # [START plant_seasons]
+    plant_seasons = [
+        '2020-04-01 00:00:00 - Strawberry',
+        '2020-06-01 00:00:00 - Carrot',
+        '2020-03-01 00:00:00 - Artichoke',
+        '2020-05-01 00:00:00 - Tomato',
+        '2020-09-01 00:00:00 - Potato',
+    ]
+    # [END plant_seasons]
+    self.plant_seasons_test = lambda actual: \
+        assert_that(actual, equal_to(plant_seasons))
+
+    # [START plant_events]
+    plant_events = [
+        '1 - Strawberry',
+        '4 - Carrot',
+        '2 - Artichoke',
+        '3 - Tomato',
+        '5 - Potato',
+    ]
+    # [END plant_events]
+    self.plant_events_test = lambda actual: \
+        assert_that(actual, equal_to(plant_events))
+
+    # [START plant_processing_times]
+    plant_processing_times = [
+        '2020-03-20 20:12:42.145594 - Strawberry',
+        '2020-03-20 20:12:42.145827 - Carrot',
+        '2020-03-20 20:12:42.145962 - Artichoke',
+        '2020-03-20 20:12:42.146093 - Tomato',
+        '2020-03-20 20:12:42.146216 - Potato',
+    ]
+    # [END plant_processing_times]
+
+    def plant_processing_times_test(actual):
+      # Since `time.time()` will always give something different, we'll
+      # simply strip the timestamp information before testing the results.
+      import apache_beam as beam
+      actual = actual | beam.Map(lambda row: row.split('-')[-1].strip())
+      expected = [row.split('-')[-1].strip() for row in plant_processing_times]
+      assert_that(actual, equal_to(expected))
+    self.plant_processing_times_test = plant_processing_times_test
+
+  def test_event_time(self):
+    event_time(self.plant_seasons_test)
+
+  def test_logical_clock(self):
+    logical_clock(self.plant_events_test)
+
+  def test_processing_time(self):
+    processing_time(self.plant_processing_times_test)
+
+
+if __name__ == '__main__':
+  unittest.main()

Reply via email to