kerrydc commented on code in PR #23577: URL: https://github.com/apache/beam/pull/23577#discussion_r1128562586
########## learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/description.md: ########## @@ -0,0 +1,188 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Sum + +You can use Sum transforms to compute the sum of the elements in a collection or the sum of the values associated with each key in a collection of key-value pairs. +{{if (eq .Sdk "go")}} +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + Review Comment: create input like in Java ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/description.md: ########## @@ -0,0 +1,188 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Sum + +You can use Sum transforms to compute the sum of the elements in a collection or the sum of the values associated with each key in a collection of key-value pairs. 
+{{if (eq .Sdk "go")}} +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Sum(s, input) +} +``` + +You can use `SumPerKey()` to calculate the sum Integer associated with each unique key. + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + Review Comment: create input like in Java ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/min/python-example/task.py: ########## @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground: +# name: Min +# description: Min example. 
+# multifile: false +# context_line: 48 +# categories: +# - Quickstart +# complexity: BASIC +# tags: +# - hellobeam + + +import apache_beam as beam + +# Output PCollection +class Output(beam.PTransform): + class _OutputFn(beam.DoFn): + def __init__(self, prefix=''): + super().__init__() + self.prefix = prefix + + def process(self, element): + print(self.prefix+str(element)) + + def __init__(self, label=None,prefix=''): + super().__init__(label) + self.prefix = prefix + + def expand(self, input): + input | beam.ParDo(self._OutputFn(self.prefix)) + +with beam.Pipeline() as p: + + (p | beam.Create(range(1, 11)) + # beam.combiners.Top.Smallest(5) to return the small number than 5 from `PCollection`. + | beam.combiners.Top.Smallest(5) + | Output(prefix='PCollection minimum value: ')) Review Comment: Output(prefix='PCollection 5 smallest values: ')) ########## learning/tour-of-beam/learning-content/common-transforms/motivating-challenge/description.md: ########## @@ -0,0 +1,25 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +### Common Transforms motivating challenge + +You are provided with a `PCollection` from the array of taxi order prices in a csv file. Your task is to find how many orders are below $15 and above. Return it as a map structure (key-value), make `above` or `below` the key, and the sum of all orders - the value.Although there are many ways to do this, try using another transformation presented in this module. 
Review Comment: You are provided with a `PCollection` created from the array of taxi order prices in a csv file. Your task is to find how many orders are below $15 and how many are equal to or above $15. Return it as a map structure (key-value), make `above` or `below` the key, and the total dollar value (sum) of orders - the value. Although there are many ways to do this, try using another transformation presented in this module. ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/max/description.md: ########## @@ -0,0 +1,192 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Max + +Max provides a variety of different transforms for computing the maximum values in a collection, either globally or for each key. + +{{if (eq .Sdk "go")}} +You can find the global maximum value from the ```PCollection``` by using ```Max()``` + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Max(s, input) +} +``` + +You can use ```MaxPerKey()``` to calculate the maximum Integer associated with each unique key (which is of type String). 
+ +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + Review Comment: please fix for both go examples ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/min/description.md: ########## @@ -0,0 +1,190 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Min + +Min transforms find the minimum values globally or for each key in the input collection. +{{if (eq .Sdk "go")}} +You can find the global minimum value from the ```PCollection``` by using ```Min()``` + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Min(s, input) +} +``` + +You can use ```MinPerKey()``` to calculate the minimum Integer associated with each unique key (which is of type String). + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + Review Comment: please fix ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/sum/description.md: ########## @@ -0,0 +1,188 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Sum + +You can use Sum transforms to compute the sum of the elements in a collection or the sum of the values associated with each key in a collection of key-value pairs. +{{if (eq .Sdk "go")}} +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Sum(s, input) +} +``` + +You can use ```SumPerKey()```to calculate the sum Integer associated with each unique key. + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.SumPerKey(s, input) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +You can find the global sum value from the ```PCollection``` by using ```Sum.doublesGlobally()``` + +``` +PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +PCollection<Double> sum = numbers.apply(Sum.doublesGlobally()); +``` + +Output + +``` +55 +``` + +You can use ```Sum.integersPerKey()```to calculate the sum Integer associated with each unique key (which is of type String). 
+ +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 4), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Integer>> sumPerKey = input.apply(Sum.integersPerKey()); +``` + +Output + +``` +KV{🍆, 1} +KV{🍅, 12} +KV{🥕, 5} +``` +{{end}} +{{if (eq .Sdk "python")}} +### Sum of the elements in a PCollection + +You can find the global sum value from the ```PCollection``` by using ```CombineGlobally(sum)``` + +``` +import apache_beam as beam + +with beam.Pipeline() as pipeline: + total = ( + pipeline + | 'Create numbers' >> beam.Create([3, 4, 1, 2]) + | 'Sum values' >> beam.CombineGlobally(sum) + | beam.Map(print)) +``` + +Output + +``` +10 +``` + +### Sum of the elements for each key + +You can use ```Combine.PerKey()``` to get the sum of all the element values for each unique key in a ```PCollection``` of key-values. + +``` +import apache_beam as beam + +with beam.Pipeline() as pipeline: + totals_per_key = ( + pipeline + | 'Create produce' >> beam.Create([ + ('🥕', 3), + ('🥕', 2), + ('🍆', 1), + ('🍅', 4), + ('🍅', 5), + ('🍅', 3), + ]) + | 'Sum values per key' >> beam.CombinePerKey(sum) + | beam.Map(print)) +``` + +Output +``` +('🥕', 5) +('🍆', 1) +('🍅', 12) +``` +{{end}} +You can use ```Sum()``` to sum the elements of a ```PCollection```. + +### Playground exercise + +You can find the full code of this example in the playground window, which you can run and experiment with. + +{{if (eq .Sdk "go")}} +`Sum` returns the sum from the `PCollection`. If you replace the `integers input` with this `map input`: + +``` +input:= beam.ParDo(s, func(_ []byte, emit func(int, int)){ + emit(1,1) + emit(1,4) + emit(2,6) + emit(2,3) + emit(2,-4) + emit(3,23) +}, beam.Impulse(s)) +``` + +And replace `stats.Sum` on `stats.SumPerKey` it will output the sum by key. +{{end}} +{{if (eq .Sdk "java")}} +`Sum.integersGlobally` returns the sum from the `PCollection`. 
If you replace the `integers input` with this `map input`: + +``` +PCollection<KV<Integer, Integer>> input = pipeline.apply( + Create.of(KV.of(1, 11), + KV.of(1, 36), + KV.of(2, 91), + KV.of(3, 33), + KV.of(3, 11), + KV.of(4, 33))); +``` + +And replace `Sum.integersGlobally` on `Sum.integersPerKey` it will output the sum by key. It is also necessary to replace the generic type: + +``` +PCollection<KV<Integer, Integer>> output = applyTransform(input); +``` + +``` +static PCollection<KV<Integer, Integer>> applyTransform(PCollection<KV<Integer, Integer>> input) { + return input.apply(Sum.integersPerKey()); + } +``` +{{end}} +{{if (eq .Sdk "python")}} +`beam.CombineGlobally(sum)` returns sum from `PCCollection`. If you replace the `integers input` with this `map input` and replace `beam.CombineGlobally(sum)` on `beam.CombinePerKey(sum)` it will output the sum by key : + +``` +beam.Create([ Review Comment: (p | [first transform] on one line ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/max/description.md: ########## @@ -0,0 +1,192 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Max + +`Max` provides a variety of different transforms for computing the maximum values in a collection, either globally or for each key. 
+ +{{if (eq .Sdk "go")}} +You can find the global maximum value from the `PCollection` by using `Max()` + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Max(s, input) +} +``` + +You can use `MaxPerKey()` to calculate the maximum Integer associated with each unique key (which is of type String). + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.MaxPerKey(s, input) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +You can find the global maximum value from the `PCollection` by using `Max.doublesGlobally()` + +``` +PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +PCollection<Double> max = input.apply(Max.doublesGlobally()); +``` + +Output + +``` +10 +``` + +You can use `Max.integersPerKey()` to calculate the maximum Integer associated with each unique key (which is of type String). + +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 4), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Integer>> maxPerKey = input.apply(Max.integersPerKey()); +``` + +Output + +``` +KV{🍅, 5} +KV{🥕, 3} +KV{🍆, 1} +``` +{{end}} +{{if (eq .Sdk "python")}} + +### Maximum element in a PCollection + +You can use `CombineGlobally(lambda elements: max(elements or [None]))` to get the maximum element from the entire `PCollection`. 
+ +``` +import apache_beam as beam + +with beam.Pipeline() as p: + max_element = ( + p + | 'Create numbers' >> beam.Create([3, 4, 1, 2]) + | 'Get max value' >> + beam.CombineGlobally(lambda elements: max(elements or [None])) + | beam.Map(print)) +``` + +Output + +``` +4 +``` + +### Maximum elements for each key + +You can use `Combine.PerKey()` to get the maximum element for each unique key in a PCollection of key-values. + +``` +import apache_beam as beam + +with beam.Pipeline() as p: + elements_with_max_value_per_key = ( + p Review Comment: Use this style (p | [first transform] on one line ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/max/description.md: ########## @@ -0,0 +1,192 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Max + +`Max` provides a variety of different transforms for computing the maximum values in a collection, either globally or for each key. + +{{if (eq .Sdk "go")}} +You can find the global maximum value from the `PCollection` by using `Max()` + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Max(s, input) +} +``` + +You can use `MaxPerKey()` to calculate the maximum Integer associated with each unique key (which is of type String). 
+ +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.MaxPerKey(s, input) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +You can find the global maximum value from the `PCollection` by using `Max.doublesGlobally()` + +``` +PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +PCollection<Double> max = input.apply(Max.doublesGlobally()); +``` + +Output + +``` +10 +``` + +You can use `Max.integersPerKey()` to calculate the maximum Integer associated with each unique key (which is of type String). + +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 4), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Integer>> maxPerKey = input.apply(Max.integersPerKey()); +``` + +Output + +``` +KV{🍅, 5} +KV{🥕, 3} +KV{🍆, 1} +``` +{{end}} +{{if (eq .Sdk "python")}} + +### Maximum element in a PCollection + +You can use `CombineGlobally(lambda elements: max(elements or [None]))` to get the maximum element from the entire `PCollection`. + +``` +import apache_beam as beam + +with beam.Pipeline() as p: + max_element = ( + p Review Comment: Use this style (p | [first transform] on one line ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/min/description.md: ########## @@ -0,0 +1,190 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +--> + +# Min + +Min transforms find the minimum values globally or for each key in the input collection. +{{if (eq .Sdk "go")}} +You can find the global minimum value from the `PCollection` by using `Min()` + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Min(s, input) +} +``` + +You can use `MinPerKey()` to calculate the minimum Integer associated with each unique key (which is of type String). + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.MinPerKey(s, input) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +You can find the global minimum value from the `PCollection` by using `Min.doublesGlobally()` + +``` +PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +PCollection<Double> min = input.apply(Min.doublesGlobally()); +``` + +Output + +``` +1 +``` + +You can use `Min.integersPerKey()` to calculate the minimum Integer associated with each unique key (which is of type String). + +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 4), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Integer>> minPerKey = input.apply(Min.integersPerKey()); +``` + +Output + +``` +KV{🍆, 1} +KV{🥕, 2} +KV{🍅, 3} +``` +{{end}} +{{if (eq .Sdk "python")}} +### Minimum element in a PCollection + +You can use `CombineGlobally(lambda elements: min(elements or [-1]))` to get the minimum element from the entire `PCollection`. 
+ +``` +import apache_beam as beam + +with beam.Pipeline() as p: + min_element = ( + p + | 'Create numbers' >> beam.Create([3, 4, 1, 2]) + | 'Get min value' >> + beam.CombineGlobally(lambda elements: min(elements or [-1])) + | beam.Map(print)) +``` + +Output +``` +1 +``` + +### Minimum elements for each key + +You can use `Combine.PerKey()` to get the minimum element for each unique key in a `PCollection` of key-values. + +``` +import apache_beam as beam + +with beam.Pipeline() as p: + elements_with_min_value_per_key = ( + p + | 'Create produce' >> beam.Create([ + ('🥕', 3), + ('🥕', 2), + ('🍆', 1), + ('🍅', 4), + ('🍅', 5), + ('🍅', 3), + ]) + | 'Get min value per key' >> beam.CombinePerKey(min) + | beam.Map(print)) +``` + +Output + +``` +('🥕', 2) +('🍆', 1) +('🍅', 3) +``` +{{end}} + + +### Playground exercise + +You can find the full code of this example in the playground window, which you can run and experiment with. +{{if (eq .Sdk "go")}} +`Min` returns the minimum number from the `PCollection`. If you replace the `integers input` with this `map input`: + +``` +input:= beam.ParDo(s, func(_ []byte, emit func(int, int)){ + emit(1,1) + emit(1,4) + emit(2,6) + emit(2,3) + emit(2,-4) + emit(3,23) +}, beam.Impulse(s)) +``` + +And replace `stats.Min` with `stats.MinPerKey` it will output the minimum numbers by key. +{{end}} +{{if (eq .Sdk "java")}} +`Min.integersGlobally` returns the minimum number from the `PCollection`. If you replace the `integers input` with this `map input`: + +``` +PCollection<KV<Integer, Integer>> input = pipeline.apply( + Create.of(KV.of(1, 11), + KV.of(1, 36), + KV.of(2, 91), + KV.of(3, 33), + KV.of(3, 11), + KV.of(4, 33))); +``` + +And replace `Min.integersGlobally` with `Min.integersPerKey` it will output the minimum numbers by key. 
It is also necessary to replace the generic type: + +``` +PCollection<KV<Integer, Integer>> output = applyTransform(input); +``` + +``` +static PCollection<KV<Integer, Integer>> applyTransform(PCollection<KV<Integer, Integer>> input) { + return input.apply(Sum.integersPerKey()); + } +``` +{{end}} +{{if (eq .Sdk "python")}} +`Top.Smallest` returns smaller numbers from `PCollection` than specified in the function argument. If you replace the `integers input` with this `map input` and replace `beam.combiners.Top.Smallest(5)` with `beam.CombinePerKey(min)` it will output the minimum numbers by key : + +``` +beam.Create([ Review Comment: (p | beam.Create([ ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/min/description.md: ########## @@ -0,0 +1,190 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Min + +Min transforms find the minimum values globally or for each key in the input collection. +{{if (eq .Sdk "go")}} +You can find the global minimum value from the `PCollection` by using `Min()` + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Min(s, input) +} +``` + +You can use `MinPerKey()` to calculate the minimum Integer associated with each unique key (which is of type String). 
+ +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.MinPerKey(s, input) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +You can find the global minimum value from the `PCollection` by using `Min.doublesGlobally()` + +``` +PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +PCollection<Double> min = input.apply(Min.doublesGlobally()); +``` + +Output + +``` +1 +``` + +You can use `Min.integersPerKey()` to calculate the minimum Integer associated with each unique key (which is of type String). + +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 4), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Integer>> minPerKey = input.apply(Min.integersPerKey()); +``` + +Output + +``` +KV{🍆, 1} +KV{🥕, 2} +KV{🍅, 3} +``` +{{end}} +{{if (eq .Sdk "python")}} +### Minimum element in a PCollection + +You can use `CombineGlobally(lambda elements: min(elements or [-1]))` to get the minimum element from the entire `PCollection`. + +``` +import apache_beam as beam + +with beam.Pipeline() as p: + min_element = ( + p + | 'Create numbers' >> beam.Create([3, 4, 1, 2]) + | 'Get min value' >> + beam.CombineGlobally(lambda elements: min(elements or [-1])) + | beam.Map(print)) +``` + +Output +``` +1 +``` + +### Minimum elements for each key + +You can use `Combine.PerKey()` to get the minimum element for each unique key in a `PCollection` of key-values. 
+ +``` +import apache_beam as beam + +with beam.Pipeline() as p: + elements_with_min_value_per_key = ( + p Review Comment: (p | [first transform] on one line ########## learning/tour-of-beam/learning-content/common-transforms/filter/description.md: ########## @@ -0,0 +1,420 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +### Using Filter + +`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate. 
+ +{{if (eq .Sdk "go")}} +``` +import ( + "github.com/apache/fbeam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return filter.Exclude(s, input, func(element int) bool { + return element % 2 == 1 + }) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +``` +PCollection<String> input = pipeline + .apply(Create.of(List.of("Hello","world","Hi"))); + +PCollection<String> filteredStrings = input + .apply(Filter.by(new SerializableFunction<String, Boolean>() { + @Override + public Boolean apply(String input) { + return input.length() > 3; + } + })); +``` + +Output + +``` +Hello +world +``` + +### Built-in filters + +The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen` With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount. 
+ +Other built-in filters are: + +* Filter.greaterThanEq +* Filter.greaterThan +* Filter.lessThan +* Filter.lessThanEq +* Filter.equal + + +## Example 2: Filtering with a built-in methods + +``` +// List of integers +PCollection<Integer> input = pipeline.apply(Create.of(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))); + +PCollection<Integer> greaterThanEqNumbers = input.apply(Filter.greaterThanEq(3)); +// PCollection will contain [3, 4, 5, 6, 7, 8, 9, 10] at this point + +PCollection<Integer> greaterThanNumbers = input.apply(Filter.greaterThan(4)); +// PCollection will contain [5, 6, 7, 8, 9, 10] at this point + + +PCollection<Integer> lessThanNumbers = input.apply(Filter.lessThan(10)); +// PCollection will contain [1, 2, 3, 4, 5, 6, 7, 8, 9] at this point + + +PCollection<Integer> lessThanEqNumbers = input.apply(Filter.lessThanEq(7)); +// PCollection will contain [1, 2, 3, 4 5, 6, 7] at this point + + +PCollection<Integer> equalNumbers = input.apply(Filter.equal(9)); +// PCollection will contain [9] at this point +``` +{{end}} +{{if (eq .Sdk "python")}} +``` +import apache_beam as beam + +from log_elements import LogElements + +with beam.Pipeline() as p: + (p | beam.Create(range(1, 11)) + | beam.Filter(lambda num: num % 2 == 0) + | LogElements()) +``` + + +### Example 1: Filtering with a function + +You can define a function `is_perennial()` which returns True if the element’s duration equals 'perennial', and False otherwise. 
+ +``` +import apache_beam as beam + +def is_perennial(plant): + return plant['duration'] == 'perennial' + +with beam.Pipeline() as p: + perennials = ( + p + | 'Gardening plants' >> beam.Create([ + { + 'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial' + }, + { + 'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial' + }, + { + 'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial' + }, + { + 'icon': '🍅', 'name': 'Tomato', 'duration': 'annual' + }, + { + 'icon': '🥔', 'name': 'Potato', 'duration': 'perennial' + }, + ]) + | 'Filter perennials' >> beam.Filter(is_perennial) + | beam.Map(print)) +``` + +Output + +``` +{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'} +{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'} +{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'} +``` + +### Example 2: Filtering with a lambda function + +You can also use lambda functions to simplify Example 1. + +``` +import apache_beam as beam + +with beam.Pipeline() as p: + perennials = ( + p Review Comment: (p | [first transform] on one line ########## learning/tour-of-beam/learning-content/common-transforms/filter/description.md: ########## @@ -0,0 +1,420 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +### Using Filter + +`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate. 
+ +{{if (eq .Sdk "go")}} +``` +import ( + "github.com/apache/fbeam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return filter.Exclude(s, input, func(element int) bool { + return element % 2 == 1 + }) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +``` +PCollection<String> input = pipeline + .apply(Create.of(List.of("Hello","world","Hi"))); + +PCollection<String> filteredStrings = input + .apply(Filter.by(new SerializableFunction<String, Boolean>() { + @Override + public Boolean apply(String input) { + return input.length() > 3; + } + })); +``` + +Output + +``` +Hello +world +``` + +### Built-in filters + +The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen` With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount. 
+ +Other built-in filters are: + +* Filter.greaterThanEq +* Filter.greaterThan +* Filter.lessThan +* Filter.lessThanEq +* Filter.equal + + +## Example 2: Filtering with a built-in methods + +``` +// List of integers +PCollection<Integer> input = pipeline.apply(Create.of(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))); + +PCollection<Integer> greaterThanEqNumbers = input.apply(Filter.greaterThanEq(3)); +// PCollection will contain [3, 4, 5, 6, 7, 8, 9, 10] at this point + +PCollection<Integer> greaterThanNumbers = input.apply(Filter.greaterThan(4)); +// PCollection will contain [5, 6, 7, 8, 9, 10] at this point + + +PCollection<Integer> lessThanNumbers = input.apply(Filter.lessThan(10)); +// PCollection will contain [1, 2, 3, 4, 5, 6, 7, 8, 9] at this point + + +PCollection<Integer> lessThanEqNumbers = input.apply(Filter.lessThanEq(7)); +// PCollection will contain [1, 2, 3, 4 5, 6, 7] at this point + + +PCollection<Integer> equalNumbers = input.apply(Filter.equal(9)); +// PCollection will contain [9] at this point +``` +{{end}} +{{if (eq .Sdk "python")}} +``` +import apache_beam as beam + +from log_elements import LogElements + +with beam.Pipeline() as p: + (p | beam.Create(range(1, 11)) + | beam.Filter(lambda num: num % 2 == 0) + | LogElements()) +``` + + +### Example 1: Filtering with a function + +You can define a function `is_perennial()` which returns True if the element’s duration equals 'perennial', and False otherwise. + +``` +import apache_beam as beam + +def is_perennial(plant): + return plant['duration'] == 'perennial' + +with beam.Pipeline() as p: + perennials = ( + p Review Comment: put `(p | [first transform]` on one line ########## learning/tour-of-beam/learning-content/common-transforms/filter/description.md: ########## @@ -0,0 +1,420 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +### Using Filter + +`PCollection` datasets can be filtered using the `Filter` transform. You can create a filter by supplying a predicate and, when applied, filtering out all the elements of `PCollection` that don’t satisfy the predicate. + +{{if (eq .Sdk "go")}} +``` +import ( + "github.com/apache/fbeam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return filter.Exclude(s, input, func(element int) bool { + return element % 2 == 1 + }) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +``` +PCollection<String> input = pipeline + .apply(Create.of(List.of("Hello","world","Hi"))); + +PCollection<String> filteredStrings = input + .apply(Filter.by(new SerializableFunction<String, Boolean>() { + @Override + public Boolean apply(String input) { + return input.length() > 3; + } + })); +``` + +Output + +``` +Hello +world +``` + +### Built-in filters + +The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThen` With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThen` to filter out elements of the input `PCollection` whose values are greater than the specified amount.
Review Comment: replace with: The Java SDK has several filter methods built-in, like `Filter.greaterThan` and `Filter.lessThan`. With `Filter.greaterThan`, the input `PCollection` can be filtered so that only the elements whose values are greater than the specified amount remain. Similarly, you can use `Filter.lessThan` to keep only the elements of the input `PCollection` whose values are less than the specified amount. ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/mean/description.md: ########## @@ -0,0 +1,141 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Mean + +You can use Mean transforms to compute the arithmetic mean of the elements in a collection or the mean of the values associated with each key in a collection of key-value pairs. +{{if (eq .Sdk "go")}} +```Mean()``` returns a transformation that returns a collection whose content is the average of the elements of the input collection. If there are no elements in the input collection, 0 is returned. + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Mean(s, input) +} +``` + +You can use ```MeanPerKey()``` to calculate the mean of the elements associated with each unique key.
+ +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.MeanPerKey(s, input) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +```Mean.globally()``` returns a transformation that returns a collection whose content is the average of the elements of the input collection. If there are no elements in the input collection, 0 is returned. + +``` +PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +PCollection<Double> mean = numbers.apply(Mean.globally()); +``` + +Output + +``` +5.5 +``` + + +```Mean.perKey()``` returns a transform that returns a collection containing an output element mapping each distinct key in the input collection to the mean of the values associated with that key in the input collection. + +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 4), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Double>> meanPerKey = input.apply(Mean.perKey()); +``` + +Output + +``` +KV{🍆, 1.0} +KV{🥕, 2.5} +KV{🍅, 4.0} +``` +{{end}} +{{if (eq .Sdk "python")}} +{{end}} + + +### Playground exercise + +You can find the full code of this example in the playground window, which you can run and experiment with. +{{if (eq .Sdk "go")}} +`Mean` returns the mean from the `PCollection`. If you replace the `integers input` with this `map input`: + +``` +input:= beam.ParDo(s, func(_ []byte, emit func(int, int)){ + emit(1,1) + emit(1,4) + emit(2,6) + emit(2,3) + emit(2,-4) + emit(3,23) +}, beam.Impulse(s)) +``` + +And replace `stats.Mean` on `stats.MeanPerKey` it will output the mean by key. +{{end}} +{{if (eq .Sdk "java")}} +`Mean.globally` returns the mean from the `PCollection`.
If you replace the `integers input` with this `map input`: + +``` +PCollection<KV<Integer, Integer>> input = pipeline.apply( + Create.of(KV.of(1, 11), + KV.of(1, 36), + KV.of(2, 91), + KV.of(3, 33), + KV.of(3, 11), + KV.of(4, 33))); +``` + +And replace `Mean.globally` on `Mean.perKey` it will output the means by key. It is also necessary to replace the generic type: + +``` +PCollection<KV<Integer, Double>> output = applyTransform(input); +``` + +``` +static PCollection<KV<Integer, Double>> applyTransform(PCollection<KV<Integer, Integer>> input) { + return input.apply(Mean.perKey()); + } +``` +{{end}} +{{if (eq .Sdk "python")}} +`beam.combiners.Mean.Globally()` returns mean from `PCCollection`. If you replace the `integers input` with this `map input` and replace `beam.combiners.Mean.Globally()` on `beam.combiners.Mean.PerKey()` it will output the mean by key : Review Comment: please fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
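For reference, the distinction that the last quoted paragraph draws between `beam.combiners.Mean.Globally()` and `beam.combiners.Mean.PerKey()` can be illustrated without Beam. What follows is a minimal plain-Python sketch, not Beam code and not the PR author's implementation: `mean_globally` and `mean_per_key` are hypothetical helper names, and the input simply reuses the key-value pairs from the Go playground snippet in the quoted diff.

```python
from collections import defaultdict
from statistics import fmean


def mean_globally(values):
    """Arithmetic mean of all values (hypothetical helper, not a Beam API)."""
    return fmean(values)


def mean_per_key(pairs):
    """Map each distinct key to the mean of its values (hypothetical helper)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: fmean(values) for key, values in grouped.items()}


# Key-value pairs taken from the Go playground snippet in the quoted diff.
pairs = [(1, 1), (1, 4), (2, 6), (2, 3), (2, -4), (3, 23)]

# One mean over every value, ignoring the keys.
print(mean_globally([value for _, value in pairs]))

# One mean per distinct key.
print(mean_per_key(pairs))
```

The global variant collapses the whole collection into a single number, while the per-key variant first groups values by key and then averages each group, which is the change the quoted sentence describes when swapping one transform for the other.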
