This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3c1acb0 Add Python snippet for WithTimestamps transform
new 4169307 Merge pull request #8907 from
davidcavazos/element-wise-with-timestamps
3c1acb0 is described below
commit 3c1acb01770b3714566e14b08469c6d2d46d916b
Author: David Cavazos <[email protected]>
AuthorDate: Mon Jun 10 16:26:18 2019 -0700
Add Python snippet for WithTimestamps transform
---
.../transforms/element_wise/with_timestamps.py | 106 +++++++++++++++++++++
.../element_wise/with_timestamps_test.py | 95 ++++++++++++++++++
2 files changed, 201 insertions(+)
diff --git
a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
new file mode 100644
index 0000000..d45bb1d
--- /dev/null
+++
b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps.py
@@ -0,0 +1,106 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
def event_time(test=None):
  """Stamp each plant with its `season` field as the event-time timestamp.

  Creates a few garden-plant dicts, wraps each one in a
  `beam.window.TimestampedValue` using `plant['season']` (epoch seconds), then
  formats every element as '<UTC datetime> - <name>' and prints it.

  Args:
    test: Optional callable that receives the output PCollection (e.g. to add
      `assert_that` checks). It is invoked inside the `with` block so the
      transforms it adds belong to the pipeline that runs when the block exits.
  """
  # [START event_time]
  import apache_beam as beam

  class GetTimestamp(beam.DoFn):
    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
      yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])

  with beam.Pipeline() as pipeline:
    plant_timestamps = (
        pipeline
        | 'Garden plants' >> beam.Create([
            {'name': 'Strawberry', 'season': 1585699200},  # April, 2020
            {'name': 'Carrot', 'season': 1590969600},  # June, 2020
            {'name': 'Artichoke', 'season': 1583020800},  # March, 2020
            {'name': 'Tomato', 'season': 1588291200},  # May, 2020
            {'name': 'Potato', 'season': 1598918400},  # September, 2020
        ])
        | 'With timestamps' >> beam.Map(
            lambda plant: beam.window.TimestampedValue(plant, plant['season']))
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )
    # [END event_time]
    # Must stay inside the `with`: the pipeline is run when the block exits,
    # so anything the test hook adds afterwards would never be executed.
    if test:
      test(plant_timestamps)
+
+
def logical_clock(test=None):
  """Use a plain integer event id as a logical-clock timestamp.

  Each plant's `event_id` is attached as its timestamp via
  `beam.window.TimestampedValue`; the DoFn then recovers the id from the
  element's timestamp and prints '<event_id> - <name>'.

  Args:
    test: Optional callable that receives the output PCollection (e.g. to add
      `assert_that` checks). It is invoked inside the `with` block so the
      transforms it adds belong to the pipeline that runs when the block exits.
  """
  # [START logical_clock]
  import apache_beam as beam

  class GetTimestamp(beam.DoFn):
    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
      # Exact integer conversion; avoids float rounding on large timestamps.
      event_id = timestamp.micros // 1000000  # equivalent to seconds
      yield '{} - {}'.format(event_id, plant['name'])

  with beam.Pipeline() as pipeline:
    plant_events = (
        pipeline
        | 'Garden plants' >> beam.Create([
            {'name': 'Strawberry', 'event_id': 1},
            {'name': 'Carrot', 'event_id': 4},
            {'name': 'Artichoke', 'event_id': 2},
            {'name': 'Tomato', 'event_id': 3},
            {'name': 'Potato', 'event_id': 5},
        ])
        | 'With timestamps' >> beam.Map(
            lambda plant: beam.window.TimestampedValue(plant, plant['event_id']))
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )
    # [END logical_clock]
    # Must stay inside the `with`: the pipeline is run when the block exits,
    # so anything the test hook adds afterwards would never be executed.
    if test:
      test(plant_events)
+
+
def processing_time(test=None):
  """Stamp each plant with the wall-clock time at which it is processed.

  Each plant dict is wrapped in a `beam.window.TimestampedValue` carrying
  `time.time()` at the moment the Map runs, then formatted as
  '<UTC datetime> - <name>' and printed.

  Args:
    test: Optional callable that receives the output PCollection (e.g. to add
      `assert_that` checks). It is invoked inside the `with` block so the
      transforms it adds belong to the pipeline that runs when the block exits.
  """
  # [START processing_time]
  import apache_beam as beam
  import time

  class GetTimestamp(beam.DoFn):
    def process(self, plant, timestamp=beam.DoFn.TimestampParam):
      yield '{} - {}'.format(timestamp.to_utc_datetime(), plant['name'])

  with beam.Pipeline() as pipeline:
    plant_processing_times = (
        pipeline
        | 'Garden plants' >> beam.Create([
            {'name': 'Strawberry'},
            {'name': 'Carrot'},
            {'name': 'Artichoke'},
            {'name': 'Tomato'},
            {'name': 'Potato'},
        ])
        | 'With timestamps' >> beam.Map(
            lambda plant: beam.window.TimestampedValue(plant, time.time()))
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )
    # [END processing_time]
    # Must stay inside the `with`: the pipeline is run when the block exits,
    # so anything the test hook adds afterwards would never be executed.
    if test:
      test(plant_processing_times)
diff --git
a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
new file mode 100644
index 0000000..41c25d3
--- /dev/null
+++
b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/with_timestamps_test.py
@@ -0,0 +1,95 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+# pylint: disable=line-too-long
+from apache_beam.examples.snippets.transforms.element_wise.with_timestamps
import *
+# pylint: enable=line-too-long
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
@mock.patch('apache_beam.Pipeline', TestPipeline)
# pylint: disable=line-too-long
@mock.patch('apache_beam.examples.snippets.transforms.element_wise.with_timestamps.print', lambda elem: elem)
# pylint: enable=line-too-long
class WithTimestampsTest(unittest.TestCase):
  """Checks the output of the WithTimestamps snippets.

  `apache_beam.Pipeline` is swapped for `TestPipeline`, and the snippet
  module's `print` is swapped for an identity function so that
  `beam.Map(print)` forwards each element instead of producing `None`.
  """

  def __init__(self, methodName='runTest'):
    super(WithTimestampsTest, self).__init__(methodName)
    # [START plant_seasons]
    plant_seasons = [
        '2020-04-01 00:00:00 - Strawberry',
        '2020-06-01 00:00:00 - Carrot',
        '2020-03-01 00:00:00 - Artichoke',
        '2020-05-01 00:00:00 - Tomato',
        '2020-09-01 00:00:00 - Potato',
    ]
    # [END plant_seasons]
    self.plant_seasons_test = lambda actual: \
        assert_that(actual, equal_to(plant_seasons))

    # [START plant_events]
    plant_events = [
        '1 - Strawberry',
        '4 - Carrot',
        '2 - Artichoke',
        '3 - Tomato',
        '5 - Potato',
    ]
    # [END plant_events]
    self.plant_events_test = lambda actual: \
        assert_that(actual, equal_to(plant_events))

    # [START plant_processing_times]
    plant_processing_times = [
        '2020-03-20 20:12:42.145594 - Strawberry',
        '2020-03-20 20:12:42.145827 - Carrot',
        '2020-03-20 20:12:42.145962 - Artichoke',
        '2020-03-20 20:12:42.146093 - Tomato',
        '2020-03-20 20:12:42.146216 - Potato',
    ]
    # [END plant_processing_times]

    def strip_timestamp(row):
      # Keep only what follows the first ' - ' separator (the plant name), so
      # the datetime's own '-' characters and hyphenated names are both safe.
      return row.split(' - ', 1)[-1]

    def plant_processing_times_test(actual):
      # Since `time.time()` will always give something different, we'll
      # simply strip the timestamp information before testing the results.
      import apache_beam as beam
      actual = actual | 'Strip timestamps' >> beam.Map(strip_timestamp)
      expected = [strip_timestamp(row) for row in plant_processing_times]
      assert_that(actual, equal_to(expected))
    self.plant_processing_times_test = plant_processing_times_test

  def test_event_time(self):
    event_time(self.plant_seasons_test)

  def test_logical_clock(self):
    logical_clock(self.plant_events_test)

  def test_processing_time(self):
    processing_time(self.plant_processing_times_test)
+
+
# Allow running this test module directly as a script.
if __name__ == '__main__':
  unittest.main()