This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4816534 [BEAM-7390] Add code snippet for Max
new 0bcff1a Merge pull request #10175 from davidcavazos/max-code
4816534 is described below
commit 4816534ad4a13dd1616bf1d158a4ead80ea20e66
Author: David Cavazos <[email protected]>
AuthorDate: Tue Nov 19 13:09:51 2019 -0800
[BEAM-7390] Add code snippet for Max
---
.../snippets/transforms/aggregation/max.py | 60 ++++++++++++++++++++++
.../snippets/transforms/aggregation/max_test.py | 60 ++++++++++++++++++++++
2 files changed, 120 insertions(+)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/max.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/max.py
new file mode 100644
index 0000000..7de4050
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/max.py
@@ -0,0 +1,60 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
def max_globally(test=None):
  """Prints the largest element of a small, globally combined PCollection.

  Args:
    test: Optional callback. When given, it is called with the PCollection
      produced by the final ``beam.Map(print)`` step, while still inside the
      ``with`` block (i.e. before the pipeline is run on exit).
  """
  # [START max_globally]
  import apache_beam as beam

  def largest(values):
    # Falling back to [None] keeps max() from raising on an empty input.
    return max(values or [None])

  with beam.Pipeline() as pipeline:
    max_element = (
        pipeline
        | 'Create numbers' >> beam.Create([3, 4, 1, 2])
        | 'Get max value' >> beam.CombineGlobally(largest)
        | beam.Map(print))
    # [END max_globally]
    if test:
      test(max_element)
+
+
def max_per_key(test=None):
  """Prints the largest value seen for every key of a keyed PCollection.

  Args:
    test: Optional callback. When given, it is called with the PCollection
      produced by the final ``beam.Map(print)`` step, while still inside the
      ``with`` block (i.e. before the pipeline is run on exit).
  """
  # [START max_per_key]
  import apache_beam as beam

  produce_counts = [
      ('🥕', 3),
      ('🥕', 2),
      ('🍆', 1),
      ('🍅', 4),
      ('🍅', 5),
      ('🍅', 3),
  ]

  with beam.Pipeline() as pipeline:
    elements_with_max_value_per_key = (
        pipeline
        | 'Create produce' >> beam.Create(produce_counts)
        | 'Get max value per key' >> beam.CombinePerKey(max)
        | beam.Map(print))
    # [END max_per_key]
    if test:
      test(elements_with_max_value_per_key)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/max_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/max_test.py
new file mode 100644
index 0000000..c229ade
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/max_test.py
@@ -0,0 +1,60 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.util import assert_matches_stdout
+from apache_beam.testing.test_pipeline import TestPipeline
+
+from . import max as beam_max
+
+
def check_max_element(actual):
  """Asserts that ``actual`` matches the documented output of max_globally."""
  documented_output = '''[START max_element]
4
[END max_element]'''
  # The first and last lines are only [START]/[END] markers for the docs.
  expected = documented_output.splitlines()[1:-1]
  assert_matches_stdout(actual, expected)
+
+
def check_elements_with_max_value_per_key(actual):
  """Asserts that ``actual`` matches the documented output of max_per_key."""
  documented_output = '''[START elements_with_max_value_per_key]
('🥕', 3)
('🍆', 1)
('🍅', 5)
[END elements_with_max_value_per_key]'''
  # The first and last lines are only [START]/[END] markers for the docs.
  expected = documented_output.splitlines()[1:-1]
  assert_matches_stdout(actual, expected)
+
+
@mock.patch('apache_beam.Pipeline', TestPipeline)
@mock.patch(
    'apache_beam.examples.snippets.transforms.aggregation.max.print', str)
class MaxTest(unittest.TestCase):
  """Runs the Max snippets and checks their printed output.

  ``apache_beam.Pipeline`` is patched to ``TestPipeline``. ``print`` inside
  the snippet module is patched to ``str``, so the snippets' final
  ``beam.Map(print)`` step yields the would-be printed text as elements.
  The ``check_*`` helpers then compare those elements with the documented
  output.
  """

  def test_max_globally(self):
    beam_max.max_globally(check_max_element)

  def test_max_per_key(self):
    beam_max.max_per_key(check_elements_with_max_value_per_key)
+
+
if __name__ == '__main__':
  # Run this module's tests when the file is executed directly.
  unittest.main(module='__main__')