This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cfeac3b [BEAM-12513] Finish adding Go to BPG 4. Transforms (#15142)
cfeac3b is described below
commit cfeac3bcae3c5c2e03f4a57c2c4258fe4fa9e22e
Author: Robert Burke <[email protected]>
AuthorDate: Thu Jul 8 13:29:36 2021 -0700
[BEAM-12513] Finish adding Go to BPG 4. Transforms (#15142)
* [BEAM-12513] Finish adding Go to BPG 4. Transforms
---
sdks/go/examples/snippets/04transforms.go | 141 ++++++++++++++
sdks/go/examples/snippets/04transforms_test.go | 40 ++++
.../content/en/documentation/programming-guide.md | 204 +++++++++++++++++++--
3 files changed, 374 insertions(+), 11 deletions(-)
diff --git a/sdks/go/examples/snippets/04transforms.go
b/sdks/go/examples/snippets/04transforms.go
index 22eb3eb..c926a1d 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -278,6 +278,7 @@ func init() {
// [END model_multiple_pcollections_partition_fn]
+// applyPartition returns the 40th percentile of students.
func applyPartition(s beam.Scope, students beam.PCollection) beam.PCollection {
// [START model_multiple_pcollections_partition]
// Partition returns a slice of PCollections
@@ -287,3 +288,143 @@ func applyPartition(s beam.Scope, students
beam.PCollection) beam.PCollection {
// [END model_multiple_pcollections_partition]
return fortiethPercentile
}
+
+// [START model_pardo_side_input_dofn]
+
+// filterWordsAbove is a DoFn that takes in a word,
+// and a singleton side input iterator of a length cut off
+// and only emits words that are above that cut off.
+//
+// If the iterator has no elements, an error is returned, aborting processing.
+func filterWordsAbove(word string, lengthCutOffIter func(*float64) bool,
emitAboveCutoff func(string)) error {
+ var cutOff float64
+ ok := lengthCutOffIter(&cutOff)
+ if !ok {
+ return fmt.Errorf("No length cutoff provided.")
+ }
+ if float64(len(word)) > cutOff {
+ emitAboveCutoff(word)
+ }
+ return nil
+}
+
+// filterWordsBelow is a DoFn that takes in a word,
+// and a singleton side input of a length cut off
+// and only emits words that are beneath that cut off.
+//
+// If the side input isn't a singleton, a runtime panic will occur.
+func filterWordsBelow(word string, lengthCutOff float64, emitBelowCutoff
func(string)) {
+ if float64(len(word)) <= lengthCutOff {
+ emitBelowCutoff(word)
+ }
+}
+
+func init() {
+ beam.RegisterFunction(filterWordsAbove)
+ beam.RegisterFunction(filterWordsBelow)
+}
+
+// [END model_pardo_side_input_dofn]
+
+// addSideInput demonstrates passing a side input to a DoFn.
+func addSideInput(s beam.Scope, words beam.PCollection) (beam.PCollection,
beam.PCollection) {
+ wordLengths := applyWordLen(s, words)
+
+ // [START model_pardo_side_input]
+ // avgWordLength is a PCollection containing a single element, a
singleton.
+ avgWordLength := stats.Mean(s, wordLengths)
+
+	// Side inputs are added with the beam.SideInput option to
beam.ParDo.
+ wordsAboveCutOff := beam.ParDo(s, filterWordsAbove, words,
beam.SideInput{Input: avgWordLength})
+ wordsBelowCutOff := beam.ParDo(s, filterWordsBelow, words,
beam.SideInput{Input: avgWordLength})
+ // [END model_pardo_side_input]
+ return wordsAboveCutOff, wordsBelowCutOff
+}
+
+// isMarkedWord is a dummy function.
+func isMarkedWord(word string) bool {
+ return strings.HasPrefix(word, "MARKER")
+}
+
+// [START model_multiple_output_dofn]
+
+// processWords is a DoFn that has 3 output PCollections. The emitter functions
+// are matched in positional order to the PCollections returned by beam.ParDo3.
+func processWords(word string, emitBelowCutoff, emitAboveCutoff, emitMarked
func(string)) {
+ const cutOff = 5
+ if len(word) < cutOff {
+ emitBelowCutoff(word)
+ } else {
+ emitAboveCutoff(word)
+ }
+ if isMarkedWord(word) {
+ emitMarked(word)
+ }
+}
+
+// processWordsMixed demonstrates mixing an emitter, with a standard return.
+// If a standard return is used, it will always be the first returned
PCollection,
+// followed in positional order by the emitter functions.
+func processWordsMixed(word string, emitMarked func(string)) int {
+ if isMarkedWord(word) {
+ emitMarked(word)
+ }
+ return len(word)
+}
+
+func init() {
+ beam.RegisterFunction(processWords)
+ beam.RegisterFunction(processWordsMixed)
+}
+
+// [END model_multiple_output_dofn]
+
+func applyMultipleOut(s beam.Scope, words beam.PCollection) (belows, aboves,
markeds, lengths, mixedMarkeds beam.PCollection) {
+ // [START model_multiple_output]
+ // beam.ParDo3 returns PCollections in the same order as
+ // the emit function parameters in processWords.
+ below, above, marked := beam.ParDo3(s, processWords, words)
+
+ // processWordsMixed uses both a standard return and an emitter
function.
+ // The standard return produces the first PCollection from beam.ParDo2,
+ // and the emitter produces the second PCollection.
+ length, mixedMarked := beam.ParDo2(s, processWordsMixed, words)
+ // [END model_multiple_output]
+ return below, above, marked, length, mixedMarked
+}
+
+func extractWordsFn(line string, emitWords func(string)) {
+ words := strings.Split(line, " ")
+ for _, w := range words {
+ emitWords(w)
+ }
+}
+
+func init() {
+ beam.RegisterFunction(extractWordsFn)
+}
+
+// [START countwords_composite]
+// CountWords is a function that builds a composite PTransform
+// to count the number of times each word appears.
+func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
+ // A subscope is required for a function to become a composite
transform.
+ // We assign it to the original scope variable s to shadow the original
+ // for the rest of the CountWords function.
+ s = s.Scope("CountWords")
+
+ // Since the same subscope is used for the following transforms,
+ // they are in the same composite PTransform.
+
+ // Convert lines of text into individual words.
+ words := beam.ParDo(s, extractWordsFn, lines)
+
+ // Count the number of times each word occurs.
+ wordCounts := stats.Count(s, words)
+
+ // Return any PCollections that should be available after
+ // the composite transform.
+ return wordCounts
+}
+
+// [END countwords_composite]
diff --git a/sdks/go/examples/snippets/04transforms_test.go
b/sdks/go/examples/snippets/04transforms_test.go
index ebfa47b..76c85da 100644
--- a/sdks/go/examples/snippets/04transforms_test.go
+++ b/sdks/go/examples/snippets/04transforms_test.go
@@ -183,3 +183,43 @@ func TestPartition(t *testing.T) {
passert.Equals(s, avg, Student{42})
ptest.RunAndValidate(t, p)
}
+
+func TestMultipleOutputs(t *testing.T) {
+ p, s, words := ptest.CreateList([]string{"a", "the", "pjamas", "art",
"candy", "MARKERmarked"})
+ below, above, marked, lengths, mixedMarked := applyMultipleOut(s, words)
+
+ passert.Equals(s, below, "a", "the", "art")
+ passert.Equals(s, above, "pjamas", "candy", "MARKERmarked")
+ passert.Equals(s, marked, "MARKERmarked")
+ passert.Equals(s, lengths, 1, 3, 6, 3, 5, 12)
+ passert.Equals(s, mixedMarked, "MARKERmarked")
+
+ ptest.RunAndValidate(t, p)
+}
+
+func TestSideInputs(t *testing.T) {
+ p, s, words := ptest.CreateList([]string{"a", "the", "pjamas", "art",
"candy", "garbage"})
+ above, below := addSideInput(s, words)
+ passert.Equals(s, above, "pjamas", "candy", "garbage")
+ passert.Equals(s, below, "a", "the", "art")
+ ptest.RunAndValidate(t, p)
+}
+
+func TestComposite(t *testing.T) {
+ p, s, lines := ptest.CreateList([]string{
+ "this test dataset has the word test",
+ "at least twice, because to test the Composite",
+ "CountWords, one needs test data to run it with",
+ })
+ // [START countwords_composite_call]
+ // A Composite PTransform function is called like any other function.
+ wordCounts := CountWords(s, lines) // returns a
PCollection<KV<string,int>>
+ // [END countwords_composite_call]
+ testCount := beam.ParDo(s, func(k string, v int, emit func(int)) {
+ if k == "test" {
+ emit(v)
+ }
+ }, wordCounts)
+ passert.Equals(s, testCount, 4)
+ ptest.RunAndValidate(t, p)
+}
diff --git a/website/www/site/content/en/documentation/programming-guide.md
b/website/www/site/content/en/documentation/programming-guide.md
index 0d73f58..7edcd7a 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -1677,16 +1677,29 @@ Unexported fields are unable to be serialized, and will
be silently ignored.</sp
Some other serializability factors you should keep in mind are:
-* Transient fields in your function object are *not* transmitted to worker
+* <span class="language-java language-py">Transient</span><span
class="language-go">Unexported</span>
+ fields in your function object are *not* transmitted to worker
instances, because they are not automatically serialized.
* Avoid loading a field with a large amount of data before serialization.
* Individual instances of your function object cannot share data.
* Mutating a function object after it gets applied will have no effect.
-* Take care when declaring your function object inline by using an anonymous
- inner class instance. In a non-static context, your inner class instance will
- implicitly contain a pointer to the enclosing class and that class' state.
- That enclosing class will also be serialized, and thus the same
considerations
- that apply to the function object itself also apply to this outer class.
+
+<span class="language-java">
+
+> **Note:** Take care when declaring your function object inline by using an
anonymous
+> inner class instance. In a non-static context, your inner class instance will
+> implicitly contain a pointer to the enclosing class and that class' state.
+> That enclosing class will also be serialized, and thus the same
considerations
+> that apply to the function object itself also apply to this outer class.
+
+</span>
+
+<span class="language-go">
+
+> **Note:** There's no way to detect if a function is a closure. Closures will
cause
+> runtime errors and pipeline failures. Avoid using anonymous functions when
possible.
+
+</span>
#### 4.3.2. Thread-compatibility {#user-code-thread-compatibility}
@@ -1694,9 +1707,9 @@ Your function object should be thread-compatible. Each
instance of your function
object is accessed by a single thread at a time on a worker instance, unless
you
explicitly create your own threads. Note, however, that **the Beam SDKs are not
thread-safe**. If you create your own threads in your user code, you must
-provide your own synchronization. Note that static members in your function
+provide your own synchronization. <span class="language-java"> Note that
static members in your function
object are not passed to worker instances and that multiple instances of your
-function may be accessed from different threads.
+function may be accessed from different threads.</span>
#### 4.3.3. Idempotence {#user-code-idempotence}
@@ -1771,6 +1784,20 @@ words = ...
...
{{< /highlight >}}
+{{< highlight go >}}
+// Side inputs are provided using `beam.SideInput` in the DoFn's
ProcessElement method.
+// Side inputs can be arbitrary PCollections, which can then be iterated over
per element
+// in a DoFn.
+// Side input parameters appear after main input elements, and before any
output emitters.
+words = ...
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
model_pardo_side_input >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
model_pardo_side_input_dofn >}}
+
+// The Go SDK doesn't support custom ViewFns.
+// See https://issues.apache.org/jira/browse/BEAM-3305 for details
+// on how to contribute them!
+{{< /highlight >}}
+
#### 4.4.2. Side inputs and windowing {#side-inputs-windowing}
A windowed `PCollection` may be infinite and thus cannot be compressed into a
@@ -1802,14 +1829,30 @@ a single global window and specify a trigger.
### 4.5. Additional outputs {#additional-outputs}
+{{< paragraph class="language-java language-python" >}}
While `ParDo` always produces a main output `PCollection` (as the return value
from `apply`), you can also have your `ParDo` produce any number of additional
output `PCollection`s. If you choose to have multiple outputs, your `ParDo`
returns all of the output `PCollection`s (including the main output) bundled
together.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+While `beam.ParDo` always produces an output `PCollection`, your `DoFn` can
produce any
+number of additional output `PCollections`s, or even none at all.
+If you choose to have multiple outputs, your `DoFn` needs to be called with
the `ParDo`
+function that matches the number of outputs. `beam.ParDo2` for two output
`PCollection`s,
+`beam.ParDo3` for three and so on until `beam.ParDo7`. If you need more, you
can
+use `beam.ParDoN` which will return a `[]beam.PCollection`.
+{{< /paragraph >}}
#### 4.5.1. Tags for multiple outputs {#output-tags}
+{{< paragraph class="language-go" >}}
+The Go SDK doesn't use output tags, and instead uses positional ordering for
+multiple output PCollections.
+{{< /paragraph >}}
+
{{< highlight java >}}
// To emit elements to multiple output PCollections, create a TupleTag object
to identify each collection
// that your ParDo produces. For example, if your ParDo produces three output
PCollections (the main output
@@ -1872,8 +1915,24 @@ together.
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
model_pardo_with_tagged_outputs_iter >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
model_multiple_output >}}
+{{< /highlight >}}
+
#### 4.5.2. Emitting to multiple outputs in your DoFn {#multiple-outputs-dofn}
+{{< paragraph class="language-go" >}}
+Call emitter functions as needed to produce 0 or more elements for its matching
+`PCollection`. The same value can be emitted with multiple emitters.
+As normal, do not mutate values after emitting them from any emitter.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+DoFns can also return a single element via the standard return.
+The standard return is always the first PCollection returned from beam.ParDo.
+Other emitters output to their own PCollections in their defined parameter
order.
+{{< /paragraph >}}
+
{{< highlight java >}}
// Inside your ParDo's DoFn, you can emit an element to a specific output
PCollection by providing a
// MultiOutputReceiver to your process method, and passing in the appropriate
TupleTag to obtain an OutputReceiver.
@@ -1910,6 +1969,10 @@ together.
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
model_pardo_with_undeclared_outputs >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
model_multiple_output_dofn >}}
+{{< /highlight >}}
+
#### 4.5.3. Accessing additional parameters in your DoFn
{#other-dofn-parameters}
{{< paragraph class="language-java" >}}
@@ -1922,6 +1985,21 @@ In addition to the element, Beam will populate other
parameters to your DoFn's `
Any combination of these parameters can be added to your process method in any
order.
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+In addition to the element, Beam will populate other parameters to your DoFn's
`ProcessElement` method.
+Any combination of these parameters can be added to your process method in a
standard order.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+**context.Context:**
+To support consolidated logging and user defined metrics, a `context.Context`
parameter can be requested.
+Per Go conventions, if present it's required to be the first parameter of the
`DoFn` method.
+{{< /paragraph >}}
+
+{{< highlight go >}}
+func MyDoFn(ctx context.Context, word string) string { ... }
+{{< /highlight >}}
+
{{< paragraph class="language-java" >}}
**Timestamp:**
To access the timestamp of an input element, add a parameter annotated with
`@Timestamp` of type `Instant`. For example:
@@ -1932,6 +2010,11 @@ To access the timestamp of an input element, add a
parameter annotated with `@Ti
To access the timestamp of an input element, add a keyword parameter default
to `DoFn.TimestampParam`. For example:
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+**Timestamp:**
+To access the timestamp of an input element, add a `beam.EventTime` parameter
before the element. For example:
+{{< /paragraph >}}
+
{{< highlight java >}}
.of(new DoFn<String, String>() {
public void processElement(@Element String word, @Timestamp Instant
timestamp) {
@@ -1949,6 +2032,10 @@ class ProcessRecord(beam.DoFn):
{{< /highlight >}}
+{{< highlight go >}}
+func MyDoFn(ts beam.EventTime, word string) string { ... }
+{{< /highlight >}}
+
{{< paragraph class="language-java" >}}
**Window:**
To access the window an input element falls into, add a parameter of the type
of the window used for the input `PCollection`.
@@ -1965,6 +2052,15 @@ If an element falls in multiple windows (for example,
this will happen when usin
`process` method will be invoked multiple time for the element, once for each
window.
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+**Window:**
+To access the window an input element falls into, add a `beam.Window`
parameter before the element.
+If an element falls in multiple windows (for example, this will happen when
using SlidingWindows),
+then the `ProcessElement` method will be invoked multiple times for the
element, once for each window.
+Since `beam.Window` is an interface it's possible to type assert to the
concrete implementation of the window.
+For example, when fixed windows are being used, the window is of type
`window.IntervalWindow`.
+{{< /paragraph >}}
+
{{< highlight java >}}
.of(new DoFn<String, String>() {
public void processElement(@Element String word, IntervalWindow window) {
@@ -1982,6 +2078,13 @@ class ProcessRecord(beam.DoFn):
{{< /highlight >}}
+{{< highlight go >}}
+func MyDoFn(w beam.Window, word string) string {
+ iw := w.(window.IntervalWindow)
+ ...
+}
+{{< /highlight >}}
+
{{< paragraph class="language-java" >}}
**PaneInfo:**
When triggers are used, Beam provides a `PaneInfo` object that contains
information about the current firing. Using `PaneInfo`
@@ -1995,6 +2098,13 @@ you can determine whether this is an early or a late
firing, and how many times
This feature implementation in Python SDK is not fully completed; see more at
[BEAM-3759](https://issues.apache.org/jira/browse/BEAM-3759).
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+**PaneInfo:**
+This feature isn't implemented in the Go SDK; see more at
[BEAM-3304](https://issues.apache.org/jira/browse/BEAM-3304). However, once
implemented, when triggers are used, Beam provides
+information about the current firing. The pane lets you determine whether this
+is an early or a late firing, and how many times this window has already fired
for this key.
+{{< /paragraph >}}
+
{{< highlight java >}}
.of(new DoFn<String, String>() {
public void processElement(@Element String word, PaneInfo paneInfo) {
@@ -2012,6 +2122,12 @@ class ProcessRecord(beam.DoFn):
{{< /highlight >}}
+{{< highlight go >}}
+// PaneInfo and triggers are not yet implemented in the Go SDK.
+// See https://issues.apache.org/jira/browse/BEAM-3304 for info
+// on contributing triggers and panes.
+{{< /highlight >}}
+
{{< paragraph class="language-java" >}}
**PipelineOptions:**
The `PipelineOptions` for the current pipeline can always be accessed in a
process method by adding it
@@ -2039,6 +2155,11 @@ Timers and States are explained in more detail in the
[Timely (and Stateful) Processing with Apache
Beam](/blog/2017/08/28/timely-processing.html) blog post.
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+**Timer and State:**
+This feature isn't implemented in the Go SDK; see more at
[BEAM-10660](https://issues.apache.org/jira/browse/BEAM-10660). Once
implemented, user defined Timer and State parameters can be used in a stateful
DoFn.
+{{< /paragraph >}}
+
{{< highlight py >}}
class StatefulDoFn(beam.DoFn):
@@ -2097,6 +2218,12 @@ class StatefulDoFn(beam.DoFn):
{{< /highlight >}}
+{{< highlight go >}}
+// State and Timers are not yet implemented in the Go SDK.
+// See https://issues.apache.org/jira/browse/BEAM-10660 for info
+// on contributing State and Timers.
+{{< /highlight >}}
+
### 4.6. Composite transforms {#composite-transforms}
Transforms can have a nested structure, where a complex transform performs
@@ -2109,14 +2236,17 @@ The Beam SDK comes packed with many useful composite
transforms. See the API
reference pages for a list of transforms:
* [Pre-written Beam transforms for
Java](https://beam.apache.org/releases/javadoc/{{< param release_latest
>}}/index.html?org/apache/beam/sdk/transforms/package-summary.html)
* [Pre-written Beam transforms for
Python](https://beam.apache.org/releases/pydoc/{{< param release_latest
>}}/apache_beam.transforms.html)
+ * [Pre-written Beam transforms for
Go](https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/transforms)
#### 4.6.1. An example composite transform {#composite-transform-example}
The `CountWords` transform in the [WordCount example
program](/get-started/wordcount-example/)
-is an example of a composite transform. `CountWords` is a `PTransform` subclass
-that consists of multiple nested transforms.
+is an example of a composite transform. `CountWords` is a `PTransform`
+<span class="language-java language-py">subclass</span> that consists
+of multiple nested transforms.
-In its `expand` method, the `CountWords` transform applies the following
+<span class="language-java language-py">In its `expand` method, the</span>
+<span class="language-go">The</span> `CountWords` transform applies the
following
transform operations:
1. It applies a `ParDo` on the input `PCollection` of text lines, producing
@@ -2149,15 +2279,21 @@ transform operations:
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py"
pipeline_monitoring_composite >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
countwords_composite >}}
+{{< /highlight >}}
+
> **Note:** Because `Count` is itself a composite transform,
> `CountWords` is also a nested composite transform.
#### 4.6.2. Creating a composite transform {#composite-transform-creation}
+{{< paragraph class="language-java language-py" >}}
To create your own composite transform, create a subclass of the `PTransform`
class and override the `expand` method to specify the actual processing logic.
You can then use this transform just as you would a built-in transform from the
Beam SDK.
+{{< /paragraph >}}
{{< paragraph class="language-java" >}}
For the `PTransform` class type parameters, you pass the `PCollection` types
@@ -2166,8 +2302,23 @@ that your transform takes as input, and produces as
output. To take multiple
of the multi-collection types for the relevant type parameter.
{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+To create your own composite `PTransform` call the `Scope` method on the
current
+pipeline scope variable. Transforms passed this new sub-`Scope` will be a part
of
+the same composite `PTransform`.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+To be able to re-use your Composite, build it inside a normal Go function or
method.
+This function is passed a scope and input PCollections, and returns any
+output PCollections it produces. **Note:** Such functions cannot be passed
directly to
+`ParDo` functions.
+{{< /paragraph >}}
+
+{{< paragraph class="language-java language-py" >}}
The following code sample shows how to declare a `PTransform` that accepts a
`PCollection` of `String`s for input, and outputs a `PCollection` of
`Integer`s:
+{{< /paragraph >}}
{{< highlight java >}}
static class ComputeWordLengths
@@ -2180,14 +2331,27 @@ The following code sample shows how to declare a
`PTransform` that accepts a
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
model_composite_transform >}}
{{< /highlight >}}
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/04transforms.go"
countwords_composite >}}
+{{< /highlight >}}
+
+{{< paragraph class="language-java language-py" >}}
Within your `PTransform` subclass, you'll need to override the `expand` method.
The `expand` method is where you add the processing logic for the `PTransform`.
Your override of `expand` must accept the appropriate type of input
`PCollection` as a parameter, and specify the output `PCollection` as the
return
value.
+{{< /paragraph >}}
+{{< paragraph class="language-java language-py" >}}
The following code sample shows how to override `expand` for the
`ComputeWordLengths` class declared in the previous example:
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+The following code sample shows how to call the `CountWords` composite
PTransform,
+adding it to your pipeline:
+{{< /paragraph >}}
{{< highlight java >}}
static class ComputeWordLengths
@@ -2204,20 +2368,38 @@ The following code sample shows how to override
`expand` for the
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py"
model_composite_transform >}}
{{< /highlight >}}
+{{< highlight go >}}
+lines := ... // a PCollection of strings.
+{{< code_sample "sdks/go/examples/snippets/04transforms_test.go"
countwords_composite_call >}}
+{{< /highlight >}}
+
+{{< paragraph class="language-java language-py" >}}
As long as you override the `expand` method in your `PTransform` subclass to
accept the appropriate input `PCollection`(s) and return the corresponding
output `PCollection`(s), you can include as many transforms as you want. These
transforms can include core transforms, composite transforms, or the transforms
included in the Beam SDK libraries.
+{{< /paragraph >}}
+{{< paragraph class="language-go" >}}
+Your composite `PTransform`s can include as many transforms as you want. These
+transforms can include core transforms, other composite transforms, or the
transforms
+included in the Beam SDK libraries. They can also consume and return as many
+`PCollection`s as are necessary.
+{{< /paragraph >}}
+
+{{< paragraph class="language-java language-py" >}}
Your composite transform's parameters and return value must match the initial
input type and final return type for the entire transform, even if the
transform's intermediate data changes type multiple times.
+{{< /paragraph >}}
+{{< paragraph class="language-java language-py" >}}
**Note:** The `expand` method of a `PTransform` is not meant to be invoked
directly by the user of a transform. Instead, you should call the `apply`
method
on the `PCollection` itself, with the transform as an argument. This allows
transforms to be nested within the structure of your pipeline.
+{{< /paragraph >}}
#### 4.6.3. PTransform Style Guide {#ptransform-style-guide}