kerrydc commented on code in PR #23577: URL: https://github.com/apache/beam/pull/23577#discussion_r1128368324
########## learning/tour-of-beam/learning-content/common-transforms/aggregation/count/description.md: ########## @@ -0,0 +1,311 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Count + +`Count` provides many transformations for calculating the count of values in a `PCollection`, either globally or for each key. + +{{if (eq .Sdk "go")}} +Counts the number of elements within each aggregation. The Count transform has two varieties: + +You can count the number of elements in a `PCollection` with `CountElms()`, it will return one element. + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.CountElms(s, input) +} +``` + +You can use `Count()` to count how many elements are associated with a particular key. The result will be one output for each key. + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Count(s, input) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +Counts the number of elements within each aggregation. The Count transform has three varieties: + +### Counting all elements in a PCollection + +`Count.globally()` counts the number of elements in the entire `PCollection`. The result is a collection with a single element. 
+ +``` +PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +PCollection<Long> output = input.apply(Count.globally()); +``` + +Output +``` +10 +``` + +### Counting elements for each key + +`Count.perKey()` counts how many elements are associated with each key. It ignores the values. The resulting collection has one output for every key in the input collection. + +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 4), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Long>> output = input.apply(Count.perKey()); +``` + +Output + +``` +KV{🥕, 2} +KV{🍅, 3} +KV{🍆, 1} +``` + +### Counting all unique elements + +`Count.perElement()` counts how many times each element appears in the input collection. The output collection is a key-value pair, containing each unique element and the number of times it appeared in the original collection. + +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 3), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Long>> output = input.apply(Count.perElement()); +``` + +Output + +``` +KV{KV{🍅, 3}, 2} +KV{KV{🥕, 2}, 1} +KV{KV{🍆, 1}, 1} +KV{KV{🥕, 3}, 1} +KV{KV{🍅, 5}, 1} +``` +{{end}} +{{if (eq .Sdk "python")}} +### Counting all elements in a PCollection + +You can use `Count.Globally()` to count all elements in a PCollection, even if there are duplicate elements. + +``` +import apache_beam as beam + +with beam.Pipeline() as p: + total_elements = ( + p + | 'Create plants' >> beam.Create( + ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽']) + | 'Count all elements' >> beam.combiners.Count.Globally() + | beam.Map(print)) +``` + +Output + +``` +10 +``` + +### Counting elements for each key + +You can use `Count.PerKey()` to count the elements for each unique key in a PCollection of key-values. 
+ +``` +import apache_beam as beam + +with beam.Pipeline() as p: + total_elements_per_keys = ( + p + | 'Create plants' >> beam.Create([ + ('spring', '🍓'), + ('spring', '🥕'), + ('summer', '🥕'), + ('fall', '🥕'), + ('spring', '🍆'), + ('winter', '🍆'), + ('spring', '🍅'), + ('summer', '🍅'), + ('fall', '🍅'), + ('summer', '🌽'), + ]) + | 'Count elements per key' >> beam.combiners.Count.PerKey() + | beam.Map(print)) +``` + +Output + +``` +('spring', 4) +('summer', 3) +('fall', 2) +('winter', 1) +``` + +### Counting all unique elements + +You can use `Count.PerElement()` to count only the unique elements in a `PCollection`. + +``` +import apache_beam as beam + +with beam.Pipeline() as p: + total_unique_elements = ( + p Review Comment: p | [first transform] on one line ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/min/java-example/Task.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: Min +// description: Min example. 
+// multifile: false +// context_line: 42 +// categories: +// - Quickstart +// complexity: BASIC +// tags: +// - hellobeam + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.beam.sdk.values.KV; + + +public class Task { + + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + Pipeline pipeline = Pipeline.create(options); + + // Create input PCollection + PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + + // The applyTransform() converts [input] to [output] + PCollection<Integer> output = applyTransform(input); + + output.apply("Log", ParDo.of(new LogOutput<>("PCollection minimum value"))); + + pipeline.run(); + } + + // Min.integersGlobally() to return the globally minimum from `PCollection`. Review Comment: // Min.integersGlobally() returns the global minimum from `PCollection`. ########## learning/tour-of-beam/learning-content/common-transforms/aggregation/min/description.md: ########## @@ -0,0 +1,190 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> + +# Min + +Min transforms find the minimum values globally or for each key in the input collection. +{{if (eq .Sdk "go")}} +You can find the global minimum value from the `PCollection` by using `Min()` + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.Min(s, input) +} +``` + +You can use `MinPerKey()` to calculate the minimum Integer associated with each unique key (which is of type String). + +``` +import ( + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats" +) + +func ApplyTransform(s beam.Scope, input beam.PCollection) beam.PCollection { + return stats.MinPerKey(s, input) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +You can find the global minimum value from the `PCollection` by using `Min.doublesGlobally()` + +``` +PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); +PCollection<Double> min = input.apply(Min.doublesGlobally()); +``` + +Output + +``` +1 +``` + +You can use `Min.integersPerKey()` to calculate the minimum Integer associated with each unique key (which is of type String). + +``` +PCollection<KV<String, Integer>> input = pipeline.apply( + Create.of(KV.of("🥕", 3), + KV.of("🥕", 2), + KV.of("🍆", 1), + KV.of("🍅", 4), + KV.of("🍅", 5), + KV.of("🍅", 3))); +PCollection<KV<String, Integer>> minPerKey = input.apply(Min.integersPerKey()); +``` + +Output + +``` +KV{🍆, 1} +KV{🥕, 2} +KV{🍅, 3} +``` +{{end}} +{{if (eq .Sdk "python")}} +### Minimum element in a PCollection + +You can use `CombineGlobally(lambda elements: min(elements or [-1]))` to get the minimum element from the entire `PCollection`. 
+ +``` +import apache_beam as beam + +with beam.Pipeline() as p: + min_element = ( + p + | 'Create numbers' >> beam.Create([3, 4, 1, 2]) + | 'Get min value' >> + beam.CombineGlobally(lambda elements: min(elements or [-1])) + | beam.Map(print)) +``` + +Output +``` +1 +``` + +### Minimum elements for each key + +You can use `Combine.PerKey()` to get the minimum element for each unique key in a `PCollection` of key-values. + +``` +import apache_beam as beam + +with beam.Pipeline() as p: + elements_with_min_value_per_key = ( + p + | 'Create produce' >> beam.Create([ + ('🥕', 3), + ('🥕', 2), + ('🍆', 1), + ('🍅', 4), + ('🍅', 5), + ('🍅', 3), + ]) + | 'Get min value per key' >> beam.CombinePerKey(min) + | beam.Map(print)) +``` + +Output + +``` +('🥕', 2) +('🍆', 1) +('🍅', 3) +``` +{{end}} + + +### Playground exercise + +You can find the full code of this example in the playground window, which you can run and experiment with. +{{if (eq .Sdk "go")}} +`Min` returns the minimum number from the `PCollection`. If you replace the `integers input` with this `map input`: + +``` +input:= beam.ParDo(s, func(_ []byte, emit func(int, int)){ + emit(1,1) + emit(1,4) + emit(2,6) + emit(2,3) + emit(2,-4) + emit(3,23) +}, beam.Impulse(s)) +``` + +And replace `stats.Min` with `stats.MinPerKey` it will output the minimum numbers by key. +{{end}} +{{if (eq .Sdk "java")}} +`Min.integersGlobally` returns the minimum number from the `PCollection`. If you replace the `integers input` with this `map input`: + +``` +PCollection<KV<Integer, Integer>> input = pipeline.apply( + Create.of(KV.of(1, 11), + KV.of(1, 36), + KV.of(2, 91), + KV.of(3, 33), + KV.of(3, 11), + KV.of(4, 33))); +``` + +And replace `Min.integersGlobally` with `Min.integersPerKey` it will output the minimum numbers by key. 
It is also necessary to replace the generic type: + +``` +PCollection<KV<Integer, Integer>> output = applyTransform(input); +``` + +``` +static PCollection<KV<Integer, Integer>> applyTransform(PCollection<KV<Integer, Integer>> input) { + return input.apply(Sum.integersPerKey()); + } +``` +{{end}} +{{if (eq .Sdk "python")}} +`Top.Smallest` returns smaller numbers from `PCollection` than specified in the function argument. If you replace the `integers input` with this `map input` and replace `beam.combiners.Top.Smallest(5)` with `beam.CombinePerKey(min)` it will output the minimum numbers by key : Review Comment: Not correct. Should be: `Top.Smallest(N)` returns the N smallest numbers from the `PCollection`. If you replace the `integers input` with this `map input` and replace `beam.combiners.Top.Smallest(5)` with `beam.CombinePerKey(min)` it will output the minimum numbers by key -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
