This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 4aaac9c [BEAM-7390] Add code snippet for Min new 02efc22 Merge pull request #10177 from davidcavazos/min-code 4aaac9c is described below commit 4aaac9ccd8be9c5bd2abf9e334589f6029505753 Author: David Cavazos <dcava...@google.com> AuthorDate: Tue Nov 19 13:09:35 2019 -0800 [BEAM-7390] Add code snippet for Min --- .../snippets/transforms/aggregation/min.py | 60 ++++++++++++++++++++++ .../snippets/transforms/aggregation/min_test.py | 60 ++++++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min.py new file mode 100644 index 0000000..d004cf2 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min.py @@ -0,0 +1,60 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + + +def min_globally(test=None): + # [START min_globally] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + min_element = ( + pipeline + | 'Create numbers' >> beam.Create([3, 4, 1, 2]) + | 'Get min value' >> beam.CombineGlobally( + lambda elements: min(elements or [-1])) + | beam.Map(print) + ) + # [END min_globally] + if test: + test(min_element) + + +def min_per_key(test=None): + # [START min_per_key] + import apache_beam as beam + + with beam.Pipeline() as pipeline: + elements_with_min_value_per_key = ( + pipeline + | 'Create produce' >> beam.Create([ + ('🥕', 3), + ('🥕', 2), + ('🍆', 1), + ('🍅', 4), + ('🍅', 5), + ('🍅', 3), + ]) + | 'Get min value per key' >> beam.CombinePerKey(min) + | beam.Map(print) + ) + # [END min_per_key] + if test: + test(elements_with_min_value_per_key) diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min_test.py new file mode 100644 index 0000000..321fd12 --- /dev/null +++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/min_test.py @@ -0,0 +1,60 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import absolute_import +from __future__ import print_function + +import unittest + +import mock + +from apache_beam.examples.snippets.util import assert_matches_stdout +from apache_beam.testing.test_pipeline import TestPipeline + +from . import min as beam_min + + +def check_min_element(actual): + expected = '''[START min_element] +1 +[END min_element]'''.splitlines()[1:-1] + assert_matches_stdout(actual, expected) + + +def check_elements_with_min_value_per_key(actual): + expected = '''[START elements_with_min_value_per_key] +('🥕', 2) +('🍆', 1) +('🍅', 3) +[END elements_with_min_value_per_key]'''.splitlines()[1:-1] + assert_matches_stdout(actual, expected) + + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch( + 'apache_beam.examples.snippets.transforms.aggregation.min.print', str) +class MinTest(unittest.TestCase): + def test_min_globally(self): + beam_min.min_globally(check_min_element) + + def test_min_per_key(self): + beam_min.min_per_key(check_elements_with_min_value_per_key) + + +if __name__ == '__main__': + unittest.main()