lostluck commented on a change in pull request #15447:

URL: https://github.com/apache/beam/pull/15447#discussion_r705800575
##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -6416,6 +6421,68 @@ with pipeline as p:
 4. After the job has been submitted to the Beam runner, shut down the expansion service by terminating the expansion service process.
+#### 13.2.3. Using cross-language transforms in a Go pipeline
+
+If a Go-specific wrapper for a cross-language transform is available, use that; otherwise, you have to use the
+lower-level [CrossLanguage](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam#CrossLanguage)
+function to access the transform.
+
+**Expansion Services**
+
+The Go SDK does not yet support automatically starting an expansion service. In order to use
+cross-language transforms, you must manually start any necessary expansion services on your local
+machine and ensure they are accessible to your code during pipeline construction.
+
+**Using an SDK wrapper**
+
+To use a cross-language transform through an SDK wrapper, import the package for the SDK wrapper
+and call it from your pipeline as shown in the example:
+
+{{< highlight >}}
+import (
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
+)
+
+// Kafka Read using previously defined values.
+kafkaRecords := kafkaio.Read(
+	s,
+	expansionAddr, // Address of expansion service.
+	bootstrapAddr,
+	[]string{topicName},
+	kafkaio.MaxNumRecords(numRecords),
+	kafkaio.ConsumerConfigs(map[string]string{"auto.offset.reset": "earliest"}))
+{{< /highlight >}}
+
+**Using the CrossLanguage function**
+
+When an SDK-specific wrapper isn't available, you will have to access the cross-language transform through the `CrossLanguage` function.
+
+1. Make sure you have the appropriate expansion service running. See the expansion service section for details.
+2. Make sure the transform you're trying to use is available and can be used by the expansion service.
+   For Java, make sure the builder and registrar for the transform are available in the classpath of
+   the expansion service.
+3. Use the `CrossLanguage` function in your pipeline as appropriate. Reference the URN, payload,
+   and expansion service address, and define the inputs and outputs. You can use the
+   [CrossLanguagePayload](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam#CrossLanguagePayload)
+   function as a helper for encoding a payload. You can use the
+   [UnnamedInput](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam#UnnamedInput) and
+   [UnnamedOutput](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam#UnnamedOutput)
+   functions as shortcuts for single, unnamed inputs/outputs, or define a map for named ones.
+
+   {{< highlight >}}
+type prefixPayload struct {
+	Data string
+}
+urn := "beam:transforms:xlang:test:prefix"
+payload := beam.CrossLanguagePayload(prefixPayload{Data: prefix})
+expansionAddr := <Address of expansion service>
+outT := beam.UnnamedOutput(typex.New(reflectx.String))
+res := beam.CrossLanguage(s, urn, payload, expansionAddr, beam.UnnamedInput(inputPCol), outT)

Review comment:

I'm fine with deferring it to a later PR. That said, just because the authors of the other sections didn't use the full facilities of the Hugo static site generator doesn't mean we need to be held to their restrictions. We already use them for the Go SDK in the early sections of the file, and other SDKs use them in other sections of the file.

The snippet files for Go live in the examples/snippets directory: https://github.com/apache/beam/tree/master/sdks/go/examples/snippets

The convention is to prefix the filename with the section number (so this file should be 13xlang.go). That way, the ordering of the files, and therefore where a given code snippet is found, roughly matches the order of sections in the Beam Programming Guide (BPG).

I agree that having a proper mock expansion service would be ideal, but not every detail needs to be explicit in the presented example. For example, "<Address of expansion service>" could be a function `getExpansionServiceAddr()` that has no real implementation.
Note that it's not really necessary for there to be executable tests in all cases. We can add a test to exercise the pipeline construction later, once we can mock it out. It is necessary for the samples to at least compile, though.
