This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ad6229c  [BEAM-7389] Add Python snippet for Partition transform (#8904)
ad6229c is described below

commit ad6229c785cd075283cc2fd7970e9fdb57a9a614
Author: David Cavazos <[email protected]>
AuthorDate: Wed Jul 10 14:20:39 2019 -0700

    [BEAM-7389] Add Python snippet for Partition transform (#8904)
    
    * Add Python snippet for Partition transform
    
    * Simplified check functions and fixed star imports
---
 .../snippets/transforms/element_wise/partition.py  | 136 +++++++++++++++++++++
 .../transforms/element_wise/partition_test.py      |  84 +++++++++++++
 2 files changed, 220 insertions(+)

diff --git 
a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
 
b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
new file mode 100644
index 0000000..6f839d4
--- /dev/null
+++ 
b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition.py
@@ -0,0 +1,136 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+def partition_function(test=None):
+  # [START partition_function]
+  import apache_beam as beam
+
+  durations = ['annual', 'biennial', 'perennial']
+
+  def by_duration(plant, num_partitions):
+    return durations.index(plant['duration'])
+
+  with beam.Pipeline() as pipeline:
+    annuals, biennials, perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Partition' >> beam.Partition(by_duration, len(durations))
+    )
+    _ = (
+        annuals
+        | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
+    )
+    _ = (
+        biennials
+        | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
+    )
+    _ = (
+        perennials
+        | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
+    )
+    # [END partition_function]
+    if test:
+      test(annuals, biennials, perennials)
+
+
+def partition_lambda(test=None):
+  # [START partition_lambda]
+  import apache_beam as beam
+
+  durations = ['annual', 'biennial', 'perennial']
+
+  with beam.Pipeline() as pipeline:
+    annuals, biennials, perennials = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Partition' >> beam.Partition(
+            lambda plant, num_partitions: durations.index(plant['duration']),
+            len(durations),
+        )
+    )
+    _ = (
+        annuals
+        | 'Annuals' >> beam.Map(lambda x: print('annual: ' + str(x)))
+    )
+    _ = (
+        biennials
+        | 'Biennials' >> beam.Map(lambda x: print('biennial: ' + str(x)))
+    )
+    _ = (
+        perennials
+        | 'Perennials' >> beam.Map(lambda x: print('perennial: ' + str(x)))
+    )
+    # [END partition_lambda]
+    if test:
+      test(annuals, biennials, perennials)
+
+
+def partition_multiple_arguments(test=None):
+  # [START partition_multiple_arguments]
+  import apache_beam as beam
+  import json
+
+  def split_dataset(plant, num_partitions, ratio):
+    assert num_partitions == len(ratio)
+    bucket = sum(map(ord, json.dumps(plant))) % sum(ratio)
+    total = 0
+    for i, part in enumerate(ratio):
+      total += part
+      if bucket < total:
+        return i
+    return len(ratio) - 1
+
+  with beam.Pipeline() as pipeline:
+    train_dataset, test_dataset = (
+        pipeline
+        | 'Gardening plants' >> beam.Create([
+            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+        ])
+        | 'Partition' >> beam.Partition(split_dataset, 2, ratio=[8, 2])
+    )
+    _ = (
+        train_dataset
+        | 'Train' >> beam.Map(lambda x: print('train: ' + str(x)))
+    )
+    _ = (
+        test_dataset
+        | 'Test'  >> beam.Map(lambda x: print('test: ' + str(x)))
+    )
+    # [END partition_multiple_arguments]
+    if test:
+      test(train_dataset, test_dataset)
diff --git 
a/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
 
b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
new file mode 100644
index 0000000..48f83da
--- /dev/null
+++ 
b/sdks/python/apache_beam/examples/snippets/transforms/element_wise/partition_test.py
@@ -0,0 +1,84 @@
+# coding=utf-8
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import unittest
+
+import mock
+
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+from ..element_wise import partition
+
+
+def check_partitions(actual1, actual2, actual3):
+  # [START partitions]
+  annuals = [
+      {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+  ]
+  biennials = [
+      {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+  ]
+  perennials = [
+      {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+      {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+      {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+  ]
+  # [END partitions]
+  assert_that(actual1, equal_to(annuals), label='assert annuals')
+  assert_that(actual2, equal_to(biennials), label='assert biennials')
+  assert_that(actual3, equal_to(perennials), label='assert perennials')
+
+
+def check_split_datasets(actual1, actual2):
+  # [START train_test]
+  train_dataset = [
+      {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
+      {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
+      {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
+  ]
+  test_dataset = [
+      {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
+      {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
+  ]
+  # [END train_test]
+  assert_that(actual1, equal_to(train_dataset), label='assert train')
+  assert_that(actual2, equal_to(test_dataset), label='assert test')
+
+
[email protected]('apache_beam.Pipeline', TestPipeline)
+# pylint: disable=line-too-long
[email protected]('apache_beam.examples.snippets.transforms.element_wise.partition.print',
 lambda elem: elem)
+# pylint: enable=line-too-long
+class PartitionTest(unittest.TestCase):
+  def test_partition_function(self):
+    partition.partition_function(check_partitions)
+
+  def test_partition_lambda(self):
+    partition.partition_lambda(check_partitions)
+
+  def test_partition_multiple_arguments(self):
+    partition.partition_multiple_arguments(check_split_datasets)
+
+
+if __name__ == '__main__':
+  unittest.main()

Reply via email to