This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ad6229c [BEAM-7389] Add Python snippet for Partition transform (#8904)
ad6229c is described below
commit ad6229c785cd075283cc2fd7970e9fdb57a9a614
Author: David Cavazos <[email protected]>
AuthorDate: Wed Jul 10 14:20:39 2019 -0700
[BEAM-7389] Add Python snippet for Partition transform (#8904)
* Add Python snippet for Partition transform
* Simplified check functions and fixed star imports
---
.../snippets/transforms/element_wise/partition.py | 136 +++++++++++++++++++++
.../transforms/element_wise/partition_test.py | 84 +++++++++++++
2 files changed, 220 insertions(+)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
new file mode 100644
index 0000000..6f839d4
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
@@ -0,0 +1,136 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def partition_function(test=None):
+ # [START partition_function]
+ import apache_beam as beam
+
+ durations = ['annual', 'biennial', 'perennial']
+
+ def by_duration(plant, num_partitions):
+ return durations.index(plant['duration'])
+
+ with beam.Pipeline() as pipeline:
+ annuals, biennials, perennials = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ])
+ | 'Partition' >> beam.Partition(by_duration, len(durations))
+ )
+ _ = (
+ annuals
+ | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
+ )
+ _ = (
+ biennials
+ | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
+ )
+ _ = (
+ perennials
+ | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
+ )
+ # [END partition_function]
+ if test:
+ test(annuals, biennials, perennials)
+
+
+def partition_lambda(test=None):
+ # [START partition_lambda]
+ import apache_beam as beam
+
+ durations = ['annual', 'biennial', 'perennial']
+
+ with beam.Pipeline() as pipeline:
+ annuals, biennials, perennials = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ])
+ | 'Partition' >> beam.Partition(
+ lambda plant, num_partitions: durations.index(plant['duration']),
+ len(durations),
+ )
+ )
+ _ = (
+ annuals
+ | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
+ )
+ _ = (
+ biennials
+ | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
+ )
+ _ = (
+ perennials
+ | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
+ )
+ # [END partition_lambda]
+ if test:
+ test(annuals, biennials, perennials)
+
+
+def partition_multiple_arguments(test=None):
+ # [START partition_multiple_arguments]
+ import apache_beam as beam
+ import json
+
+ def split_dataset(plant, num_partitions, ratio):
+ assert num_partitions == len(ratio)
+ bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)
+ total = 0
+ for i, part in enumerate(ratio):
+ total += part
+ if bucket < total:
+ return i
+ return len(ratio) - 1
+
+ with beam.Pipeline() as pipeline:
+ train_dataset, test_dataset = (
+ pipeline
+ | 'Gardening plants' >> beam.Create([
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ])
+ | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2])
+ )
+ _ = (
+ train_dataset
+ | 'Train' >> beam.Map(lambda x: print('train: ' + str(x)))
+ )
+ _ = (
+ test_dataset
+ | 'Test' >> beam.Map(lambda x: print('test: ' + str(x)))
+ )
+ # [END partition_multiple_arguments]
+ if test:
+ test(train_dataset, test_dataset)
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
new file mode 100644
index 0000000..48f83da
--- /dev/null
+++ b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
@@ -0,0 +1,84 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from ..element_wise import partition
+
+
+def check_partitions(actual1, actual2, actual3):
+ # [START partitions]
+ annuals = [
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ ]
+ biennials = [
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ ]
+ perennials = [
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ]
+ # [END partitions]
+ assert_that(actual1, equal_to(annuals), label='assert annuals')
+ assert_that(actual2, equal_to(biennials), label='assert biennials')
+ assert_that(actual3, equal_to(perennials), label='assert perennials')
+
+
+def check_split_datasets(actual1, actual2):
+ # [START train_test]
+ train_dataset = [
+ {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+ {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+ {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+ ]
+ test_dataset = [
+ {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+ {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+ ]
+ # [END train_test]
+ assert_that(actual1, equal_to(train_dataset), label='assert train')
+ assert_that(actual2, equal_to(test_dataset), label='assert test')
+
+
[email protected]('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
[email protected]('apache_beam.examples.snippets.transforms.element_wise.partition.print',
lambda elem: elem)
+# pylint: enable=line-too-long
+class PartitionTest(unittest.TestCase):
+ def test_partition_function(self):
+ partition.partition_function(check_partitions)
+
+ def test_partition_lambda(self):
+ partition.partition_lambda(check_partitions)
+
+ def test_partition_multiple_arguments(self):
+ partition.partition_multiple_arguments(check_split_datasets)
+
+
+if __name__ == '__main__':
+ # Run this module's tests when it is executed directly as a script.
+ unittest.main()