This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b76db4d [BEAM-12513] Update initial sections of BPG for Go (#15057)
b76db4d is described below
commit b76db4db641637fd59b6f194081435dcc2a683cc
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jul 2 10:27:51 2021 -0700
[BEAM-12513] Update initial sections of BPG for Go (#15057)
* [BEAM-12513] Update initial sections of BPG for Go
* [BEAM-12513] Reformat code to be easier to find
* [BEAM-12513] Add scoping and additional comments.
Co-authored-by: Jack McCluskey <[email protected]>
---
sdks/go/examples/snippets/01_03intro.go | 102 +++++
sdks/go/examples/snippets/04transforms.go | 289 ++++++++++++++
sdks/go/examples/snippets/04transforms_test.go | 185 +++++++++
sdks/go/examples/snippets/doc.go | 23 ++
.../content/en/documentation/programming-guide.md | 432 +++++++++++++++++----
5 files changed, 966 insertions(+), 65 deletions(-)
diff --git a/sdks/go/examples/snippets/01_03intro.go b/sdks/go/examples/snippets/01_03intro.go
new file mode 100644
index 0000000..37496e9
--- /dev/null
+++ b/sdks/go/examples/snippets/01_03intro.go
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snippets
+
+import (
+ "flag"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
+)
+
+// PipelineConstruction contains snippets for the initial sections of
+// the Beam Programming Guide, from initializing to submitting a
+// pipeline.
+func PipelineConstruction() {
+ // [START pipeline_options]
+ // If beamx or Go flags are used, flags must be parsed first,
+ // before beam.Init() is called.
+ flag.Parse()
+ // [END pipeline_options]
+
+ // [START pipelines_constructing_creating]
+ // beam.Init() is an initialization hook that must be called
+ // near the beginning of main(), before creating a pipeline.
+ beam.Init()
+
+ // Create the Pipeline object and root scope.
+ pipeline, scope := beam.NewPipelineWithRoot()
+ // [END pipelines_constructing_creating]
+
+ // [START pipelines_constructing_reading]
+ // Read the file at the URI 'gs://some/inputData.txt' and return
+ // the lines as a PCollection<string>.
+ // Notice that the scope is passed as the first argument,
+ // as is required when calling any transform.
+ lines := textio.Read(scope, "gs://some/inputData.txt")
+
+ // [END pipelines_constructing_reading]
+
+ _ = []interface{}{pipeline, scope, lines}
+}
+
+// Create demonstrates using beam.CreateList.
+func Create() {
+ // [START model_pcollection]
+ lines := []string{
+ "To be, or not to be: that is the question: ",
+ "Whether 'tis nobler in the mind to suffer ",
+ "The slings and arrows of outrageous fortune, ",
+ "Or to take arms against a sea of troubles, ",
+ }
+
+ // Create the Pipeline object and root scope.
+ // It's conventional to use p as the Pipeline variable and
+ // s as the scope variable.
+ p, s := beam.NewPipelineWithRoot()
+
+ // Pass the slice to beam.CreateList, to create the pcollection.
+ // The scope variable s is used to add the CreateList transform
+ // to the pipeline.
+ linesPCol := beam.CreateList(s, lines)
+ // [END model_pcollection]
+ _ = []interface{}{p, linesPCol}
+}
+
+// PipelineOptions shows basic pipeline options using flags.
+func PipelineOptions() {
+ // [START pipeline_options_define_custom]
+ // Use standard Go flags to define pipeline options.
+ var (
+ input = flag.String("input", "", "")
+ output = flag.String("output", "", "")
+ )
+ // [END pipeline_options_define_custom]
+
+ _ = []interface{}{input, output}
+}
+
+// PipelineOptionsCustom shows slightly less basic pipeline options using flags.
+func PipelineOptionsCustom() {
+ // [START pipeline_options_define_custom_with_help_and_default]
+ var (
+ input = flag.String("input", "gs://my-bucket/input", "Input
for the pipeline")
+ output = flag.String("output", "gs://my-bucket/output", "Output
for the pipeline")
+ )
+ // [END pipeline_options_define_custom_with_help_and_default]
+
+ _ = []interface{}{input, output}
+}
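For reference, a minimal end-to-end sketch (not part of this commit) of how the pieces above fit together in a main() function, assuming the beamx runner wrapper and hypothetical gs:// paths:

package main

import (
	"context"
	"flag"
	"log"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

var (
	input  = flag.String("input", "gs://my-bucket/input.txt", "Input for the pipeline")
	output = flag.String("output", "gs://my-bucket/output.txt", "Output for the pipeline")
)

func main() {
	// Flags must be parsed before beam.Init() is called.
	flag.Parse()
	beam.Init()

	// Build the pipeline: read lines, then write them back out.
	p, s := beam.NewPipelineWithRoot()
	lines := textio.Read(s, *input)
	textio.Write(s, *output, lines)

	// Submit the pipeline to the runner selected by the --runner flag.
	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}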
diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
new file mode 100644
index 0000000..22eb3eb
--- /dev/null
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snippets
+
+import (
+ "fmt"
+ "math"
+ "reflect"
+ "sort"
+ "strings"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"
+)
+
+// [START model_pardo_pardo]
+
+// ComputeWordLengthFn is the DoFn to perform on each element in the input PCollection.
+type ComputeWordLengthFn struct{}
+
+// ProcessElement is the method to execute for each element.
+func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) {
+ emit(len(word))
+}
+
+// DoFns must be registered with beam.
+func init() {
+ beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil)))
+}
+
+// [END model_pardo_pardo]
+
+// applyWordLen applies ComputeWordLengthFn to words, which must be
+// a PCollection<string>
+func applyWordLen(s beam.Scope, words beam.PCollection) beam.PCollection {
+ // [START model_pardo_apply]
+ wordLengths := beam.ParDo(s, &ComputeWordLengthFn{}, words)
+ // [END model_pardo_apply]
+ return wordLengths
+}
+
+func applyWordLenAnon(s beam.Scope, words beam.PCollection) beam.PCollection {
+ // [START model_pardo_apply_anon]
+ // Apply an anonymous function as a DoFn to the PCollection words.
+ // Save the result as the PCollection wordLengths.
+ wordLengths := beam.ParDo(s, func(word string) int {
+ return len(word)
+ }, words)
+ // [END model_pardo_apply_anon]
+ return wordLengths
+}
+
+// [START cogroupbykey_input_helpers]
+
+type stringPair struct {
+ K, V string
+}
+
+func splitStringPair(e stringPair) (string, string) {
+ return e.K, e.V
+}
+
+func init() {
+ // Register element types and DoFns.
+ beam.RegisterType(reflect.TypeOf((*stringPair)(nil)).Elem())
+ beam.RegisterFunction(splitStringPair)
+}
+
+// CreateAndSplit is a helper function that creates a KV PCollection of strings from a slice of stringPairs.
+func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
+ initial := beam.CreateList(s, input)
+ return beam.ParDo(s, splitStringPair, initial)
+}
+
+// [END cogroupbykey_input_helpers]
+
+// [START cogroupbykey_output_helpers]
+
+func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) string {
+ var s string
+ var emails, phones []string
+ for emailIter(&s) {
+ emails = append(emails, s)
+ }
+ for phoneIter(&s) {
+ phones = append(phones, s)
+ }
+ // Values have no guaranteed order, sort for deterministic output.
+ sort.Strings(emails)
+ sort.Strings(phones)
+ return fmt.Sprintf("%s; %s; %s", key, formatStringIter(emails), formatStringIter(phones))
+}
+
+func init() {
+ beam.RegisterFunction(formatCoGBKResults)
+}
+
+// [END cogroupbykey_output_helpers]
+
+func formatStringIter(vs []string) string {
+ var b strings.Builder
+ b.WriteRune('[')
+ for i, v := range vs {
+ b.WriteRune('\'')
+ b.WriteString(v)
+ b.WriteRune('\'')
+ if i < len(vs)-1 {
+ b.WriteString(", ")
+ }
+ }
+ b.WriteRune(']')
+ return b.String()
+}
+
+func coGBKExample(s beam.Scope) beam.PCollection {
+ // [START cogroupbykey_inputs]
+ var emailSlice = []stringPair{
+ {"amy", "[email protected]"},
+ {"carl", "[email protected]"},
+ {"julia", "[email protected]"},
+ {"carl", "[email protected]"},
+ }
+
+ var phoneSlice = []stringPair{
+ {"amy", "111-222-3333"},
+ {"james", "222-333-4444"},
+ {"amy", "333-444-5555"},
+ {"carl", "444-555-6666"},
+ }
+ emails := CreateAndSplit(s.Scope("CreateEmails"), emailSlice)
+ phones := CreateAndSplit(s.Scope("CreatePhones"), phoneSlice)
+ // [END cogroupbykey_inputs]
+
+ // [START cogroupbykey_outputs]
+ results := beam.CoGroupByKey(s, emails, phones)
+
+ contactLines := beam.ParDo(s, formatCoGBKResults, results)
+ // [END cogroupbykey_outputs]
+
+ return contactLines
+}
+
+// [START combine_simple_sum]
+func sumInts(a, v int) int {
+ return a + v
+}
+
+func init() {
+ beam.RegisterFunction(sumInts)
+}
+
+func globallySumInts(s beam.Scope, ints beam.PCollection) beam.PCollection {
+ return beam.Combine(s, sumInts, ints)
+}
+
+type boundedSum struct {
+ Bound int
+}
+
+func (fn *boundedSum) MergeAccumulators(a, v int) int {
+ sum := a + v
+ if fn.Bound > 0 && sum > fn.Bound {
+ return fn.Bound
+ }
+ return sum
+}
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*boundedSum)(nil)))
+}
+
+func globallyBoundedSumInts(s beam.Scope, bound int, ints beam.PCollection) beam.PCollection {
+ return beam.Combine(s, &boundedSum{Bound: bound}, ints)
+}
+
+// [END combine_simple_sum]
+
+// [START combine_custom_average]
+
+type averageFn struct{}
+
+type averageAccum struct {
+ Count, Sum int
+}
+
+func (fn *averageFn) CreateAccumulator() averageAccum {
+ return averageAccum{0, 0}
+}
+
+func (fn *averageFn) AddInput(a averageAccum, v int) averageAccum {
+ return averageAccum{Count: a.Count + 1, Sum: a.Sum + v}
+}
+
+func (fn *averageFn) MergeAccumulators(a, v averageAccum) averageAccum {
+ return averageAccum{Count: a.Count + v.Count, Sum: a.Sum + v.Sum}
+}
+
+func (fn *averageFn) ExtractOutput(a averageAccum) float64 {
+ if a.Count == 0 {
+ return math.NaN()
+ }
+ return float64(a.Sum) / float64(a.Count)
+}
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*averageFn)(nil)))
+}
+
+// [END combine_custom_average]
+
+func globallyAverage(s beam.Scope, ints beam.PCollection) beam.PCollection {
+ // [START combine_global_average]
+ average := beam.Combine(s, &averageFn{}, ints)
+ // [END combine_global_average]
+ return average
+}
+
+func globallyAverageWithDefault(s beam.Scope, ints beam.PCollection) beam.PCollection {
+ // [START combine_global_with_default]
+ // Setting combine defaults requires no helper function in the Go SDK.
+ average := beam.Combine(s, &averageFn{}, ints)
+
+ // To add a default value:
+ defaultValue := beam.Create(s, float64(0))
+ avgWithDefault := beam.ParDo(s, func(d float64, iter func(*float64) bool) float64 {
+ var c float64
+ if iter(&c) {
+ // Side input has a value, so return it.
+ return c
+ }
+ // Otherwise, return the default
+ return d
+ }, defaultValue, beam.SideInput{Input: average})
+ // [END combine_global_with_default]
+ return avgWithDefault
+}
+
+func perKeyAverage(s beam.Scope, playerAccuracies beam.PCollection) beam.PCollection {
+ // [START combine_per_key]
+ avgAccuracyPerPlayer := stats.MeanPerKey(s, playerAccuracies)
+ // [END combine_per_key]
+ return avgAccuracyPerPlayer
+}
+
+func applyFlatten(s beam.Scope, pcol1, pcol2, pcol3 beam.PCollection) beam.PCollection {
+ // [START model_multiple_pcollections_flatten]
+ merged := beam.Flatten(s, pcol1, pcol2, pcol3)
+ // [END model_multiple_pcollections_flatten]
+ return merged
+}
+
+type Student struct {
+ Percentile int
+}
+
+// [START model_multiple_pcollections_partition_fn]
+
+func decileFn(student Student) int {
+ return int(float64(student.Percentile) / float64(10))
+}
+
+func init() {
+ beam.RegisterFunction(decileFn)
+}
+
+// [END model_multiple_pcollections_partition_fn]
+
+func applyPartition(s beam.Scope, students beam.PCollection) beam.PCollection {
+ // [START model_multiple_pcollections_partition]
+ // Partition returns a slice of PCollections
+ studentsByPercentile := beam.Partition(s, 10, decileFn, students)
+ // Each partition can be extracted by indexing into the slice.
+ fortiethPercentile := studentsByPercentile[4]
+ // [END model_multiple_pcollections_partition]
+ return fortiethPercentile
+}
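For reference, a minimal sketch (not part of this commit) of reusing the averageFn combiner above per key with beam.CombinePerKey, assuming a keyed PCollection<string,int> named scores:

// perKeyCustomAverage combines each key's values independently with averageFn,
// producing a PCollection<string,float64> of per-key averages.
func perKeyCustomAverage(s beam.Scope, scores beam.PCollection) beam.PCollection {
	return beam.CombinePerKey(s, &averageFn{}, scores)
}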
diff --git a/sdks/go/examples/snippets/04transforms_test.go b/sdks/go/examples/snippets/04transforms_test.go
new file mode 100644
index 0000000..ebfa47b
--- /dev/null
+++ b/sdks/go/examples/snippets/04transforms_test.go
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snippets
+
+import (
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+)
+
+func TestMain(m *testing.M) {
+ ptest.Main(m)
+}
+
+func TestParDo(t *testing.T) {
+ p, s, input := ptest.CreateList([]string{"one", "two", "three"})
+ lens := applyWordLen(s, input)
+ passert.Equals(s, lens, 3, 3, 5)
+ ptest.RunAndValidate(t, p)
+}
+
+func TestParDo_anon(t *testing.T) {
+ p, s, input := ptest.CreateList([]string{"one", "two", "three"})
+ lens := applyWordLenAnon(s, input)
+ passert.Equals(s, lens, 3, 3, 5)
+ ptest.RunAndValidate(t, p)
+}
+
+func TestFormatCoGBKResults(t *testing.T) {
+ // [START cogroupbykey_outputs]
+ // Synthetic example results of a cogbk.
+ results := []struct {
+ Key string
+ Emails, Phones []string
+ }{
+ {
+ Key: "amy",
+ Emails: []string{"[email protected]"},
+ Phones: []string{"111-222-3333", "333-444-5555"},
+ }, {
+ Key: "carl",
+ Emails: []string{"[email protected]", "[email protected]"},
+ Phones: []string{"444-555-6666"},
+ }, {
+ Key: "james",
+ Emails: []string{},
+ Phones: []string{"222-333-4444"},
+ }, {
+ Key: "julia",
+ Emails: []string{"[email protected]"},
+ Phones: []string{},
+ },
+ }
+ // [END cogroupbykey_outputs]
+
+ // [START cogroupbykey_formatted_outputs]
+ formattedResults := []string{
+ "amy; ['[email protected]']; ['111-222-3333', '333-444-5555']",
+ "carl; ['[email protected]', '[email protected]'];
['444-555-6666']",
+ "james; []; ['222-333-4444']",
+ "julia; ['[email protected]']; []",
+ }
+ // [END cogroupbykey_formatted_outputs]
+
+ // Helper to fake iterators for unit testing.
+ makeIter := func(vs []string) func(*string) bool {
+ i := 0
+ return func(v *string) bool {
+ if i >= len(vs) {
+ return false
+ }
+ *v = vs[i]
+ i++
+ return true
+ }
+ }
+
+ for i, result := range results {
+ got := formatCoGBKResults(result.Key, makeIter(result.Emails), makeIter(result.Phones))
+ want := formattedResults[i]
+ if got != want {
+ t.Errorf("%d.%v, got %q, want %q", i, result.Key, got, want)
+ }
+ }
+
+ p, s := beam.NewPipelineWithRoot()
+ formattedCoGBK := coGBKExample(s)
+ passert.Equals(s, formattedCoGBK, formattedResults[0], formattedResults[1], formattedResults[2], formattedResults[3])
+ ptest.RunAndValidate(t, p)
+}
+
+func TestCombine(t *testing.T) {
+ p, s, input := ptest.CreateList([]int{1, 2, 3})
+ avg := globallyAverage(s, input)
+ passert.Equals(s, avg, float64(2.0))
+ ptest.RunAndValidate(t, p)
+}
+
+func TestCombineWithDefault_useDefault(t *testing.T) {
+ p, s, input := ptest.CreateList([]int{})
+ avg := globallyAverageWithDefault(s, input)
+ passert.Equals(s, avg, float64(0))
+ ptest.RunAndValidate(t, p)
+}
+
+func TestCombineWithDefault_useAverage(t *testing.T) {
+ p, s, input := ptest.CreateList([]int{1, 2, 3})
+ avg := globallyAverageWithDefault(s, input)
+ passert.Equals(s, avg, float64(2.0))
+ ptest.RunAndValidate(t, p)
+}
+
+func TestCombine_sum(t *testing.T) {
+ p, s, input := ptest.CreateList([]int{1, 2, 3})
+ avg := globallySumInts(s, input)
+ passert.Equals(s, avg, int(6))
+ ptest.RunAndValidate(t, p)
+}
+
+func TestCombine_sum_bounded(t *testing.T) {
+ p, s, input := ptest.CreateList([]int{1, 2, 3})
+ bound := int(4)
+ avg := globallyBoundedSumInts(s, bound, input)
+ passert.Equals(s, avg, bound)
+ ptest.RunAndValidate(t, p)
+}
+
+type player struct {
+ Name string
+ Accuracy float64
+}
+
+func splitPlayer(e player) (string, float64) {
+ return e.Name, e.Accuracy
+}
+
+func mergePlayer(k string, v float64) player {
+ return player{Name: k, Accuracy: v}
+}
+
+func init() {
+ beam.RegisterFunction(splitPlayer)
+ beam.RegisterFunction(mergePlayer)
+}
+
+func TestCombinePerKey(t *testing.T) {
+ p, s, input := ptest.CreateList([]player{{"fred", 0.2}, {"velma", 0.4}, {"fred", 0.5}, {"velma", 1.0}, {"shaggy", 0.1}})
+ kvs := beam.ParDo(s, splitPlayer, input)
+ avg := perKeyAverage(s, kvs)
+ results := beam.ParDo(s, mergePlayer, avg)
+ passert.Equals(s, results, player{"fred", 0.35}, player{"velma", 0.7}, player{"shaggy", 0.1})
+ ptest.RunAndValidate(t, p)
+}
+
+func TestFlatten(t *testing.T) {
+ p, s := beam.NewPipelineWithRoot()
+ a := beam.CreateList(s, []int{1, 2, 3})
+ b := beam.CreateList(s, []int{5, 7, 9})
+ c := beam.CreateList(s, []int{4, 6, 8})
+ merged := applyFlatten(s, a, b, c)
+ passert.Equals(s, merged, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+ ptest.RunAndValidate(t, p)
+}
+
+func TestPartition(t *testing.T) {
+ p, s, input := ptest.CreateList([]Student{{42}, {57}, {23}, {89}, {99}, {5}})
+ avg := applyPartition(s, input)
+ passert.Equals(s, avg, Student{42})
+ ptest.RunAndValidate(t, p)
+}
diff --git a/sdks/go/examples/snippets/doc.go b/sdks/go/examples/snippets/doc.go
new file mode 100644
index 0000000..95900b5
--- /dev/null
+++ b/sdks/go/examples/snippets/doc.go
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package snippets contains code used in the Beam Programming Guide
+// as examples for the Apache Beam Go SDK. These snippets are compiled
+// and their tests run to ensure correctness. However, due to their
+// piecemeal pedagogical use, they may not be the best example of
+// production code.
+//
+// The Beam Programming Guide can be found at https://beam.apache.org/documentation/programming-guide/.
+package snippets
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 90a568c..bbf2d78 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -28,12 +28,17 @@ programmatically building your Beam pipeline. As the programming guide is filled
out, the text will include code samples in multiple languages to help illustrate
how to implement Beam concepts in your pipelines.
-{{< language-switcher java py >}}
+{{< language-switcher java py go >}}
{{< paragraph class="language-py" >}}
The Python SDK supports Python 3.6, 3.7, and 3.8. Beam 2.24.0 was the last
Python SDK release to support Python 2 and 3.5.
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+The Go SDK is presently considered experimental, but removing that designation is
+[under discussion](https://lists.apache.org/thread.html/re3589c797d2dc6c5a9c1015683b4a8b48a097bacfa6f12bb1e48aa45%40%3Cdev.beam.apache.org%3E).
+{{< /paragraph >}}
+
## 1. Overview {#overview}
To use Beam, you need to first create a driver program using the classes in one
@@ -69,6 +74,15 @@ include:
input, performs a processing function that you provide on the elements of that
`PCollection`, and produces zero or more output `PCollection` objects.
+<span class="language-go">
+
+* `Scope`: The Go SDK has an explicit scope variable used to build a `Pipeline`.
+ A `Pipeline` can return its root scope with the `Root()` method. The scope
+ variable is passed to `PTransform` functions to place them in the `Pipeline`
+ that owns the `Scope`.
+
+</span>
+
* I/O transforms: Beam comes with a number of "IOs" - library `PTransform`s that
read or write data to various external storage systems.
@@ -103,6 +117,7 @@ The `Pipeline` abstraction encapsulates all the data and steps in your data
processing task. Your Beam driver program typically starts by constructing a
<span class="language-java">[Pipeline](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/Pipeline.html)</span>
<span class="language-py">[Pipeline](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py)</span>
+<span class="language-go">[Pipeline](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/pipeline.go#L62)</span>
object, and then using that object as the basis for creating the pipeline's data
sets as `PCollection`s and its operations as `Transform`s.
@@ -126,8 +141,7 @@ Pipeline p = Pipeline.create(options);
{{< /highlight >}}
{{< highlight go >}}
-// In order to start creating the pipeline for execution, a Pipeline object and a Scope object are needed.
-p, s := beam.NewPipelineWithRoot()
+{{< code_sample "sdks/go/examples/snippets/01_03intro.go" pipelines_constructing_creating >}}
{{< /highlight >}}
### 2.1. Configuring pipeline options {#configuring-pipeline-options}
@@ -138,18 +152,29 @@ configuration required by the chosen runner. Your pipeline options will
potentially include information such as your project ID or a location for
storing files.
+{{< paragraph class="language-java" >}}
When you run the pipeline on a runner of your choice, a copy of the
PipelineOptions will be available to your code. For example, if you add a PipelineOptions parameter
to a DoFn's `@ProcessElement` method, it will be populated by the system.
+{{< /paragraph >}}
#### 2.1.1. Setting PipelineOptions from command-line arguments {#pipeline-options-cli}
+{{< paragraph class="language-java language-py" >}}
While you can configure your pipeline by creating a `PipelineOptions` object and
setting the fields directly, the Beam SDKs include a command-line parser that
you can use to set fields in `PipelineOptions` using command-line arguments.
+{{< /paragraph >}}
+{{< paragraph class="language-java language-py" >}}
To read options from the command-line, construct your `PipelineOptions` object
as demonstrated in the following example code:
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+Use Go flags to parse command line arguments to configure your pipeline. Flags must be parsed
+before `beam.Init()` is called.
+{{< /paragraph >}}
{{< highlight java >}}
PipelineOptions options =
@@ -161,8 +186,7 @@ PipelineOptions options =
{{< /highlight >}}
{{< highlight go >}}
-// If beamx or Go flags are used, flags must be parsed first.
-flag.Parse()
+{{< code_sample "sdks/go/examples/snippets/01_03intro.go" pipeline_options >}}
{{< /highlight >}}
This interprets command-line arguments that follow the format:
@@ -171,11 +195,21 @@ This interprets command-line arguments that follow the format:
--<option>=<value>
```
-> **Note:** Appending the method `.withValidation` will check for required
+<span class="language-java">
+
+> Appending the method `.withValidation` will check for required
> command-line arguments and validate argument values.
+</span>
+
+{{< paragraph class="language-java language-py" >}}
Building your `PipelineOptions` this way lets you specify any of the options as
a command-line argument.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+Defining flag variables this way lets you specify any of the options as a command-line argument.
+{{< /paragraph >}}
> **Note:** The [WordCount example pipeline](/get-started/wordcount-example)
> demonstrates how to set pipeline options at runtime by using command-line
@@ -203,10 +237,7 @@ public interface MyOptions extends PipelineOptions {
{{< /highlight >}}
{{< highlight go >}}
-var (
- input = flag.String("input", "", "")
- output = flag.String("output", "", "")
-)
+{{< code_sample "sdks/go/examples/snippets/01_03intro.go"
pipeline_options_define_custom >}}
{{< /highlight >}}
You can also specify a description, which appears when a user passes `--help`
as
@@ -233,10 +264,7 @@ public interface MyOptions extends PipelineOptions {
{{< /highlight >}}
{{< highlight go >}}
-var (
- input = flag.String("input", "gs://my-bucket/input", "Input for the
pipeline")
- output = flag.String("output", "gs://my-bucket/output", "Output for the
pipeline")
-)
+{{< code_sample "sdks/go/examples/snippets/01_03intro.go"
pipeline_options_define_custom_with_help_and_default >}}
{{< /highlight >}}
{{< paragraph class="language-java" >}}
@@ -265,7 +293,9 @@ Now your pipeline can accept `--input=value` and `--output=value` as command-lin
## 3. PCollections {#pcollections}
The <span class="language-java">[PCollection](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/values/PCollection.html)</span>
-<span class="language-py">`PCollection`</span> abstraction represents a
+<span class="language-py">`PCollection`</span>
+<span class="language-go">[PCollection](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/pcollection.go#L39)</span>
+abstraction represents a
potentially distributed, multi-element data set. You can think of a
`PCollection` as "pipeline" data; Beam transforms use `PCollection` objects as
inputs and outputs. As such, if you want to work with data in your pipeline, it
@@ -295,11 +325,13 @@ represent the data records in that source.
Each data source adapter has a `Read` transform; to read, you must apply that
transform to the `Pipeline` object itself.
<span class="language-java">`TextIO.Read`</span>
-<span class="language-py">`io.TextFileSource`</span>, for example, reads from
an
+<span class="language-py">`io.TextFileSource`</span>
+<span class="language-go">`textio.Read`</span>, for example, reads from an
external text file and returns a `PCollection` whose elements are of type
`String`, each `String` represents one line from the text file. Here's how you
would apply <span class="language-java">`TextIO.Read`</span>
-<span class="language-py">`io.TextFileSource`</span> to your `Pipeline` to
create
+<span class="language-py">`io.TextFileSource`</span>
+<span class="language-go">`textio.Read`</span> to your `Pipeline` to create
a `PCollection`:
{{< highlight java >}}
@@ -320,7 +352,7 @@ public static void main(String[] args) {
{{< /highlight >}}
{{< highlight go >}}
-lines := textio.Read(s, "gs://some/inputData.txt")
+{{< code_sample "sdks/go/examples/snippets/01_03intro.go"
pipelines_constructing_reading >}}
{{< /highlight >}}
See the [section on I/O](#pipeline-io) to learn more about how to read from the
@@ -346,8 +378,13 @@ To create a `PCollection` from an in-memory `list`, you use the Beam-provided
itself.
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+To create a `PCollection` from an in-memory `slice`, you use the Beam-provided
+`beam.CreateList` transform. Pass the pipeline `scope` and the `slice` to this transform.
+{{< /paragraph >}}
+
The following example code shows how to create a `PCollection` from an in-memory
-<span class="language-java">`List`</span><span class="language-py">`list`</span>:
+<span class="language-java">`List`</span><span class="language-py">`list`</span><span class="language-go">`slice`</span>:
{{< highlight java >}}
public static void main(String[] args) {
@@ -372,12 +409,16 @@ public static void main(String[] args) {
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py"
model_pcollection >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/01_03intro.go" model_pcollection >}}
+{{< /highlight >}}
+
### 3.2. PCollection characteristics {#pcollection-characteristics}
A `PCollection` is owned by the specific `Pipeline` object for which it is
-created; multiple pipelines cannot share a `PCollection`. <span language="java">
-In some respects, a `PCollection` functions like a `Collection` class. However,
-a `PCollection` can differ in a few key ways:</span>
+created; multiple pipelines cannot share a `PCollection`.
+<span class="language-java">In some respects, a `PCollection` functions like
+a `Collection` class. However, a `PCollection` can differ in a few key ways:</span>
#### 3.2.1. Element type {#element-type}
@@ -388,6 +429,12 @@ around to distributed workers). The Beam SDKs provide a data encoding mechanism
that includes built-in encoding for commonly-used types as well as support for
specifying custom encodings as needed.
+{{< paragraph class="language-go" >}}
+Custom struct types should be registered with beam using `beam.RegisterType`.
+Among other things, this allows the Go SDK to infer an encoding from their
+exported fields. Unexported fields in struct types are ignored.
+{{< /paragraph >}}
+
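For illustration, a minimal sketch (not part of this commit) of that registration, assuming the beam and reflect imports used in the snippets above and a hypothetical Purchase element type:

// Purchase is a custom element type. Exported fields are used to infer a coder;
// unexported fields are ignored.
type Purchase struct {
	User   string  // encoded
	Amount float64 // encoded
	note   string  // ignored: unexported
}

func init() {
	beam.RegisterType(reflect.TypeOf((*Purchase)(nil)).Elem())
}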
#### 3.2.2. Element schema {#element-schema}
In many cases, the element type in a `PCollection` has a structure that can be introspected.
@@ -499,16 +546,34 @@ the transform itself as an argument, and the operation returns the output
[Output PCollection] = [Input PCollection] | [Transform]
{{< /highlight >}}
+{{< highlight go >}}
+[Output PCollection] := beam.ParDo(scope, [Transform], [Input PCollection])
+{{< /highlight >}}
+
+{{< paragraph class="language-java language-py" >}}
Because Beam uses a generic `apply` method for `PCollection`, you can both chain
transforms sequentially and also apply transforms that contain other transforms
nested within (called [composite transforms](#composite-transforms) in the Beam
SDKs).
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+It's recommended to create a new variable for each new `PCollection` to
+sequentially transform input data. `Scope`s can be used to create functions
+that contain other transforms
+(called [composite transforms](#composite-transforms) in the Beam SDKs).
+{{< /paragraph >}}
How you apply your pipeline's transforms determines the structure of your
pipeline. The best way to think of your pipeline is as a directed acyclic graph,
where `PTransform` nodes are subroutines that accept `PCollection` nodes as
-inputs and emit `PCollection` nodes as outputs. For example, you can chain
-together transforms to create a pipeline that successively modifies input data:
+inputs and emit `PCollection` nodes as outputs.
+<span class="language-java language-py">
+For example, you can chain together transforms to create a pipeline that successively modifies input data:
+</span>
+<span class="language-go">
+For example, you can successively call transforms on PCollections to modify the input data:
+</span>
{{< highlight java >}}
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
@@ -522,6 +587,12 @@ together transforms to create a pipeline that successively modifies input data:
| [Third Transform])
{{< /highlight >}}
+{{< highlight go >}}
+[Second PCollection] := beam.ParDo(scope, [First Transform], [Initial Input PCollection])
+[Third PCollection] := beam.ParDo(scope, [Second Transform], [Second PCollection])
+[Final Output PCollection] := beam.ParDo(scope, [Third Transform], [Third PCollection])
+{{< /highlight >}}
+
The graph of this pipeline looks like the following:
![This linear pipeline starts with one input collection, sequentially applies
@@ -546,6 +617,12 @@ a branching pipeline, like so:
[PCollection of 'B' names] = [PCollection of database table rows] | [Transform B]
{{< /highlight >}}
+{{< highlight go >}}
+[PCollection of database table rows] = beam.ParDo(scope, [Read Transform], [Database Table Reader])
+[PCollection of 'A' names] = beam.ParDo(scope, [Transform A], [PCollection of database table rows])
+[PCollection of 'B' names] = beam.ParDo(scope, [Transform B], [PCollection of database table rows])
+{{< /highlight >}}
+
The graph of this branching pipeline looks like the following:
.
+</span>
+
##### 4.2.1.1. Applying ParDo {#applying-pardo}
+{{< paragraph class="language-java language-py" >}}
Like all Beam transforms, you apply `ParDo` by calling the `apply` method on the
input `PCollection` and passing `ParDo` as an argument, as shown in the
following example code:
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+`beam.ParDo` applies the passed in `DoFn` argument to the input `PCollection`,
+as shown in the following example code:
+{{< /paragraph >}}
{{< highlight java >}}
// The input PCollection of Strings.
@@ -640,20 +728,18 @@ words = ...
{{< /highlight >}}
{{< highlight go >}}
-// words is the input PCollection of strings
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_pardo
>}}
+// words is an input PCollection of strings
var words beam.PCollection = ...
-
-func computeWordLengthFn(word string) int {
- return len(word)
-}
-
-wordLengths := beam.ParDo(s, computeWordLengthFn, words)
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_apply
>}}
{{< /highlight >}}
-In the example, our input `PCollection` contains `String` values. We apply a
+In the example, our input `PCollection` contains <span class="language-java language-py">`String`</span>
+<span class="language-go">`string`</span> values. We apply a
`ParDo` transform that specifies a function (`ComputeWordLengthFn`) to compute
the length of each string, and outputs the result to a new `PCollection` of
-`Integer` values that stores the length of each word.
+<span class="language-java language-py">`Integer`</span>
+<span class="language-go">`int`</span> values that stores the length of each
word.
##### 4.2.1.2. Creating a DoFn
@@ -675,10 +761,35 @@ the types of the input and output elements. If your `DoFn` processes incoming
look like this:
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+A `DoFn` processes one element at a time from the input `PCollection`. When you
+create a `DoFn` struct, you'll need to provide type parameters that match
+the types of the input and output elements in a ProcessElement method.
+If your `DoFn` processes incoming `string` elements and produces `int` elements
+for the output collection (like our previous example, `ComputeWordLengthFn`), your `DoFn` could
+look like this:
+{{< /paragraph >}}
+
{{< highlight java >}}
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
{{< /highlight >}}
+{{< highlight go >}}
+// ComputeWordLengthFn is a DoFn that computes the word length of string elements.
+type ComputeWordLengthFn struct{}
+
+// ProcessElement computes the length of word and emits the result.
+// When creating structs as a DoFn, the ProcessElement method performs the
+// work of this step in the pipeline.
+func (fn *ComputeWordLengthFn) ProcessElement(word string, emit func(int)) {
+ ...
+}
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*ComputeWordLengthFn)(nil)))
+}
+{{< /highlight >}}
+
{{< paragraph class="language-java" >}}
Inside your `DoFn` subclass, you'll write a method annotated with
`@ProcessElement` where you provide the actual processing logic. You don't need
@@ -702,6 +813,16 @@ elements with `yield` statements. You can also use a `return` statement
with an iterable, like a list or a generator.
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+For your `DoFn` type, you'll write a method `ProcessElement` where you provide
+the actual processing logic. You don't need to manually extract the elements
+from the input collection; the Beam SDKs handle that for you. Your `ProcessElement` method
+should accept a parameter `element`, which is the input element. In order to output elements,
+the method can also take a function parameter, which can be called to emit elements.
+The parameter types must match the input and output types of your `DoFn`
+ or the framework will raise an error.
+{{< /paragraph >}}
+
{{< highlight java >}}
static class ComputeWordLengthFn extends DoFn<String, Integer> {
@ProcessElement
@@ -716,11 +837,45 @@ static class ComputeWordLengthFn extends DoFn<String, Integer> {
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" model_pardo_pardo >}}
{{< /highlight >}}
-{{< paragraph class="language-java" >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_pardo
>}}
+{{< /highlight >}}
+
+{{< paragraph class="language-go" >}}
+Simple DoFns can also be written as functions.
+{{< /paragraph >}}
+
+{{< highlight go >}}
+func ComputeWordLengthFn(word string, emit func(int)) { ... }
+
+func init() {
+ beam.RegisterFunction(ComputeWordLengthFn)
+}
+{{< /highlight >}}
+
+<span class="language-go" >
+
+> **Note:** Whether using a structural `DoFn` type or a functional `DoFn`, they should be registered with
+> beam in an `init` block. Otherwise they may not execute on distributed runners.
+
+</span>
+
+<span class="language-java">
+
> **Note:** If the elements in your input `PCollection` are key/value pairs, you
> can access the key or value by using `element.getKey()` or
> `element.getValue()`, respectively.
-{{< /paragraph >}}
+
+</span>
+
+<span class="language-go">
+
+> **Note:** If the elements in your input `PCollection` are key/value pairs, your
+> process element method must have two parameters, for each of the key and value,
+> respectively. Similarly, key/value pairs are also output as separate
+> respectively. Similarly, key/value pairs are also output as separate
+> parameters to a single `emitter function`.
+
+</span>
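For illustration, a minimal sketch (not part of this commit) of a DoFn over key/value pairs, assuming an input PCollection<string,int> and a hypothetical filterLongWords function:

// filterLongWords receives the key and value as separate parameters and
// emits key/value pairs as separate arguments to the emitter.
func filterLongWords(word string, length int, emit func(string, int)) {
	if length > 3 {
		emit(word, length)
	}
}

func init() {
	beam.RegisterFunction(filterLongWords)
}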
A given `DoFn` instance generally gets invoked one or more times to process some
arbitrary bundle of elements. However, Beam doesn't guarantee an exact number of
@@ -734,32 +889,48 @@ requirements to ensure that Beam and the processing back-end can safely
serialize and cache the values in your pipeline. Your method should meet the
following requirements:
-{{< paragraph class="language-java" >}}
+<span class="language-java">
+
* You should not in any way modify an element returned by
the `@Element` annotation or `ProcessContext.sideInput()` (the incoming
elements from the input collection).
* Once you output a value using `OutputReceiver.output()` you should not modify
that value in any way.
-{{< /paragraph >}}
-{{< paragraph class="language-py" >}}
+</span>
+
+<span class="language-py">
+
* You should not in any way modify the `element` argument provided to the
`process` method, or any side inputs.
* Once you output a value using `yield` or `return`, you should not modify
that value in any way.
-{{< /paragraph >}}
+
+</span>
+
+<span class="language-go">
+
+* You should not in any way modify the parameters provided to the
+ `ProcessElement` method, or any side inputs.
+* Once you output a value using an `emitter function`, you should not modify
+ that value in any way.
+
+</span>
+
##### 4.2.1.3. Lightweight DoFns and other abstractions {#lightweight-dofns}
If your function is relatively straightforward, you can simplify your use of
`ParDo` by providing a lightweight `DoFn` in-line, as
<span class="language-java">an anonymous inner class instance</span>
-<span class="language-py">a lambda function</span>.
+<span class="language-py">a lambda function</span>
+<span class="language-go">an anonymous function</span>.
Here's the previous example, `ParDo` with `ComputeLengthWordsFn`, with the
`DoFn` specified as
<span class="language-java">an anonymous inner class instance</span>
-<span class="language-py">a lambda function</span>:
+<span class="language-py">a lambda function</span>
+<span class="language-go">an anonymous function</span>:
{{< highlight java >}}
// The input PCollection.
@@ -790,20 +961,21 @@ words = ...
// words is the input PCollection of strings
var words beam.PCollection = ...
-lengths := beam.ParDo(s, func (word string) int {
- return len(word)
+lengths := beam.ParDo(s, func (word string, emit func(int)) {
+ emit(len(word))
}, words)
{{< /highlight >}}
If your `ParDo` performs a one-to-one mapping of input elements to output
elements--that is, for each input element, it applies a function that produces
-*exactly one* output element, you can use the higher-level
+*exactly one* output element, <span class="language-go">you can return that
+element directly.</span><span class="language-java language-py">you can use
the higher-level
<span class="language-java">`MapElements`</span><span
class="language-py">`Map`</span>
-transform. <span class="language-java">`MapElements` can accept an anonymous
+transform.</span><span class="language-java">`MapElements` can accept an
anonymous
Java 8 lambda function for additional brevity.</span>
Here's the previous example using <span class="language-java">`MapElements`</span>
-<span class="language-py">`Map`</span>:
+<span class="language-py">`Map`</span><span class="language-go">a direct return</span>:
{{< highlight java >}}
// The input PCollection.
@@ -825,10 +997,28 @@ words = ...
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
model_pardo_using_map >}}
{{< /highlight >}}
-{{< paragraph class="language-java" >}}
+{{< highlight go >}}
+// words is the input PCollection of strings
+var words beam.PCollection = ...
+
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
model_pardo_apply_anon >}}
+{{< /highlight >}}
+
+<span class="language-java" >
+
> **Note:** You can use Java 8 lambda functions with several other Beam
> transforms, including `Filter`, `FlatMapElements`, and `Partition`.
-{{< /paragraph >}}
+
+</span>
+
+<span class="language-go" >
+
+> **Note:** Anonymous function DoFns may not work on distributed runners.
+> It's recommended to use named functions and register them with `beam.RegisterFunction` in
+> an `init()` block.
+
+</span>
+
##### 4.2.1.4. DoFn lifecycle {#dofn}
Here is a sequence diagram that shows the lifecycle of the DoFn during
@@ -944,7 +1134,7 @@ windowing](#setting-your-pcollections-windowing-function) or an
[GroupByKey and unbounded PCollections](#groupbykey-and-unbounded-pcollections)
for more details.
-<span class="language-java">
+{{< paragraph class="language-java" >}}
In the Beam SDK for Java, `CoGroupByKey` accepts a tuple of keyed
`PCollection`s (`PCollection<KV<K, V>>`) as input. For type safety, the SDK
requires you to pass each `PCollection` as part of a `KeyedPCollectionTuple`.
@@ -955,27 +1145,37 @@ from all the input `PCollection`s by their common keys. Each key (all of type
`K`) will have a different `CoGbkResult`, which is a map from `TupleTag<T>` to
`Iterable<T>`. You can access a specific collection in an `CoGbkResult` object
by using the `TupleTag` that you supplied with the initial collection.
-</span>
-<span class="language-py">
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
In the Beam SDK for Python, `CoGroupByKey` accepts a dictionary of keyed
`PCollection`s as input. As output, `CoGroupByKey` creates a single output
`PCollection` that contains one key/value tuple for each key in the input
`PCollection`s. Each key's value is a dictionary that maps each tag to an
iterable of the values under the key in the corresponding `PCollection`.
-</span>
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+In the Beam Go SDK, `CoGroupByKey` accepts an arbitrary number of
+`PCollection`s as input. As output, `CoGroupByKey` creates a single output
+`PCollection` that groups each key with value iterator functions for each
+input `PCollection`. The iterator functions map to input `PCollections` in
+ the same order they were provided to the `CoGroupByKey`.
+{{< /paragraph >}}
The following conceptual examples use two input collections to show the mechanics of
`CoGroupByKey`.
-<span class="language-java">
+{{< paragraph class="language-java" >}}
The first set of data has a `TupleTag<String>` called `emailsTag` and contains names
and email addresses. The second set of data has a `TupleTag<String>` called
`phonesTag` and contains names and phone numbers.
-</span>
-<span class="language-py">
+{{< /paragraph >}}
+
+{{< paragraph class="language-py language-go" >}}
The first set of data contains names and email addresses. The second set of
data contains names and phone numbers.
-</span>
+{{< /paragraph >}}
{{< highlight java >}}
{{< code_sample "examples/java/src/test/java/org/apache/beam/examples/snippets/SnippetsTest.java" CoGroupByKeyTupleInputs >}}
@@ -985,6 +1185,11 @@ data contains names and phone numbers.
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
model_group_by_key_cogroupbykey_tuple_inputs >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
cogroupbykey_input_helpers >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
cogroupbykey_inputs >}}
+{{< /highlight >}}
+
After `CoGroupByKey`, the resulting data contains all data associated with each
unique key from any of the input collections.
@@ -996,9 +1201,22 @@ unique key from any of the input collections.
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
model_group_by_key_cogroupbykey_tuple_outputs >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
cogroupbykey_outputs >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms_test.go"
cogroupbykey_outputs >}}
+{{< /highlight >}}
+
+{{< paragraph class="language-java language-py" >}}
The following code example joins the two `PCollection`s with `CoGroupByKey`,
followed by a `ParDo` to consume the result. Then, the code uses tags to look up
and format data from each collection.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+The following code example joins the two `PCollection`s with `CoGroupByKey`,
+followed by a `ParDo` to consume the result. The ordering of the `DoFn` iterator
+parameters maps to the ordering of the `CoGroupByKey` inputs.
+{{< /paragraph >}}
{{< highlight java >}}
{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" CoGroupByKeyTuple >}}
@@ -1008,6 +1226,11 @@ and format data from each collection.
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py"
model_group_by_key_cogroupbykey_tuple >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
cogroupbykey_output_helpers >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms_test.go"
cogroupbykey_outputs >}}
+{{< /highlight >}}
+
The formatted data looks like this:
{{< highlight java >}}
@@ -1018,10 +1241,15 @@ The formatted data looks like this:
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
model_group_by_key_cogroupbykey_tuple_formatted_outputs >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms_test.go"
cogroupbykey_formatted_outputs >}}
+{{< /highlight >}}
+
#### 4.2.4. Combine {#combine}
<span class="language-java">[`Combine`](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/transforms/Combine.html)</span>
<span class="language-py">[`Combine`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span>
+<span class="language-go">[`Combine`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/combine.go#L27)</span>
is a Beam transform for combining collections of elements or values in your
data. `Combine` has variants that work on entire `PCollection`s, and some that
combine the values for each key in `PCollection`s of key/value pairs.
@@ -1038,8 +1266,8 @@ and max.
Simple combine operations, such as sums, can usually be implemented as a simple
function. More complex combination operations might require you to create a
-subclass of `CombineFn` that has an accumulation type distinct from the
-input/output type.
+<span class="language-java language-py">subclass of</span> `CombineFn`
+that has an accumulation type distinct from the input/output type.
##### 4.2.4.1. Simple combinations using simple functions {#simple-combines}
@@ -1063,15 +1291,21 @@ public static class SumInts implements SerializableFunction<Iterable<Integer>, I
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" combine_bounded_sum >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_simple_sum
>}}
+{{< /highlight >}}
+
##### 4.2.4.2. Advanced combinations using CombineFn {#advanced-combines}
-For more complex combine functions, you can define a subclass of `CombineFn`.
-You should use `CombineFn` if the combine function requires a more sophisticated
+For more complex combine functions, you can define a
+<span class="language-java language-py">subclass of</span> `CombineFn`.
+You should use a `CombineFn` if the combine function requires a more sophisticated
accumulator, must perform additional pre- or post-processing, might change the
output type, or takes the key into account.
A general combining operation consists of four operations. When you create a
-subclass of `CombineFn`, you must provide four operations by overriding the
+<span class="language-java language-py">subclass of</span>
+`CombineFn`, you must provide four operations by overriding the
corresponding methods:
1. **Create Accumulator** creates a new "local" accumulator. In the example
@@ -1136,6 +1370,17 @@ pc = ...
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
combine_custom_average_define >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
combine_custom_average >}}
+{{< /highlight >}}
+
+<span class="language-go">
+
+> **Note**: Only `MergeAccumulators` is a required method. The others will have a default interpretation
+> based on the accumulator type.
+
+</span>
+
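For illustration, a minimal sketch (not part of this commit) of that note: a combiner defining only MergeAccumulators, assuming non-negative int elements so the zero-value default accumulator is safe:

// maxFn relies on the default CreateAccumulator, AddInput, and ExtractOutput
// behavior; only the merge step is specified.
type maxFn struct{}

func (fn *maxFn) MergeAccumulators(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func init() {
	beam.RegisterType(reflect.TypeOf((*maxFn)(nil)))
}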
##### 4.2.4.3. Combining a PCollection into a single value {#combining-pcollection}
Use the global combine to transform all of the elements in a given `PCollection`
@@ -1160,6 +1405,10 @@ pc = ...
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
combine_custom_average_execute >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
combine_global_average >}}
+{{< /highlight >}}
+
##### 4.2.4.4. Combine and global windowing {#combine-global-windowing}
If your input `PCollection` uses the default global windowing, the default
@@ -1184,8 +1433,14 @@ pc = ...
sum = pc | beam.CombineGlobally(sum).without_defaults()
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
combine_global_with_default >}}
+{{< /highlight >}}
+
##### 4.2.4.5. Combine and non-global windowing {#combine-non-global-windowing}
+<span class="language-java language-py">
+
If your `PCollection` uses any non-global windowing function, Beam does not
provide the default behavior. You must specify one of the following options
when
applying `Combine`:
@@ -1198,6 +1453,13 @@ applying `Combine`:
the result of your pipeline's `Combine` is to be used as a side input later in
the pipeline.
+</span>
+
+{{< paragraph class="language-go" >}}
+If your `PCollection` uses any non-global windowing function, the Beam Go SDK
+behaves the same way as with global windowing.
+{{< /paragraph >}}
+
##### 4.2.4.6. Combining values in a keyed PCollection {#combining-values-in-a-keyed-pcollection}
After creating a keyed PCollection (for example, by using a `GroupByKey`
@@ -1220,7 +1482,8 @@ considering them individually), you can combine the iterable of integers to
create a single, merged value to be paired with each key. This pattern of a
`GroupByKey` followed by merging the collection of values is equivalent to
Beam's Combine PerKey transform. The combine function you supply to Combine
-PerKey must be an associative reduction function or a subclass of `CombineFn`.
+PerKey must be an associative reduction function or a
+<span class="language-java language-py">subclass of</span> `CombineFn`.
{{< highlight java >}}
// PCollection is grouped by key and the Double values associated with each key are combined into a Double.
@@ -1244,10 +1507,19 @@ player_accuracies = ...
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
combine_per_key >}}
{{< /highlight >}}
+{{< highlight go >}}
+// PCollection is grouped by key and the numeric values associated with each key
+// are averaged into a float64.
+playerAccuracies := ... // PCollection<string,int>
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" combine_per_key >}}
+// avgAccuracyPerPlayer is a PCollection<string,float64>
+{{< /highlight >}}
+
#### 4.2.5. Flatten {#flatten}
<span class="language-java">[`Flatten`](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/transforms/Flatten.html)</span>
<span class="language-py">[`Flatten`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span>
+<span class="language-go">[`Flatten`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/flatten.go)</span>
is a Beam transform for `PCollection` objects that store the same data type.
`Flatten` merges multiple `PCollection` objects into a single logical
`PCollection`.
@@ -1273,6 +1545,12 @@ PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_multiple_pcollections_flatten >}}
{{< /highlight >}}
+{{< highlight go >}}
+// Flatten accepts any number of PCollections of the same element type.
+// Returns a single PCollection that contains all of the elements in input PCollections.
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" model_multiple_pcollections_flatten >}}
+{{< /highlight >}}
+
##### 4.2.5.1. Data encoding in merged collections {#data-encoding-merged-collections}
By default, the coder for the output `PCollection` is the same as the coder for
@@ -1296,6 +1574,7 @@ pipeline is constructed.
<span class="language-java">[`Partition`](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/transforms/Partition.html)</span>
<span class="language-py">[`Partition`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py)</span>
+<span class="language-go">[`Partition`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/partition.go)</span>
is a Beam transform for `PCollection` objects that store the same data
type. `Partition` splits a single `PCollection` into a fixed number of smaller
collections.
@@ -1339,6 +1618,11 @@ students = ...
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py"
model_multiple_pcollections_partition_40th >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
model_multiple_pcollections_partition_fn >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
model_multiple_pcollections_partition >}}
+{{< /highlight >}}
+
### 4.3. Requirements for writing user code for Beam transforms {#requirements-for-writing-user-code-for-beam-transforms}
When you build user code for a Beam transform, you should keep in mind the
@@ -1360,18 +1644,36 @@ In addition, it's recommended that you make your function object **idempotent**.
Non-idempotent functions are supported by Beam, but require additional
thought to ensure correctness when there are external side effects.
-> **Note:** These requirements apply to subclasses of `DoFn` (a function object
+<span class="language-java language-py">
+
+> **Note:** These requirements apply to subclasses of `DoFn`</span> (a function object
> used with the [ParDo](#pardo) transform), `CombineFn` (a function object used
> with the [Combine](#combine) transform), and `WindowFn` (a function object
> used with the [Window](#windowing) transform).
+</span>
+
+<span class="language-go">
+
+> **Note:** These requirements apply to `DoFn`s</span> (a function object
+> used with the [ParDo](#pardo) transform), `CombineFn`s (a function object used
+> with the [Combine](#combine) transform), and `WindowFn`s (a function object
+> used with the [Window](#windowing) transform).
+
+</span>
+
#### 4.3.1. Serializability {#user-code-serializability}
Any function object you provide to a transform must be **fully serializable**.
This is because a copy of the function needs to be serialized and transmitted to
-a remote worker in your processing cluster. The base classes for user code, such
+a remote worker in your processing cluster.
+<span class="language-java language-py">The base classes for user code, such
as `DoFn`, `CombineFn`, and `WindowFn`, already implement `Serializable`;
-however, your subclass must not add any non-serializable members.
+however, your subclass must not add any non-serializable members.</span>
+<span class="language-go">Funcs are serializable as long as
+they are registered with `beam.RegisterFunction`, and are not
+closures. Structural `DoFn`s will have all exported fields serialized.
+Unexported fields are unable to be serialized, and will be silently ignored.</span>
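For illustration, a minimal sketch (not part of this commit) of that rule, using a hypothetical structural DoFn with one exported and one unexported field:

// filterBelowFn keeps elements at or above Threshold.
type filterBelowFn struct {
	Threshold int // serialized: exported configuration travels to workers
	hits      int // not serialized: unexported, silently dropped
}

func (fn *filterBelowFn) ProcessElement(v int, emit func(int)) {
	if v >= fn.Threshold {
		fn.hits++
		emit(v)
	}
}

func init() {
	beam.RegisterType(reflect.TypeOf((*filterBelowFn)(nil)))
}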
Some other serializability factors you should keep in mind are: