This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cfeac3b  [BEAM-12513] Finish adding Go to BPG 4. Transforms (#15142)
cfeac3b is described below

commit cfeac3bcae3c5c2e03f4a57c2c4258fe4fa9e22e
Author: Robert Burke <[email protected]>
AuthorDate: Thu Jul 8 13:29:36 2021 -0700

    [BEAM-12513] Finish adding Go to BPG 4. Transforms (#15142)
    
    * [BEAM-12513] Finish adding Go to BPG 4. Transforms
---
 sdks/go/examples/snippets/04transforms.go          | 141 ++++++++++++++
 sdks/go/examples/snippets/04transforms_test.go     |  40 ++++
 .../content/en/documentation/programming-guide.md  | 204 +++++++++++++++++++--
 3 files changed, 374 insertions(+), 11 deletions(-)

diff --git a/sdks/go/examples/snippets/04transforms.go 
b/sdks/go/examples/snippets/04transforms.go
index 22eb3eb..c926a1d 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -278,6 +278,7 @@ func init() {
 
 // [END model_multiple_pcollections_partition_fn]
 
+// applyPartition returns the 40th percentile of students.
 func applyPartition(s beam.Scope, students beam.PCollection) beam.PCollection {
        // [START model_multiple_pcollections_partition]
        // Partition returns a slice of PCollections
@@ -287,3 +288,143 @@ func applyPartition(s beam.Scope, students 
beam.PCollection) beam.PCollection {
        // [END model_multiple_pcollections_partition]
        return fortiethPercentile
 }
+
+// [START model_pardo_side_input_dofn]
+
+// filterWordsAbove is a DoFn that takes in a word,
+// and a singleton side input iterator of a length cut off
+// and only emits words that are above that cut off.
+//
+// If the iterator has no elements, an error is returned, aborting processing.
+func filterWordsAbove(word string, lengthCutOffIter func(*float64) bool, 
emitAboveCutoff func(string)) error {
+       var cutOff float64
+       ok := lengthCutOffIter(&cutOff)
+       if !ok {
+               return fmt.Errorf("No length cutoff provided.")
+       }
+       if float64(len(word)) > cutOff {
+               emitAboveCutoff(word)
+       }
+       return nil
+}
+
+// filterWordsBelow is a DoFn that takes in a word,
+// and a singleton side input of a length cut off
+// and only emits words that are beneath that cut off.
+//
+// If the side input isn't a singleton, a runtime panic will occur.
+func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff 
func(string)) {
+       if float64(len(word)) <= lengthCutOff {
+               emitBelowCutoff(word)
+       }
+}
+
+func init() {
+       beam.RegisterFunction(filterWordsAbove)
+       beam.RegisterFunction(filterWordsBelow)
+}
+
+// [END model_pardo_side_input_dofn]
+
+// addSideInput demonstrates passing a side input to a DoFn.
+func addSideInput(s beam.Scope, words beam.PCollection) (beam.PCollection, 
beam.PCollection) {
+       wordLengths := applyWordLen(s, words)
+
+       // [START model_pardo_side_input]
+       // avgWordLength is a PCollection containing a single element, a 
singleton.
+       avgWordLength := stats.Mean(s, wordLengths)
+
+       // Side inputs are added with the beam.SideInput option to 
beam.ParDo.
+       wordsAboveCutOff := beam.ParDo(s, filterWordsAbove, words, 
beam.SideInput{Input: avgWordLength})
+       wordsBelowCutOff := beam.ParDo(s, filterWordsBelow, words, 
beam.SideInput{Input: avgWordLength})
+       // [END model_pardo_side_input]
+       return wordsAboveCutOff, wordsBelowCutOff
+}
+
+// isMarkedWord is a dummy function.
+func isMarkedWord(word string) bool {
+       return strings.HasPrefix(word, "MARKER")
+}
+
+// [START model_multiple_output_dofn]
+
+// processWords is a DoFn that has 3 output PCollections. The emitter functions
+// are matched in positional order to the PCollections returned by beam.ParDo3.
+func processWords(word string, emitBelowCutoff, emitAboveCutoff, emitMarked 
func(string)) {
+       const cutOff = 5
+       if len(word) < cutOff {
+               emitBelowCutoff(word)
+       } else {
+               emitAboveCutoff(word)
+       }
+       if isMarkedWord(word) {
+               emitMarked(word)
+       }
+}
+
+// processWordsMixed demonstrates mixing an emitter, with a standard return.
+// If a standard return is used, it will always be the first returned 
PCollection,
+// followed in positional order by the emitter functions.
+func processWordsMixed(word string, emitMarked func(string)) int {
+       if isMarkedWord(word) {
+               emitMarked(word)
+       }
+       return len(word)
+}
+
+func init() {
+       beam.RegisterFunction(processWords)
+       beam.RegisterFunction(processWordsMixed)
+}
+
+// [END model_multiple_output_dofn]
+
+func applyMultipleOut(s beam.Scope, words beam.PCollection) (belows, aboves, 
markeds, lengths, mixedMarkeds beam.PCollection) {
+       // [START model_multiple_output]
+       // beam.ParDo3 returns PCollections in the same order as
+       // the emit function parameters in processWords.
+       below, above, marked := beam.ParDo3(s, processWords, words)
+
+       // processWordsMixed uses both a standard return and an emitter 
function.
+       // The standard return produces the first PCollection from beam.ParDo2,
+       // and the emitter produces the second PCollection.
+       length, mixedMarked := beam.ParDo2(s, processWordsMixed, words)
+       // [END model_multiple_output]
+       return below, above, marked, length, mixedMarked
+}
+
+func extractWordsFn(line string, emitWords func(string)) {
+       words := strings.Split(line, " ")
+       for _, w := range words {
+               emitWords(w)
+       }
+}
+
+func init() {
+       beam.RegisterFunction(extractWordsFn)
+}
+
+// [START countwords_composite]
+// CountWords is a function that builds a composite PTransform
+// to count the number of times each word appears.
+func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
+       // A subscope is required for a function to become a composite 
transform.
+       // We assign it to the original scope variable s to shadow the original
+       // for the rest of the CountWords function.
+       s = s.Scope("CountWords")
+
+       // Since the same subscope is used for the following transforms,
+       // they are in the same composite PTransform.
+
+       // Convert lines of text into individual words.
+       words := beam.ParDo(s, extractWordsFn, lines)
+
+       // Count the number of times each word occurs.
+       wordCounts := stats.Count(s, words)
+
+       // Return any PCollections that should be available after
+       // the composite transform.
+       return wordCounts
+}
+
+// [END countwords_composite]
diff --git a/sdks/go/examples/snippets/04transforms_test.go 
b/sdks/go/examples/snippets/04transforms_test.go
index ebfa47b..76c85da 100644
--- a/sdks/go/examples/snippets/04transforms_test.go
+++ b/sdks/go/examples/snippets/04transforms_test.go
@@ -183,3 +183,43 @@ func TestPartition(t *testing.T) {
        passert.Equals(s, avg, Student{42})
        ptest.RunAndValidate(t, p)
 }
+
+func TestMultipleOutputs(t *testing.T) {
+       p, s, words := ptest.CreateList([]string{"a", "the", "pjamas", "art", 
"candy", "MARKERmarked"})
+       below, above, marked, lengths, mixedMarked := applyMultipleOut(s, words)
+
+       passert.Equals(s, below, "a", "the", "art")
+       passert.Equals(s, above, "pjamas", "candy", "MARKERmarked")
+       passert.Equals(s, marked, "MARKERmarked")
+       passert.Equals(s, lengths, 1, 3, 6, 3, 5, 12)
+       passert.Equals(s, mixedMarked, "MARKERmarked")
+
+       ptest.RunAndValidate(t, p)
+}
+
+func TestSideInputs(t *testing.T) {
+       p, s, words := ptest.CreateList([]string{"a", "the", "pjamas", "art", 
"candy", "garbage"})
+       above, below := addSideInput(s, words)
+       passert.Equals(s, above, "pjamas", "candy", "garbage")
+       passert.Equals(s, below, "a", "the", "art")
+       ptest.RunAndValidate(t, p)
+}
+
+func TestComposite(t *testing.T) {
+       p, s, lines := ptest.CreateList([]string{
+               "this test dataset has the word test",
+               "at least twice, because to test the Composite",
+               "CountWords, one needs test data to run it with",
+       })
+       // [START countwords_composite_call]
+       // A Composite PTransform function is called like any other function.
+       wordCounts := CountWords(s, lines) // returns a 
PCollection<KV<string,int>>
+       // [END countwords_composite_call]
+       testCount := beam.ParDo(s, func(k string, v int, emit func(int)) {
+               if k == "test" {
+                       emit(v)
+               }
+       }, wordCounts)
+       passert.Equals(s, testCount, 4)
+       ptest.RunAndValidate(t, p)
+}
diff --git a/website/www/site/content/en/documentation/programming-guide.md 
b/website/www/site/content/en/documentation/programming-guide.md
index 0d73f58..7edcd7a 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -1677,16 +1677,29 @@ Unexported fields are unable to be serialized, and will 
be silently ignored.</sp
 
 Some other serializability factors you should keep in mind are:
 
-* Transient fields in your function object are *not* transmitted to worker
+* <span class="language-java language-py">Transient</span><span 
class="language-go">Unexported</span>
+  fields in your function object are *not* transmitted to worker
   instances, because they are not automatically serialized.
 * Avoid loading a field with a large amount of data before serialization.
 * Individual instances of your function object cannot share data.
 * Mutating a function object after it gets applied will have no effect.
-* Take care when declaring your function object inline by using an anonymous
-  inner class instance. In a non-static context, your inner class instance will
-  implicitly contain a pointer to the enclosing class and that class' state.
-  That enclosing class will also be serialized, and thus the same 
considerations
-  that apply to the function object itself also apply to this outer class.
+
+<span class="language-java">
+
+> **Note:** Take care when declaring your function object inline by using an 
anonymous
+> inner class instance. In a non-static context, your inner class instance will
+> implicitly contain a pointer to the enclosing class and that class' state.
+> That enclosing class will also be serialized, and thus the same 
considerations
+> that apply to the function object itself also apply to this outer class.
+
+</span>
+
+<span class="language-go">
+
+> **Note:** There's no way to detect if a function is a closure. Closures will 
cause
+> runtime errors and pipeline failures. Avoid using anonymous functions when 
possible.
+
+</span>
 
 #### 4.3.2. Thread-compatibility {#user-code-thread-compatibility}
 
@@ -1694,9 +1707,9 @@ Your function object should be thread-compatible. Each 
instance of your function
 object is accessed by a single thread at a time on a worker instance, unless 
you
 explicitly create your own threads. Note, however, that **the Beam SDKs are not
 thread-safe**. If you create your own threads in your user code, you must
-provide your own synchronization. Note that static members in your function
+provide your own synchronization. <span class="language-java"> Note that 
static members in your function
 object are not passed to worker instances and that multiple instances of your
-function may be accessed from different threads.
+function may be accessed from different threads.</span>
 
 #### 4.3.3. Idempotence {#user-code-idempotence}
 
@@ -1771,6 +1784,20 @@ words = ...
 ...
 {{< /highlight >}}
 
+{{< highlight go >}}
+// Side inputs are provided using `beam.SideInput` in the DoFn's 
ProcessElement method.
+// Side inputs can be arbitrary PCollections, which can then be iterated over 
per element
+// in a DoFn.
+// Side input parameters appear after main input elements, and before any 
output emitters.
+words = ...
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" 
model_pardo_side_input >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" 
model_pardo_side_input_dofn >}}
+
+// The Go SDK doesn't support custom ViewFns.
+// See https://issues.apache.org/jira/browse/BEAM-3305 for details
+// on how to contribute them!
+{{< /highlight >}}
+
 #### 4.4.2. Side inputs and windowing {#side-inputs-windowing}
 
 A windowed `PCollection` may be infinite and thus cannot be compressed into a
@@ -1802,14 +1829,30 @@ a single global window and specify a trigger.
 
 ### 4.5. Additional outputs {#additional-outputs}
 
+{{< paragraph class="language-java language-python" >}}
 While `ParDo` always produces a main output `PCollection` (as the return value
 from `apply`), you can also have your `ParDo` produce any number of additional
 output `PCollection`s. If you choose to have multiple outputs, your `ParDo`
 returns all of the output `PCollection`s (including the main output) bundled
 together.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+While `beam.ParDo` always produces an output `PCollection`, your `DoFn` can 
produce any
+number of additional output `PCollection`s, or even none at all.
+If you choose to have multiple outputs, your `DoFn` needs to be called with 
the `ParDo`
+function that matches the number of outputs. `beam.ParDo2` for two output 
`PCollection`s,
+`beam.ParDo3` for three and so on until `beam.ParDo7`. If you need more, you 
can
+use `beam.ParDoN` which will return a `[]beam.PCollection`.
+{{< /paragraph >}}
 
 #### 4.5.1. Tags for multiple outputs {#output-tags}
 
+{{< paragraph class="language-go" >}}
+The Go SDK doesn't use output tags, and instead uses positional ordering for
+multiple output PCollections.
+{{< /paragraph >}}
+
 {{< highlight java >}}
 // To emit elements to multiple output PCollections, create a TupleTag object 
to identify each collection
 // that your ParDo produces. For example, if your ParDo produces three output 
PCollections (the main output
@@ -1872,8 +1915,24 @@ together.
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" 
model_pardo_with_tagged_outputs_iter >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" 
model_multiple_output >}}
+{{< /highlight >}}
+
 #### 4.5.2. Emitting to multiple outputs in your DoFn {#multiple-outputs-dofn}
 
+{{< paragraph class="language-go" >}}
+Call emitter functions as needed to produce 0 or more elements for its matching
+`PCollection`. The same value can be emitted with multiple emitters.
+As normal, do not mutate values after emitting them from any emitter.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+DoFns can also return a single element via the standard return.
+The standard return is always the first PCollection returned from beam.ParDo.
+Other emitters output to their own PCollections in their defined parameter 
order.
+{{< /paragraph >}}
+
 {{< highlight java >}}
 // Inside your ParDo's DoFn, you can emit an element to a specific output 
PCollection by providing a
 // MultiOutputReceiver to your process method, and passing in the appropriate 
TupleTag to obtain an OutputReceiver.
@@ -1910,6 +1969,10 @@ together.
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" 
model_pardo_with_undeclared_outputs >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" 
model_multiple_output_dofn >}}
+{{< /highlight >}}
+
 #### 4.5.3. Accessing additional parameters in your DoFn 
{#other-dofn-parameters}
 
 {{< paragraph class="language-java" >}}
@@ -1922,6 +1985,21 @@ In addition to the element, Beam will populate other 
parameters to your DoFn's `
 Any combination of these parameters can be added to your process method in any 
order.
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+In addition to the element, Beam will populate other parameters to your DoFn's 
`ProcessElement` method.
+Any combination of these parameters can be added to your process method in a 
standard order.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+**context.Context:**
+To support consolidated logging and user defined metrics, a `context.Context` 
parameter can be requested.
+Per Go conventions, if present it's required to be the first parameter of the 
`DoFn` method.
+{{< /paragraph >}}
+
+{{< highlight go >}}
+func MyDoFn(ctx context.Context, word string) string { ... }
+{{< /highlight >}}
+
 {{< paragraph class="language-java" >}}
 **Timestamp:**
 To access the timestamp of an input element, add a parameter annotated with 
`@Timestamp` of type `Instant`. For example:
@@ -1932,6 +2010,11 @@ To access the timestamp of an input element, add a 
parameter annotated with `@Ti
 To access the timestamp of an input element, add a keyword parameter default 
to `DoFn.TimestampParam`. For example:
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+**Timestamp:**
+To access the timestamp of an input element, add a `beam.EventTime` parameter 
before the element. For example:
+{{< /paragraph >}}
+
 {{< highlight java >}}
 .of(new DoFn<String, String>() {
      public void processElement(@Element String word, @Timestamp Instant 
timestamp) {
@@ -1949,6 +2032,10 @@ class ProcessRecord(beam.DoFn):
 
 {{< /highlight >}}
 
+{{< highlight go >}}
+func MyDoFn(ts beam.EventTime, word string) string { ... }
+{{< /highlight >}}
+
 {{< paragraph class="language-java" >}}
 **Window:**
 To access the window an input element falls into, add a parameter of the type 
of the window used for the input `PCollection`.
@@ -1965,6 +2052,15 @@ If an element falls in multiple windows (for example, 
this will happen when usin
 `process` method will be invoked multiple time for the element, once for each 
window.
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+**Window:**
+To access the window an input element falls into, add a `beam.Window` 
parameter before the element.
+If an element falls in multiple windows (for example, this will happen when 
using SlidingWindows),
+then the `ProcessElement` method will be invoked multiple times for the 
element, once for each window.
+Since `beam.Window` is an interface it's possible to type assert to the 
concrete implementation of the window.
+For example, when fixed windows are being used, the window is of type 
`window.IntervalWindow`.
+{{< /paragraph >}}
+
 {{< highlight java >}}
 .of(new DoFn<String, String>() {
      public void processElement(@Element String word, IntervalWindow window) {
@@ -1982,6 +2078,13 @@ class ProcessRecord(beam.DoFn):
 
 {{< /highlight >}}
 
+{{< highlight go >}}
+func MyDoFn(w beam.Window, word string) string {
+  iw := w.(window.IntervalWindow)
+  ...
+}
+{{< /highlight >}}
+
 {{< paragraph class="language-java" >}}
 **PaneInfo:**
 When triggers are used, Beam provides a `PaneInfo` object that contains 
information about the current firing. Using `PaneInfo`
@@ -1995,6 +2098,13 @@ you can determine whether this is an early or a late 
firing, and how many times
 This feature implementation in Python SDK is not fully completed; see more at 
[BEAM-3759](https://issues.apache.org/jira/browse/BEAM-3759).
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+**PaneInfo:**
+This feature isn't implemented in the Go SDK; see more at 
[BEAM-3304](https://issues.apache.org/jira/browse/BEAM-3304). However, once 
implemented, when triggers are used, Beam provides
+information about the current firing. The pane lets you determine whether this
+is an early or a late firing, and how many times this window has already fired 
for this key.
+{{< /paragraph >}}
+
 {{< highlight java >}}
 .of(new DoFn<String, String>() {
      public void processElement(@Element String word, PaneInfo paneInfo) {
@@ -2012,6 +2122,12 @@ class ProcessRecord(beam.DoFn):
 
 {{< /highlight >}}
 
+{{< highlight go >}}
+// PaneInfo and triggers are not yet implemented in the Go SDK.
+// See https://issues.apache.org/jira/browse/BEAM-3304 for info
+// on contributing triggers and panes.
+{{< /highlight >}}
+
 {{< paragraph class="language-java" >}}
 **PipelineOptions:**
 The `PipelineOptions` for the current pipeline can always be accessed in a 
process method by adding it
@@ -2039,6 +2155,11 @@ Timers and States are explained in more detail in the
 [Timely (and Stateful) Processing with Apache 
Beam](/blog/2017/08/28/timely-processing.html) blog post.
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+**Timer and State:**
+This feature isn't implemented in the Go SDK; see more at 
[BEAM-10660](https://issues.apache.org/jira/browse/BEAM-10660). Once 
implemented, user defined Timer and State parameters can be used in a stateful 
DoFn.
+{{< /paragraph >}}
+
 {{< highlight py >}}
 
 class StatefulDoFn(beam.DoFn):
@@ -2097,6 +2218,12 @@ class StatefulDoFn(beam.DoFn):
 
 {{< /highlight >}}
 
+{{< highlight go >}}
+// State and Timers are not yet implemented in the Go SDK.
+// See https://issues.apache.org/jira/browse/BEAM-10660 for info
+// on contributing State and Timers.
+{{< /highlight >}}
+
 ### 4.6. Composite transforms {#composite-transforms}
 
 Transforms can have a nested structure, where a complex transform performs
@@ -2109,14 +2236,17 @@ The Beam SDK comes packed with many useful composite 
transforms. See the API
 reference pages for a list of transforms:
   * [Pre-written Beam transforms for 
Java](https://beam.apache.org/releases/javadoc/{{< param release_latest 
>}}/index.html?org/apache/beam/sdk/transforms/package-summary.html)
   * [Pre-written Beam transforms for 
Python](https://beam.apache.org/releases/pydoc/{{< param release_latest 
>}}/apache_beam.transforms.html)
+  * [Pre-written Beam transforms for 
Go](https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/transforms)
 
 #### 4.6.1. An example composite transform {#composite-transform-example}
 
 The `CountWords` transform in the [WordCount example 
program](/get-started/wordcount-example/)
-is an example of a composite transform. `CountWords` is a `PTransform` subclass
-that consists of multiple nested transforms.
+is an example of a composite transform. `CountWords` is a `PTransform`
+<span class="language-java language-py">subclass</span> that consists
+of multiple nested transforms.
 
-In its `expand` method, the `CountWords` transform applies the following
+<span class="language-java language-py">In its `expand` method, the</span>
+<span class="language-go">The</span> `CountWords` transform applies the 
following
 transform operations:
 
   1. It applies a `ParDo` on the input `PCollection` of text lines, producing
@@ -2149,15 +2279,21 @@ transform operations:
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" 
pipeline_monitoring_composite >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" 
countwords_composite >}}
+{{< /highlight >}}
+
 > **Note:** Because `Count` is itself a composite transform,
 > `CountWords` is also a nested composite transform.
 
 #### 4.6.2. Creating a composite transform {#composite-transform-creation}
 
+{{< paragraph class="language-java language-py" >}}
 To create your own composite transform, create a subclass of the `PTransform`
 class and override the `expand` method to specify the actual processing logic.
 You can then use this transform just as you would a built-in transform from the
 Beam SDK.
+{{< /paragraph >}}
 
 {{< paragraph class="language-java" >}}
 For the `PTransform` class type parameters, you pass the `PCollection` types
@@ -2166,8 +2302,23 @@ that your transform takes as input, and produces as 
output. To take multiple
 of the multi-collection types for the relevant type parameter.
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+To create your own composite `PTransform` call the `Scope` method on the 
current
+pipeline scope variable. Transforms passed this new sub-`Scope` will be a part 
of
+the same composite `PTransform`.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+To be able to re-use your Composite, build it inside a normal Go function or 
method.
+This function is passed a scope and input PCollections, and returns any
+output PCollections it produces. **Note:** Such functions cannot be passed 
directly to
+`ParDo` functions.
+{{< /paragraph >}}
+
+{{< paragraph class="language-java language-py" >}}
 The following code sample shows how to declare a `PTransform` that accepts a
 `PCollection` of `String`s for input, and outputs a `PCollection` of 
`Integer`s:
+{{< /paragraph >}}
 
 {{< highlight java >}}
   static class ComputeWordLengths
@@ -2180,14 +2331,27 @@ The following code sample shows how to declare a 
`PTransform` that accepts a
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" 
model_composite_transform >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" 
countwords_composite >}}
+{{< /highlight >}}
+
+{{< paragraph class="language-java language-py" >}}
 Within your `PTransform` subclass, you'll need to override the `expand` method.
 The `expand` method is where you add the processing logic for the `PTransform`.
 Your override of `expand` must accept the appropriate type of input
 `PCollection` as a parameter, and specify the output `PCollection` as the 
return
 value.
+{{< /paragraph >}}
 
+{{< paragraph class="language-java language-py" >}}
 The following code sample shows how to override `expand` for the
 `ComputeWordLengths` class declared in the previous example:
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+The following code sample shows how to call the `CountWords` composite 
PTransform,
+adding it to your pipeline:
+{{< /paragraph >}}
 
 {{< highlight java >}}
   static class ComputeWordLengths
@@ -2204,20 +2368,38 @@ The following code sample shows how to override 
`expand` for the
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" 
model_composite_transform >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+lines := ... // a PCollection of strings.
+{{< code_sample "sdks/go/examples/snippets/04transforms_test.go" 
countwords_composite_call >}}
+{{< /highlight >}}
+
+{{< paragraph class="language-java language-py" >}}
 As long as you override the `expand` method in your `PTransform` subclass to
 accept the appropriate input `PCollection`(s) and return the corresponding
 output `PCollection`(s), you can include as many transforms as you want. These
 transforms can include core transforms, composite transforms, or the transforms
 included in the Beam SDK libraries.
+{{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+Your composite `PTransform`s can include as many transforms as you want. These
+transforms can include core transforms, other composite transforms, or the 
transforms
+included in the Beam SDK libraries. They can also consume and return as many
+`PCollection`s as are necessary.
+{{< /paragraph >}}
+
+{{< paragraph class="language-java language-py" >}}
 Your composite transform's parameters and return value must match the initial
 input type and final return type for the entire transform, even if the
 transform's intermediate data changes type multiple times.
+{{< /paragraph >}}
 
+{{< paragraph class="language-java language-py" >}}
 **Note:** The `expand` method of a `PTransform` is not meant to be invoked
 directly by the user of a transform. Instead, you should call the `apply` 
method
 on the `PCollection` itself, with the transform as an argument. This allows
 transforms to be nested within the structure of your pipeline.
+{{< /paragraph >}}
 
 #### 4.6.3. PTransform Style Guide {#ptransform-style-guide}
 

Reply via email to