kerrydc commented on code in PR #23577:
URL: https://github.com/apache/beam/pull/23577#discussion_r1128401510

##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/description.md:
##########
@@ -0,0 +1,141 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Mean
+
+You can use Mean transforms to compute the arithmetic mean of the elements in a collection or the mean of the values associated with each key in a collection of key-value pairs.
+{{if (eq .Sdk "go")}}
+`Mean()` returns a transformation that returns a collection whose content is the average of the elements of the input collection. If there are no elements in the input collection, 0 is returned.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.Mean(s, input)
+}
+```
+
+You can use `MeanPerKey()` to calculate the mean of the elements associated with each unique key.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+

Review Comment:
create input and add the expected output, similar to java

##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/min/python-example/task.py:
##########
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# beam-playground:
+#   name: Min
+#   description: Min example.
+#   multifile: false
+#   context_line: 48
+#   categories:
+#     - Quickstart
+#   complexity: BASIC
+#   tags:
+#     - hellobeam
+
+
+import apache_beam as beam
+
+# Output PCollection
+class Output(beam.PTransform):
+    class _OutputFn(beam.DoFn):
+        def __init__(self, prefix=''):
+            super().__init__()
+            self.prefix = prefix
+
+        def process(self, element):
+            print(self.prefix+str(element))
+
+    def __init__(self, label=None,prefix=''):
+        super().__init__(label)
+        self.prefix = prefix
+
+    def expand(self, input):
+        input | beam.ParDo(self._OutputFn(self.prefix))
+
+with beam.Pipeline() as p:
+

Review Comment:
remove whitespace line

##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/description.md:
##########
@@ -0,0 +1,188 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Sum
+
+You can use Sum transforms to compute the sum of the elements in a collection or the sum of the values associated with each key in a collection of key-value pairs.
+{{if (eq .Sdk "go")}}
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.Sum(s, input)
+}
+```
+
+You can use `SumPerKey()` to calculate the sum Integer associated with each unique key.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.SumPerKey(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+You can find the global sum value from the `PCollection` by using `Sum.doublesGlobally()`
+
+```
+PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Double> sum = input.apply(Sum.doublesGlobally());
+```
+
+Output
+
+```
+55
+```
+
+You can use `Sum.integersPerKey()` to calculate the sum Integer associated with each unique key (which is of type String).
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("🥕", 3),
+              KV.of("🥕", 2),
+              KV.of("🍆", 1),
+              KV.of("🍅", 4),
+              KV.of("🍅", 5),
+              KV.of("🍅", 3)));
+PCollection<KV<String, Integer>> sumPerKey = input.apply(Sum.integersPerKey());
+```
+
+Output
+
+```
+KV{🍆, 1}
+KV{🍅, 12}
+KV{🥕, 5}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Sum of the elements in a PCollection
+
+You can find the global sum value from the `PCollection` by using `CombineGlobally(sum)`
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+    total = (
+        p
+        | 'Create numbers' >> beam.Create([3, 4, 1, 2])
+        | 'Sum values' >> beam.CombineGlobally(sum)
+        | beam.Map(print))
+```
+
+Output
+
+```
+10
+```
+
+### Sum of the elements for each key
+
+You can use `Combine.PerKey()` to get the sum of all the element values for each unique key in a `PCollection` of key-values.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+    totals_per_key = (
+        p

Review Comment:
(p = [first transform] on one line

##########
learning/tour-of-beam/learning-content/common-transforms/filter/description.md:
##########
@@ -0,0 +1,420 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+### Using Filter
+
+`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate.
+
+{{if (eq .Sdk "go")}}
+```
+import (
+	"github.com/apache/fbeam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/filter"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return filter.Exclude(s, input, func(element int) bool {
+		return element % 2 == 1
+	})
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+PCollection<String> input = pipeline
+        .apply(Create.of(List.of("Hello","world","Hi")));
+
+PCollection<String> filteredStrings = input
+        .apply(Filter.by(new SerializableFunction<String, Boolean>() {
+            @Override
+            public Boolean apply(String input) {
+                return input.length() > 3;
+            }
+        }));
+```
+
+Output
+
+```
+Hello
+world
+```
+
+### Built-in filters
+
+The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen` With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount.
+
+Other built-in filters are:
+
+* Filter.greaterThanEq
+* Filter.greaterThan
+* Filter.lessThan
+* Filter.lessThanEq
+* Filter.equal
+
+
+## Example 2: Filtering with a built-in methods
+
+```
+// List of integers
+PCollection<Integer> input = pipeline.apply(Create.of(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
+
+PCollection<Integer> greaterThanEqNumbers = input.apply(Filter.greaterThanEq(3));
+// PCollection will contain [3, 4, 5, 6, 7, 8, 9, 10] at this point
+
+PCollection<Integer> greaterThanNumbers = input.apply(Filter.greaterThan(4));
+// PCollection will contain [5, 6, 7, 8, 9, 10] at this point
+
+
+PCollection<Integer> lessThanNumbers = input.apply(Filter.lessThan(10));
+// PCollection will contain [1, 2, 3, 4, 5, 6, 7, 8, 9] at this point
+
+
+PCollection<Integer> lessThanEqNumbers = input.apply(Filter.lessThanEq(7));
+// PCollection will contain [1, 2, 3, 4 5, 6, 7] at this point
+
+
+PCollection<Integer> equalNumbers = input.apply(Filter.equal(9));
+// PCollection will contain [9] at this point
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+from log_elements import LogElements
+
+with beam.Pipeline() as p:
+    (p | beam.Create(range(1, 11))
+       | beam.Filter(lambda num: num % 2 == 0)
+       | LogElements())
+```
+
+
+### Example 1: Filtering with a function
+
+You can define a function `is_perennial()` which returns True if the element’s duration equals 'perennial', and False otherwise.
+
+```
+import apache_beam as beam
+
+def is_perennial(plant):
+    return plant['duration'] == 'perennial'
+
+with beam.Pipeline() as p:
+    perennials = (
+        p
+        | 'Gardening plants' >> beam.Create([
+            {
+                'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
+            },
+            {
+                'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
+            },
+            {
+                'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
+            },
+            {
+                'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
+            },
+            {
+                'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
+            },
+        ])
+        | 'Filter perennials' >> beam.Filter(is_perennial)
+        | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 2: Filtering with a lambda function
+
+You can also use lambda functions to simplify Example 1.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+    perennials = (
+        p
+        | 'Gardening plants' >> beam.Create([
+            {
+                'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
+            },
+            {
+                'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
+            },
+            {
+                'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
+            },
+            {
+                'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
+            },
+            {
+                'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
+            },
+        ])
+        | 'Filter perennials' >>
+            beam.Filter(lambda plant: plant['duration'] == 'perennial')
+        | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 3: Filtering with multiple arguments
+
+You can pass functions with multiple arguments to `Filter`. They are passed as additional positional arguments or keyword arguments to the function.
+
+In this example, `has_duration` takes `plant` and `duration` as arguments.
+
+```
+import apache_beam as beam
+
+def has_duration(plant, duration):
+    return plant['duration'] == duration
+
+with beam.Pipeline() as p:
+    perennials = (
+        p
+        | 'Gardening plants' >> beam.Create([
+            {
+                'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
+            },
+            {
+                'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
+            },
+            {
+                'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
+            },
+            {
+                'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
+            },
+            {
+                'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
+            },
+        ])
+        | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
+        | beam.Map(print))
+```
+
+Output
+
+```
+{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
+{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
+{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}
+```
+
+### Example 4: Filtering with side inputs as singletons
+
+If the `PCollection` has a single value, such as the average from another computation, passing the `PCollection` as a singleton accesses that value.
+
+In this example, we pass a `PCollection` the value **perennial** as a singleton. We then use that value to filter out perennials.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+    perennial = p | 'Perennial' >> beam.Create(['perennial'])
+
+    perennials = (
+        pipeline

Review Comment:
(p | [first transform] on one line

##########
learning/tour-of-beam/learning-content/common-transforms/filter/hint1.md:
##########
@@ -0,0 +1,84 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+**Hint**
+
+You can use the following code snippet to create an input PCollection:
+{{if (eq .Sdk "go")}}
+Don't forget to add import:
+
+```
+import (
+	"strings"
+	...
+)
+```
+
+Create data for PCollection:
+
+```
+str:= "To be, or not to be: that is the question: Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune, Or to take arms against a sea of troubles, And by opposing end them. To die: to sleep"
+
+input := beam.CreateList(s,strings.Split(str, " "))
+```
+
+And filtering:
+
+```
+func applyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return filter.Include(s, input, func(word string) bool {
+		return strings.HasPrefix(strings.ToUpper(word), "A")
+	})
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+Don't forget to add import:
+
+```
+import java.util.Arrays;
+```
+
+Create data for PCollection:
+
+```
+String str = "To be, or not to be: that is the question:Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune,Or to take arms against a sea of troubles,And by opposing end them. To die: to sleep";
+
+PCollection<String> input = pipeline.apply(Create.of(Arrays.asList(str.split(" "))));
+```
+And filtering:
+
+```
+static PCollection<String> applyTransform(PCollection<String> input) {
+    return input.apply(Filter.by(word -> word.toUpperCase().startsWith("A")));
+}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+(p | beam.Create(["To be, or not to be: that is the question:Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune,Or to take arms against a sea of troubles,And by opposing end them. To die: to sleep"])
+   | beam.ParDo(SplitWords())
+   | beam.Filter(lambda word: word.upper().startswith("A"))
+   | Output(prefix='PCollection filtered value: '))
+```
+
+For split word you can use:

Review Comment:
For SplitWords() you can use:

##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/count/description.md:
##########
@@ -0,0 +1,311 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Count
+
+`Count` provides many transformations for calculating the count of values in a `PCollection`, either globally or for each key.
+
+{{if (eq .Sdk "go")}}
+Counts the number of elements within each aggregation. The Count transform has two varieties:
+
+You can count the number of elements in ```PCollection``` with ```CountElms()```, it will return one element.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.CountElms(s, input)
+}
+```
+
+You can use ```Count()``` to count how many elements are associated with a particular key, the result will be one output for each key.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.Count(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+Counts the number of elements within each aggregation. The Count transform has three varieties:
+
+### Counting all elements in a PCollection
+
+```Count.globally()``` counts the number of elements in the entire PCollection. The result is a collection with a single element.
+
+```
+PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Long> output = numbers.apply(Count.globally());
+```
+
+Output
+```
+10
+```
+
+### Counting elements for each key
+
+```Count.perKey()``` counts how many elements are associated with each key. It ignores the values. The resulting collection has one output for every key in the input collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("🥕", 3),
+              KV.of("🥕", 2),
+              KV.of("🍆", 1),
+              KV.of("🍅", 4),
+              KV.of("🍅", 5),
+              KV.of("🍅", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perKey());
+```
+
+Output
+
+```
+KV{🥕, 2}
+KV{🍅, 3}
+KV{🍆, 1}
+```
+
+### Counting all unique elements
+
+```Count.perElement()``` counts how many times each element appears in the input collection. The output collection is a key-value pair, containing each unique element and the number of times it appeared in the original collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("🥕", 3),
+              KV.of("🥕", 2),
+              KV.of("🍆", 1),
+              KV.of("🍅", 3),
+              KV.of("🍅", 5),
+              KV.of("🍅", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perElement());
+```
+
+Output
+
+```
+KV{KV{🍅, 3}, 2}
+KV{KV{🥕, 2}, 1}
+KV{KV{🍆, 1}, 1}
+KV{KV{🥕, 3}, 1}
+KV{KV{🍅, 5}, 1}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Counting all elements in a PCollection
+
+You can use ```Count.Globally()``` to count all elements in a PCollection, even if there are duplicate elements.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+    total_elements = (
+        pipeline
+        | 'Create plants' >> beam.Create(
+            ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
+        | 'Count all elements' >> beam.combiners.Count.Globally()
+        | beam.Map(print))
+```
+
+Output
+
+```
+10
+```
+
+### Counting elements for each key
+
+You can use ```Count.PerKey()``` to count the elements for each unique key in a PCollection of key-values.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+    total_elements_per_keys = (
+        pipeline
+        | 'Create plants' >> beam.Create([
+            ('spring', '🍓'),
+            ('spring', '🥕'),
+            ('summer', '🥕'),
+            ('fall', '🥕'),
+            ('spring', '🍆'),
+            ('winter', '🍆'),
+            ('spring', '🍅'),
+            ('summer', '🍅'),
+            ('fall', '🍅'),
+            ('summer', '🌽'),
+        ])
+        | 'Count elements per key' >> beam.combiners.Count.PerKey()
+        | beam.Map(print))
+```
+
+Output
+
+```
+('spring', 4)
+('summer', 3)
+('fall', 2)
+('winter', 1)
+```
+
+### Counting all unique elements
+
+You can use ```Count.PerElement()``` to count only the unique elements in a PCollection.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+    total_unique_elements = (
+        pipeline
+        | 'Create produce' >> beam.Create(
+            ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
+        | 'Count unique elements' >> beam.combiners.Count.PerElement()
+        | beam.Map(print))
+```
+
+Output
+
+```
+('🍓', 1)
+('🥕', 3)
+('🍆', 2)
+('🍅', 3)
+('🌽', 1)
+```
+{{end}}
+### Playground exercise
+
+You can find the full code of this example in the playground window, which you can run and experiment with.
+{{if (eq .Sdk "python")}}
+`Count.globally` returns the number of integers from the `PCollection`. If you replace the `integers input` with this `map input` and replace `beam.combiners.Count.Globally` on `beam.combiners.Count.PerKey` it will output the count numbers by key :
+
+```
+beam.Create([
+    (1, 36),
+    (2, 91),
+    (3, 33),
+    (3, 11),
+    (4, 67),
+]) | beam.combiners.Count.PerKey()
+```
+
+And Count transforms work with strings too! Can you change the example to count the number of words in a given sentence and how often each word occurs?
+
+Count how many words are repeated with `Count`:
+
+```
+    (p | beam.Create(["To be, or not to be: that is the question:Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune,Or to take arms against a sea of troubles,And by opposing end them. To die: to sleep"])
+       | beam.ParDo(SplitWords())
+       | beam.combiners.Count.PerElement()
+       | Output(prefix='PCollection filtered value: '))
+```
+
+For split word you can use:

Review Comment:
please fix

##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/count/description.md:
##########
@@ -0,0 +1,311 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Count
+
+`Count` provides many transformations for calculating the count of values in a `PCollection`, either globally or for each key.
+
+{{if (eq .Sdk "go")}}
+Counts the number of elements within each aggregation. The Count transform has two varieties:
+
+You can count the number of elements in ```PCollection``` with ```CountElms()```, it will return one element.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.CountElms(s, input)
+}
+```
+
+You can use ```Count()``` to count how many elements are associated with a particular key, the result will be one output for each key.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.Count(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+Counts the number of elements within each aggregation. The Count transform has three varieties:
+
+### Counting all elements in a PCollection
+
+```Count.globally()``` counts the number of elements in the entire PCollection. The result is a collection with a single element.
+
+```
+PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Long> output = numbers.apply(Count.globally());
+```
+
+Output
+```
+10
+```
+
+### Counting elements for each key
+
+```Count.perKey()``` counts how many elements are associated with each key. It ignores the values. The resulting collection has one output for every key in the input collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("🥕", 3),
+              KV.of("🥕", 2),
+              KV.of("🍆", 1),
+              KV.of("🍅", 4),
+              KV.of("🍅", 5),
+              KV.of("🍅", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perKey());
+```
+
+Output
+
+```
+KV{🥕, 2}
+KV{🍅, 3}
+KV{🍆, 1}
+```
+
+### Counting all unique elements
+
+```Count.perElement()``` counts how many times each element appears in the input collection. The output collection is a key-value pair, containing each unique element and the number of times it appeared in the original collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("🥕", 3),
+              KV.of("🥕", 2),
+              KV.of("🍆", 1),
+              KV.of("🍅", 3),
+              KV.of("🍅", 5),
+              KV.of("🍅", 3)));
+PCollection<KV<String, Long>> output = input.apply(Count.perElement());
+```
+
+Output
+
+```
+KV{KV{🍅, 3}, 2}
+KV{KV{🥕, 2}, 1}
+KV{KV{🍆, 1}, 1}
+KV{KV{🥕, 3}, 1}
+KV{KV{🍅, 5}, 1}
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+### Counting all elements in a PCollection
+
+You can use ```Count.Globally()``` to count all elements in a PCollection, even if there are duplicate elements.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+    total_elements = (
+        pipeline
+        | 'Create plants' >> beam.Create(
+            ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
+        | 'Count all elements' >> beam.combiners.Count.Globally()
+        | beam.Map(print))
+```
+
+Output
+
+```
+10
+```
+
+### Counting elements for each key
+
+You can use ```Count.PerKey()``` to count the elements for each unique key in a PCollection of key-values.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+    total_elements_per_keys = (
+        pipeline
+        | 'Create plants' >> beam.Create([
+            ('spring', '🍓'),
+            ('spring', '🥕'),
+            ('summer', '🥕'),
+            ('fall', '🥕'),
+            ('spring', '🍆'),
+            ('winter', '🍆'),
+            ('spring', '🍅'),
+            ('summer', '🍅'),
+            ('fall', '🍅'),
+            ('summer', '🌽'),
+        ])
+        | 'Count elements per key' >> beam.combiners.Count.PerKey()
+        | beam.Map(print))
+```
+
+Output
+
+```
+('spring', 4)
+('summer', 3)
+('fall', 2)
+('winter', 1)
+```
+
+### Counting all unique elements
+
+You can use ```Count.PerElement()``` to count only the unique elements in a PCollection.
+
+```
+import apache_beam as beam
+
+with beam.Pipeline() as pipeline:
+    total_unique_elements = (
+        pipeline
+        | 'Create produce' >> beam.Create(
+            ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
+        | 'Count unique elements' >> beam.combiners.Count.PerElement()
+        | beam.Map(print))
+```
+
+Output
+
+```
+('🍓', 1)
+('🥕', 3)
+('🍆', 2)
+('🍅', 3)
+('🌽', 1)
+```
+{{end}}
+### Playground exercise
+
+You can find the full code of this example in the playground window, which you can run and experiment with.
+{{if (eq .Sdk "python")}}
+`Count.globally` returns the number of integers from the `PCollection`. If you replace the `integers input` with this `map input` and replace `beam.combiners.Count.Globally` on `beam.combiners.Count.PerKey` it will output the count numbers by key :
+
+```
+beam.Create([
+    (1, 36),
+    (2, 91),
+    (3, 33),
+    (3, 11),
+    (4, 67),
+]) | beam.combiners.Count.PerKey()
+```
+
+And Count transforms work with strings too! Can you change the example to count the number of words in a given sentence and how often each word occurs?
+
+Count how many words are repeated with `Count`:
+
+```
+    (p | beam.Create(["To be, or not to be: that is the question:Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune,Or to take arms against a sea of troubles,And by opposing end them. To die: to sleep"])
+       | beam.ParDo(SplitWords())
+       | beam.combiners.Count.PerElement()
+       | Output(prefix='PCollection filtered value: '))
+```
+
+For split word you can use:
+
+```
+class SplitWords(beam.DoFn):
+    def __init__(self, delimiter=' '):
+        self.delimiter = delimiter
+
+    def process(self, text):
+        for word in text.split(self.delimiter):
+            yield word
+```
+{{end}}
+{{if (eq .Sdk "go")}}
+`CountElms` returns the number of integers from the `PCollection`. If you replace `CountElms` with `Count` you can count the elements by the values of how many times they met.
+{{end}}
+{{if (eq .Sdk "java")}}
+`Count` returns the count elements from the `PCollection`. If you replace the `integers input` with this `map input`:
+
+```
+PCollection<KV<Integer, Integer>> input = pipeline.apply(
+    Create.of(KV.of(1, 11),
+              KV.of(1, 36),
+              KV.of(2, 91),
+              KV.of(3, 33),
+              KV.of(3, 11),
+              KV.of(4, 33)));
+```
+
+And replace `Count.globally` on `Count.perKey` it will output the count numbers by key. It is also necessary to replace the generic type:
+
+```
+PCollection<KV<Integer, Integer>> output = applyTransform(numbers);
+```
+
+```
+static PCollection<KV<Integer, Integer>> applyTransform(PCollection<KV<Integer, Integer>> input) {
+    return input.apply(Count.globally());
+}
+```
+{{end}}
+
+{{if (eq .Sdk "go java")}}
+And Count transforms work with strings too! Can you change the example to count the number of words in a given sentence and how often each word occurs?
+
+Don't forget to add import:
+{{end}}
+{{if (eq .Sdk "go")}}
+```
+import (
+	"strings"
+	...
+)
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+import java.util.Arrays;
+import org.apache.beam.sdk.values.KV;
+```
+{{end}}
+
+{{if (eq .Sdk "go java")}}
+Create data for PCollection:
+
+```
+String str = "To be, or not to be: that is the question:Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune,Or to take arms against a sea of troubles,And by opposing end them. To die: to sleep";

Review Comment:
please fix, add spaces after all ,: etc

##########
learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/description.md:
##########
@@ -0,0 +1,141 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Mean
+
+You can use Mean transforms to compute the arithmetic mean of the elements in a collection or the mean of the values associated with each key in a collection of key-value pairs.
+{{if (eq .Sdk "go")}}
+```Mean()``` returns a transformation that returns a collection whose content is the average of the elements of the input collection. If there are no elements in the input collection, 0 is returned.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.Mean(s, input)
+}
+```
+
+You can use ```MeanPerKey()``` to calculate the mean of the elements associated with each unique key.
+
+```
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection {
+	return stats.MeanPerKey(s, input)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```Mean.globally()``` returns a transformation that returns a collection whose content is the average of the elements of the input collection. If there are no elements in the input collection, 0 is returned.
+
+```
+PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+PCollection<Double> mean = numbers.apply(Mean.globally());
+```
+
+Output
+
+```
+5.5
+```
+
+
+```Mean.perKey()``` returns a transform that returns a collection containing an output element mapping each distinct key in the input collection to the mean of the values associated with that key in the input collection.
+
+```
+PCollection<KV<String, Integer>> input = pipeline.apply(
+    Create.of(KV.of("🥕", 3),
+              KV.of("🥕", 2),
+              KV.of("🍆", 1),
+              KV.of("🍅", 4),
+              KV.of("🍅", 5),
+              KV.of("🍅", 3)));
+PCollection<KV<String, Double>> meanPerKey = input.apply(Mean.perKey());
+```
+
+Output
+
+```
+KV{🍆, 1.0}
+KV{🥕, 2.5}
+KV{🍅, 4.0}
+```
+{{end}}
+{{if (eq .Sdk "python")}}

Review Comment:
please fix or reply.
Python has mean: https://beam.apache.org/documentation/transforms/python/aggregation/mean/
