This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 472ed9f8d86 [Go Docs] Remove outdated examples of anonymous functions
from pardo, partition, and filter. (#29181)
472ed9f8d86 is described below
commit 472ed9f8d8645fea8aadace94ea561aa511b2d70
Author: Robert Burke <[email protected]>
AuthorDate: Mon Oct 30 08:48:00 2023 -0700
[Go Docs] Remove outdated examples of anonymous functions from pardo,
partition, and filter. (#29181)
---
sdks/go/pkg/beam/pardo.go | 46 +++++++++++++++++-----------
sdks/go/pkg/beam/partition.go | 13 ++++++++
sdks/go/pkg/beam/transforms/filter/filter.go | 22 +++++++++----
3 files changed, 57 insertions(+), 24 deletions(-)
diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index d18945834d6..629ce329c9b 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -157,11 +157,14 @@ func ParDo0(s Scope, dofn any, col PCollection, opts
...Option) {
// struct may also define Setup, StartBundle, FinishBundle and Teardown
methods.
// The struct is JSON-serialized and may contain construction-time values.
//
+// Functions and types used as DoFns must be registered with beam using the
+// beam `register` package, so they may execute on distributed workers.
+// Functions must not be anonymous or closures, or they will fail at execution
time.
+//
// Conceptually, when a ParDo transform is executed, the elements of the input
// PCollection are first divided up into some number of "bundles". These are
-// farmed off to distributed worker machines (or run locally, if using the
-// direct runner). For each bundle of input elements processing proceeds as
-// follows:
+// farmed off to distributed worker machines (or run locally on a local runner
instance).
+// For each bundle of input elements processing proceeds as follows:
//
// - If a struct, a fresh instance of the argument DoFn is created on a
// worker from json serialization, and the Setup method is called on this
@@ -187,10 +190,11 @@ func ParDo0(s Scope, dofn any, col PCollection, opts
...Option) {
//
// For example:
//
+// func stringLen(word string) int { return len(word) }
+// func init() { register.Function1x1(stringLen) }
+//
// words := beam.ParDo(s, &Foo{...}, ...)
-// lengths := beam.ParDo(s, func (word string) int) {
-// return len(word)
-// }, words)
+// lengths := beam.ParDo(s, stringLen, words)
//
// Each output element has the same timestamp and is in the same windows as its
// corresponding input element. The timestamp can be accessed and/or emitted by
@@ -207,28 +211,34 @@ func ParDo0(s Scope, dofn any, col PCollection, opts
...Option) {
// options, and their contents accessible to each of the DoFn operations. For
// example:
//
+// func filterLessThanCutoff(word string, cutoff int, emit func(string)) {
+// if len(word) < cutoff {
+// emit(word)
+// }
+// }
+// func init() { register.Function3x0(filterLessThanCutoff) }
+//
// words := ...
// cutoff := ... // Singleton PCollection<int>
-// smallWords := beam.ParDo(s, func (word string, cutoff int, emit
func(string)) {
-// if len(word) < cutoff {
-// emit(word)
-// }
-// }, words, beam.SideInput{Input: cutoff})
+// smallWords := beam.ParDo(s, filterLessThanCutoff, words,
beam.SideInput{Input: cutoff})
//
// # Additional Outputs
//
// Optionally, a ParDo transform can produce zero or multiple output
// PCollections. Note the use of ParDo2 to specify 2 outputs. For example:
//
+// func partitionAtCutoff(word string, cutoff int, small, big
func(string)) {
+// if len(word) < cutoff {
+// small(word)
+// } else {
+// big(word)
+// }
+// }
+// func init() { register.Function4x0(partitionAtCutoff) }
+//
// words := ...
// cutoff := ... // Singleton PCollection<int>
-// small, big := beam.ParDo2(s, func (word string, cutoff int, small, big
func(string)) {
-// if len(word) < cutoff {
-// small(word)
-// } else {
-// big(word)
-// }
-// }, words, beam.SideInput{Input: cutoff})
+// small, big := beam.ParDo2(s, partitionAtCutoff, words,
beam.SideInput{Input: cutoff})
//
// By default, the Coder for the elements of each output PCollection is
// inferred from the concrete type.
diff --git a/sdks/go/pkg/beam/partition.go b/sdks/go/pkg/beam/partition.go
index 37498ddbc0b..1c79965ea63 100644
--- a/sdks/go/pkg/beam/partition.go
+++ b/sdks/go/pkg/beam/partition.go
@@ -39,6 +39,19 @@ var (
//
// A PartitionFn has the signature `func(T) int`.
//
+// func lenToTen(s string) int {
+// if len(s) > 9 {
+// return 10
+// }
+// return len(s)
+// }
+//
+// // Partition functions must be registered with Beam, and must not be
closures.
+// func init() { register.Function1x1(lenToTen) }
+//
+// // The number of partitions goes up to 11 since we can return 0 through
10
+// wordsByLength := beam.Partition(s, 11, lenToTen, inputStrings)
+//
// T is permitted to be a KV.
func Partition(s Scope, n int, fn any, col PCollection) []PCollection {
s = s.Scope(fmt.Sprintf("Partition(%v)", n))
diff --git a/sdks/go/pkg/beam/transforms/filter/filter.go
b/sdks/go/pkg/beam/transforms/filter/filter.go
index 997eec5eb4e..699ec9c4c79 100644
--- a/sdks/go/pkg/beam/transforms/filter/filter.go
+++ b/sdks/go/pkg/beam/transforms/filter/filter.go
@@ -40,10 +40,15 @@ var (
// the filter function returns false. It returns a PCollection of the same type
// as the input. For example:
//
+// func lessThanThree(s string) bool {
+// return len(s) < 3
+// }
+//
+// // Filter functions must be registered with Beam, and must not be
closures.
+// func init() { register.Function1x1(lessThanThree) }
+//
// words := beam.Create(s, "a", "b", "long", "alsolong")
-// short := filter.Include(s, words, func(s string) bool {
-// return len(s) < 3
-// })
+// short := filter.Include(s, words, lessThanThree)
//
// Here, "short" will contain "a" and "b" at runtime.
func Include(s beam.Scope, col beam.PCollection, fn any) beam.PCollection {
@@ -58,10 +63,15 @@ func Include(s beam.Scope, col beam.PCollection, fn any)
beam.PCollection {
// the filter function returns true. It returns a PCollection of the same type
// as the input. For example:
//
+// func lessThanThree(s string) bool {
+// return len(s) < 3
+// }
+//
+// // Filter functions must be registered with Beam, and must not be
closures.
+// func init() { register.Function1x1(lessThanThree) }
+//
// words := beam.Create(s, "a", "b", "long", "alsolong")
-// long := filter.Exclude(s, words, func(s string) bool {
-// return len(s) < 3
-// })
+// long := filter.Exclude(s, words, lessThanThree)
//
// Here, "long" will contain "long" and "alsolong" at runtime.
func Exclude(s beam.Scope, col beam.PCollection, fn any) beam.PCollection {