This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f51edc1 [BEAM-7390] Add code snippet for GroupByKey
new d032994 Merge pull request #9926 from davidcavazos/groupbykey-code
f51edc1 is described below
commit f51edc10e1c724bdf113f84ab3b7283b9fabe19c
Author: David Cavazos <[email protected]>
AuthorDate: Wed Oct 16 18:36:39 2019 -0700
[BEAM-7390] Add code snippet for GroupByKey
---
.../snippets/transforms/aggregation/groupbykey.py | 47 +++++++++++++++++++
.../transforms/aggregation/groupbykey_test.py | 54 ++++++++++++++++++++++
2 files changed, 101 insertions(+)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py
new file mode 100644
index 0000000..83e4f87
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey.py
@@ -0,0 +1,47 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def groupbykey(test=None):
+ # [START groupbykey]
+ import apache_beam as beam
+
+ with beam.Pipeline() as pipeline:
+ produce_counts = (
+ pipeline
+ | 'Create produce counts' >> beam.Create([
+ ('spring', '🍓'),
+ ('spring', '🥕'),
+ ('spring', '🍆'),
+ ('spring', '🍅'),
+ ('summer', '🥕'),
+ ('summer', '🍅'),
+ ('summer', '🌽'),
+ ('fall', '🥕'),
+ ('fall', '🍅'),
+ ('winter', '🍆'),
+ ])
+ | 'Group counts per produce' >> beam.GroupByKey()
+ | beam.Map(print)
+ )
+ # [END groupbykey]
+ if test:
+ test(produce_counts)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey_test.py
new file mode 100644
index 0000000..4d8283a
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupbykey_test.py
@@ -0,0 +1,54 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.examples.snippets.util import assert_matches_stdout
+from apache_beam.testing.test_pipeline import TestPipeline
+
+from . import groupbykey
+
+
+def check_produce_counts(actual):
+ expected = '''[START produce_counts]
+('spring', ['🍓', '🥕', '🍆', '🍅'])
+('summer', ['🥕', '🍅', '🌽'])
+('fall', ['🥕', '🍅'])
+('winter', ['🍆'])
+[END produce_counts]'''.splitlines()[1:-1]
+ # The elements order is non-deterministic, so sort them first.
+ assert_matches_stdout(
+ actual, expected, lambda pair: (pair[0], sorted(pair[1])))
+
+
[email protected]('apache_beam.Pipeline', TestPipeline)
[email protected](
+ 'apache_beam.examples.snippets.transforms.aggregation.groupbykey.print',
+ str)
+class GroupByKeyTest(unittest.TestCase):
+ def test_groupbykey(self):
+ groupbykey.groupbykey(check_produce_counts)
+
+
+# Run the unittest runner when this module is executed as a script.
+if __name__ == '__main__':
+ unittest.main()