pcoet commented on a change in pull request #16001:
URL: https://github.com/apache/beam/pull/16001#discussion_r751718298



##########
File path: 
website/www/site/content/en/documentation/sdks/python-multi-language-pipelines.md
##########
@@ -0,0 +1,213 @@
+---
+type: languages
+title: "Python multi-language pipelines"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Python multi-language pipelines
+
+This page provides a high-level overview of creating multi-language pipelines 
with the Apache Beam SDK for Python. To build and run a multi-language Python 
pipeline, you need a Python environment with the Beam SDK installed. If you 
don’t have an environment set up, first complete the [Apache Beam Python SDK 
Quickstart](/get-started/quickstart-py/).
+
+A *multi-language pipeline* is a pipeline that’s built in one Beam SDK 
language and uses one or more transforms from another Beam SDK language. These 
“other-language” transforms are called *cross-language transforms*. The idea is 
to make pipeline components easier to share across the Beam SDKs, and to grow 
the pool of available transforms for all the SDKs. In the examples below, the 
multi-language pipeline is built with the Beam Python SDK, and the 
cross-language transforms are built with the Beam Java SDK.
+
+## Create a cross-language transform
+
+Here's a simple Java transform that adds a prefix to an input string:
+
+```java
+public class JavaPrefix extends PTransform<PCollection<String>, 
PCollection<String>> {
+
+  final String prefix;
+
+  public JavaPrefix(String prefix) {
+    this.prefix = prefix;
+  }
+
+  class AddPrefixDoFn extends DoFn<String, String> {
+
+    @ProcessElement
+    public void process(@Element String input, OutputReceiver<String> o) {
+      o.output(prefix + input);
+    }
+  }
+
+  @Override
+  public PCollection<String> expand(PCollection<String> input) {
+    return input
+        .apply(
+            "AddPrefix",
+            ParDo.of(new AddPrefixDoFn()));
+  }
+}
+```
+
+To make this available as a cross-language transform, you have to add a config 
object and a builder.
+
+> **Note:** Starting with Beam 2.34.0, Python SDK users can use some Java 
transforms without writing additional Java code. To learn more, see [Creating 
cross-language Java 
transforms](/documentation/programming-guide/#1311-creating-cross-language-java-transforms).
+
+The config object is a simple Java object (POJO) that has fields required by 
the transform.
+
+```java
+public class JavaPrefixConfiguration {
+
+  String prefix;
+
+  public void setPrefix(String prefix) {
+    this.prefix = prefix;
+  }
+}
+```
+
+The builder class must implement 
[ExternalTransformBuilder](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/ExternalTransformBuilder.html)
 and override `buildExternal`, which uses the config object.
+
+```java
+public class JavaPrefixBuilder implements
+    ExternalTransformBuilder<JavaPrefixConfiguration, PCollection<String>, 
PCollection<String>> {
+
+    @Override
+    public PTransform<PCollection<String>, PCollection<String>> buildExternal(
+        JavaPrefixConfiguration configuration) {
+      return new JavaPrefix(configuration.prefix);
+    }
+}
+```
+
+You also need to add a registrar class to register your transform with the 
expansion service.
+
+```java
+@AutoService(ExternalTransformRegistrar.class)
+public class JavaPrefixRegistrar implements ExternalTransformRegistrar {
+
+  final String URN = "my.beam.transform.javaprefix";
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> 
knownBuilderInstances() {
+    return ImmutableMap.of(URN,new JavaPrefixBuilder());
+  }
+}
+```
+
+The registrar must implement 
[ExternalTransformRegistrar](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.html),
 which has one method, `knownBuilderInstances`. This returns a map that maps a 
unique URN to an instance of your builder. You can use the 
[AutoService](https://github.com/google/auto/tree/master/service) annotation to 
register this class with the expansion service.
+
+## Choose an expansion service
+
+When building a job for a multi-language pipeline, Beam uses an [expansion 
service](/documentation/glossary/#expansion-service) to expand [composite 
transforms](/documentation/glossary/#composite-transform). You must have at 
least one expansion service per remote SDK.
+
+In most cases, you can use the default Java 
[ExpansionService](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/expansion/service/ExpansionService.html).
 The service takes a single parameter, which specifies the port of the 
expansion service. The address is then provided by the Python pipeline.
+
+When you start the expansion service, you need to add dependencies to the 
classpath. You can use more than one JAR, but it’s often easier to create a 
single shaded JAR. Both Python and Java dependencies will be staged for the 
runner by the Python SDK.
+
+## Create a Python pipeline
+
+Your Python pipeline can now use the 
[ExternalTransform](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform)
 API to configure your cross-language transform. Here’s an example:
+
+```py
+with beam.Pipeline(options=pipeline_options) as p:
+  input = p | 'Read' >> ReadFromText(input_path).with_output_types(str)
+
+  java_output = (
+      input
+      | 'JavaPrefix' >> beam.ExternalTransform(
+            'my.beam.transform.javaprefix',
+            ImplicitSchemaPayloadBuilder({'prefix': 'java:'}),
+            "localhost:12345"))
+
+  def python_prefix(record):
+    return 'python:%s' % record
+
+  output = java_output | 'PythonPrefix' >> beam.Map(python_prefix)
+  output | 'Write' >> WriteToText(output_path)
+```
+
+`ExternalTransform` takes three parameters:
+
+* The URN for the cross-language transform
+* The payload, either as a byte string or a 
[PayloadBuilder](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder)
+* An expansion service
+
+The URN is simply a unique Beam identifier for the transform, and the 
expansion service has already been discussed. The PayloadBuilder is a new 
concept, discussed next.
+
+> **NOTE**: To ensure that your URN doesn't run into confilcts with URNs from 
other transforms, follow the URN conventions described at [Selecting a URN for 
Cross-language 
Transforms](/documentation/programming-guide/#1314-selecting-a-urn-for-cross-language-transforms).
+
+## Provide a payload builder
+
+The Python pipeline example above provides an 
[ImplicitSchemaPayloadBuilder](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ImplicitSchemaPayloadBuilder)
 as the second argument to `ExternalTransform`. The 
`ImplicitSchemaPayloadBuilder` builds a payload that generates a schema from 
the provided values. In this case, the provided values are contained in the 
following key-value pair: `{'prefix': 'java:'}`. The `JavaPrefix` transform 
expects a `prefix` argument, and the payload builder passes in the string 
`java:`, which will be prepended to each input element.
+
+In general, payload builders help build the payload for the transform in the 
expansion request. If you provide a Beam schema to the payload builder, the 
builder uses it to perform a type/field mapping. Instead of the 
`ImplicitSchemaPayloadBuilder`, you could use a 
[NamedTupleBasedPayloadBuilder](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.NamedTupleBasedPayloadBuilder),
 which builds a payload based on a named tuple schema, or an 
[AnnotationBasedPayloadBuilder](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.AnnotationBasedPayloadBuilder),
 which builds a schema based on type annotations. For a complete list of 
available payload builders, see the [transforms.external API 
reference](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html).
+
+## Use standard element types
+
+At a multi-language boundary, you have to use element types that all the Beam 
SDKs understand. These are types represented by the [Beam standard 
coders](https://github.com/apache/beam/blob/42e1ae8f8d07fb9c6fde14bd688c5a4b763d9d6e/model/pipeline/src/main/proto/beam_runner_api.proto#L784):
+
+* `BYTES`
+* `STRING_UTF8`
+* `KV`
+* `BOOL`
+* `VARINT`
+* `DOUBLE`
+* `ITERABLE`
+* `TIMER`
+* `WINDOWED_VALUE`
+* `ROW`
+
+For arbitrary structured types (for example, an arbitrary Java object), use 
`ROW` (`PCollection<Row>`). You may have to develop a new Java composite 
transform that produces a `PCollection<Row>`.
+

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to