This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 472ed9f8d86 [Go Docs] Remove outdated examples of anonymous functions from pardo, partition, and filter. (#29181)
472ed9f8d86 is described below

commit 472ed9f8d8645fea8aadace94ea561aa511b2d70
Author: Robert Burke <[email protected]>
AuthorDate: Mon Oct 30 08:48:00 2023 -0700

    [Go Docs] Remove outdated examples of anonymous functions from pardo, partition, and filter. (#29181)
---
 sdks/go/pkg/beam/pardo.go                    | 46 +++++++++++++++++-----------
 sdks/go/pkg/beam/partition.go                | 13 ++++++++
 sdks/go/pkg/beam/transforms/filter/filter.go | 22 +++++++++----
 3 files changed, 57 insertions(+), 24 deletions(-)

diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index d18945834d6..629ce329c9b 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -157,11 +157,14 @@ func ParDo0(s Scope, dofn any, col PCollection, opts ...Option) {
 // struct may also define Setup, StartBundle, FinishBundle and Teardown methods.
 // The struct is JSON-serialized and may contain construction-time values.
 //
+// Functions and types used as DoFns must be registered with Beam using the
+// `register` package, so they may execute on distributed workers.
+// Functions must not be anonymous functions or closures, or they will fail at execution time.
+//
 // Conceptually, when a ParDo transform is executed, the elements of the input
 // PCollection are first divided up into some number of "bundles". These are
-// farmed off to distributed worker machines (or run locally, if using the
-// direct runner). For each bundle of input elements processing proceeds as
-// follows:
+// farmed off to distributed worker machines (or run locally, if using a local runner).
+// For each bundle of input elements processing proceeds as follows:
 //
 //   - If a struct, a fresh instance of the argument DoFn is created on a
 //     worker from json serialization, and the Setup method is called on this
@@ -187,10 +190,11 @@ func ParDo0(s Scope, dofn any, col PCollection, opts ...Option) {
 //
 // For example:
 //
+//     func stringLen(word string) int { return len(word) }
+//     func init() { register.Function1x1(stringLen) }
+//
 //     words := beam.ParDo(s, &Foo{...}, ...)
-//     lengths := beam.ParDo(s, func (word string) int) {
-//           return len(word)
-//     }, words)
+//     lengths := beam.ParDo(s, stringLen, words)
 //
 // Each output element has the same timestamp and is in the same windows as its
 // corresponding input element. The timestamp can be accessed and/or emitted by
@@ -207,28 +211,34 @@ func ParDo0(s Scope, dofn any, col PCollection, opts ...Option) {
 // options, and their contents accessible to each of the DoFn operations. For
 // example:
 //
+//     func filterLessThanCutoff(word string, cutoff int, emit func(string)) {
+//             if len(word) < cutoff {
+//                     emit(word)
+//             }
+//     }
+//     func init() { register.Function3x0(filterLessThanCutoff) }
+//
 //     words := ...
 //     cufoff := ...  // Singleton PCollection<int>
-//     smallWords := beam.ParDo(s, func (word string, cutoff int, emit func(string)) {
-//           if len(word) < cutoff {
-//                emit(word)
-//           }
-//     }, words, beam.SideInput{Input: cutoff})
+//     smallWords := beam.ParDo(s, filterLessThanCutoff, words, beam.SideInput{Input: cutoff})
 //
 // # Additional Outputs
 //
 // Optionally, a ParDo transform can produce zero or multiple output
 // PCollections. Note the use of ParDo2 to specfic 2 outputs. For example:
 //
+//     func partitionAtCutoff(word string, cutoff int, small, big func(string)) {
+//             if len(word) < cutoff {
+//                     small(word)
+//             } else {
+//                     big(word)
+//             }
+//     }
+//     func init() { register.Function4x0(partitionAtCutoff) }
+//
 //     words := ...
 //     cufoff := ...  // Singleton PCollection<int>
-//     small, big := beam.ParDo2(s, func (word string, cutoff int, small, big func(string)) {
-//           if len(word) < cutoff {
-//                small(word)
-//           } else {
-//                big(word)
-//           }
-//     }, words, beam.SideInput{Input: cutoff})
+//     small, big := beam.ParDo2(s, partitionAtCutoff, words, beam.SideInput{Input: cutoff})
 //
 // By default, the Coders for the elements of each output PCollections is
 // inferred from the concrete type.
diff --git a/sdks/go/pkg/beam/partition.go b/sdks/go/pkg/beam/partition.go
index 37498ddbc0b..1c79965ea63 100644
--- a/sdks/go/pkg/beam/partition.go
+++ b/sdks/go/pkg/beam/partition.go
@@ -39,6 +39,19 @@ var (
 //
 // A PartitionFn has the signature `func(T) int.`
 //
+//     func lenToTen(s string) int {
+//             if len(s) > 9 {
+//                     return 10
+//             }
+//             return len(s)
+//     }
+//
+//     // Partition functions must be registered with Beam and must not be closures.
+//     func init() { register.Function1x1(lenToTen) }
+//
+//     // 11 partitions are needed, since lenToTen returns values 0 through 10.
+//     wordsByLength := beam.Partition(s, 11, lenToTen, inputStrings)
+//
 // T is permitted to be a KV.
 func Partition(s Scope, n int, fn any, col PCollection) []PCollection {
        s = s.Scope(fmt.Sprintf("Partition(%v)", n))
diff --git a/sdks/go/pkg/beam/transforms/filter/filter.go b/sdks/go/pkg/beam/transforms/filter/filter.go
index 997eec5eb4e..699ec9c4c79 100644
--- a/sdks/go/pkg/beam/transforms/filter/filter.go
+++ b/sdks/go/pkg/beam/transforms/filter/filter.go
@@ -40,10 +40,15 @@ var (
 // the filter function returns false. It returns a PCollection of the same type
 // as the input. For example:
 //
+//     func lessThanThree(s string) bool {
+//             return len(s) < 3
+//     }
+//
+//     // Filter functions must be registered with Beam and must not be closures.
+//     func init() { register.Function1x1(lessThanThree) }
+//
 //     words := beam.Create(s, "a", "b", "long", "alsolong")
-//     short := filter.Include(s, words, func(s string) bool {
-//         return len(s) < 3
-//     })
+//     short := filter.Include(s, words, lessThanThree)
 //
 // Here, "short" will contain "a" and "b" at runtime.
 func Include(s beam.Scope, col beam.PCollection, fn any) beam.PCollection {
@@ -58,10 +63,15 @@ func Include(s beam.Scope, col beam.PCollection, fn any) beam.PCollection {
 // the filter function returns true. It returns a PCollection of the same type
 // as the input. For example:
 //
+//     func lessThanThree(s string) bool {
+//             return len(s) < 3
+//     }
+//
+//     // Filter functions must be registered with Beam and must not be closures.
+//     func init() { register.Function1x1(lessThanThree) }
+//
 //     words := beam.Create(s, "a", "b", "long", "alsolong")
-//     long := filter.Exclude(s, words, func(s string) bool {
-//         return len(s) < 3
-//     })
+//     long := filter.Exclude(s, words, lessThanThree)
 //
 // Here, "long" will contain "long" and "alsolong" at runtime.
 func Exclude(s beam.Scope, col beam.PCollection, fn any) beam.PCollection {

Reply via email to