This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ac5594ce8f add ExternalTransformProvider example (#30666)
2ac5594ce8f is described below

commit 2ac5594ce8f5e7e927df04c7a4b6d9f2b11625c4
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Aug 29 15:47:20 2024 -0400

    add ExternalTransformProvider example (#30666)
    
    * add ExternalTransformProvider example
    
    * cleanup old command
    
    * clarify docs
    
    * update
    
    * indicate JavaExternalTransform is outdated
---
 examples/multi-language/README.md                  |   3 +
 .../multi-language/python/wordcount_external.py    | 118 +++++++++++++++++++++
 .../schematransforms/ExtractWordsProvider.java     |  76 +++++++++++++
 .../schematransforms/JavaCountProvider.java        |  78 ++++++++++++++
 .../schematransforms/WriteWordsProvider.java       |  77 ++++++++++++++
 sdks/standard_expansion_services.yaml              |   4 +
 6 files changed, 356 insertions(+)

diff --git a/examples/multi-language/README.md 
b/examples/multi-language/README.md
index 4912eb14da3..479a56deab7 100644
--- a/examples/multi-language/README.md
+++ b/examples/multi-language/README.md
@@ -24,6 +24,9 @@ This project provides examples of Apache Beam
 
 ## Using Java transforms from Python
 
+* **python/wordcount_external** - A Python pipeline that runs the Word Count 
workflow using three external Java
+                SchemaTransforms. This example demonstrates the updated 
`ExternalTransformProvider` API.
+    #### _Outdated examples:_
 * **python/addprefix** - A Python pipeline that reads a text file and attaches 
a prefix on the Java side to each input.
 * **python/javacount** - A Python pipeline that counts words using the Java 
`Count.perElement()` transform.
 * **python/javadatagenerator** - A Python pipeline that produces a set of 
strings generated from Java.
diff --git a/examples/multi-language/python/wordcount_external.py 
b/examples/multi-language/python/wordcount_external.py
new file mode 100644
index 00000000000..580c0269d36
--- /dev/null
+++ b/examples/multi-language/python/wordcount_external.py
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+
+import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.transforms.external_transform_provider import 
ExternalTransformProvider
+from apache_beam.typehints.row_type import RowTypeConstraint
+"""A Python multi-language pipeline that counts words using multiple Java 
SchemaTransforms.
+
+This pipeline reads an input text file then extracts the words, counts them, 
and writes the results Java 
+SchemaTransforms. The transforms are listed below and can be found in 
+src/main/java/org/apache/beam/examples/schematransforms/:
+- `ExtractWordsProvider`
+- `JavaCountProvider`
+- `WriteWordsProvider`
+
+These Java transforms are accessible to the Python pipeline via an expansion 
service. Check out the
+[`README.md`](https://github.com/apache/beam/blob/master/examples/multi-language/README.md#1-start-the-expansion-service)
+for instructions on how to download the jar and run this expansion service.
+
+This example aims to demonstrate how to use the `ExternalTransformProvider` 
utility, which dynamically generates and
+provides user-friendly wrappers for external transforms. 
+
+Example commands for executing this program:
+
+DirectRunner:
+$ python wordcount_external.py \
+      --runner DirectRunner \
+      --input <INPUT FILE> \
+      --output <OUTPUT FILE> \
+      --expansion_service_port <PORT>
+
+DataflowRunner:
+$ python wordcount_external.py \
+      --runner DataflowRunner \
+      --temp_location $TEMP_LOCATION \
+      --project $GCP_PROJECT \
+      --region $GCP_REGION \
+      --job_name $JOB_NAME \
+      --num_workers $NUM_WORKERS \
+      --input "gs://dataflow-samples/shakespeare/kinglear.txt" \
+      --output "gs://$GCS_BUCKET/wordcount_external/output" \
+      --expansion_service_port <PORT>
+"""
+
+# Original Java transform is in ExtractWordsProvider.java
+EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
+# Original Java transform is in JavaCountProvider.java
+COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
+# Original Java transform is in WriteWordsProvider.java
+WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
+
+
+def run(input_path, output_path, expansion_service_port, pipeline_args):
+    pipeline_options = PipelineOptions(pipeline_args)
+
+    # Discover and get external transforms from this expansion service
+    provider = ExternalTransformProvider("localhost:" + expansion_service_port)
+    # Get transforms with identifiers, then use them as you would a regular
+    # native PTransform
+    Extract = provider.get_urn(EXTRACT_IDENTIFIER)
+    Count = provider.get_urn(COUNT_IDENTIFIER)
+    Write = provider.get_urn(WRITE_IDENTIFIER)
+
+    with beam.Pipeline(options=pipeline_options) as p:
+        lines = p | 'Read' >> ReadFromText(input_path)
+
+        words = (lines
+                 | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
+                 | 'Extract Words' >> Extract())
+        word_counts = words | 'Count Words' >> Count()
+        formatted_words = (
+            word_counts
+            | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
+                row.word, row.count))).with_output_types(
+                    RowTypeConstraint.from_fields([('line', str)])))
+
+        formatted_words | 'Write' >> Write(file_path_prefix=output_path)
+
+
+if __name__ == '__main__':
+    logging.getLogger().setLevel(logging.INFO)
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--input',
+                        dest='input',
+                        required=True,
+                        help='Input file')
+    parser.add_argument('--output',
+                        dest='output',
+                        required=True,
+                        help='Output file')
+    parser.add_argument('--expansion_service_port',
+                        dest='expansion_service_port',
+                        required=True,
+                        help='Expansion service port')
+    known_args, pipeline_args = parser.parse_known_args()
+
+    run(known_args.input, known_args.output, known_args.expansion_service_port,
+        pipeline_args)
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
new file mode 100644
index 00000000000..724dbce276f
--- /dev/null
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilanguage.schematransforms;
+
+import static 
org.apache.beam.examples.multilanguage.schematransforms.ExtractWordsProvider.Configuration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+
+/** Splits a line into separate words and returns each word. */
+@AutoService(SchemaTransformProvider.class)
+public class ExtractWordsProvider extends 
TypedSchemaTransformProvider<Configuration> {
+  public static final Schema OUTPUT_SCHEMA = 
Schema.builder().addStringField("word").build();
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:extract_words:v1";
+  }
+
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        return PCollectionRowTuple.of(
+            "output",
+            input.get("input").apply(ParDo.of(new 
ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
+      }
+    };
+  }
+
+  static class ExtractWordsFn extends DoFn<Row, Row> {
+    @ProcessElement
+    public void processElement(@Element Row element, OutputReceiver<Row> 
receiver) {
+      // Split the line into words.
+      String line = Preconditions.checkStateNotNull(element.getString("line"));
+      String[] words = line.split("[^\\p{L}]+", -1);
+
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", 
word).build());
+        }
+      }
+    }
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  protected abstract static class Configuration {}
+}
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
new file mode 100644
index 00000000000..cabea594ae1
--- /dev/null
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilanguage.schematransforms;
+
+import static 
org.apache.beam.examples.multilanguage.schematransforms.JavaCountProvider.Configuration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@AutoService(SchemaTransformProvider.class)
+public class JavaCountProvider extends 
TypedSchemaTransformProvider<Configuration> {
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:count:v1";
+  }
+
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        Schema outputSchema =
+            
Schema.builder().addStringField("word").addInt64Field("count").build();
+
+        PCollection<Row> wordCounts =
+            input
+                .get("input")
+                .apply(Count.perElement())
+                .apply(
+                    MapElements.into(TypeDescriptors.rows())
+                        .via(
+                            kv ->
+                                Row.withSchema(outputSchema)
+                                    .withFieldValue(
+                                        "word",
+                                        Preconditions.checkStateNotNull(
+                                            kv.getKey().getString("word")))
+                                    .withFieldValue("count", kv.getValue())
+                                    .build()))
+                .setRowSchema(outputSchema);
+
+        return PCollectionRowTuple.of("output", wordCounts);
+      }
+    };
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  protected abstract static class Configuration {}
+}
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
new file mode 100644
index 00000000000..0b2017c5587
--- /dev/null
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.multilanguage.schematransforms;
+
+import static 
org.apache.beam.examples.multilanguage.schematransforms.WriteWordsProvider.Configuration;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@AutoService(SchemaTransformProvider.class)
+public class WriteWordsProvider extends 
TypedSchemaTransformProvider<Configuration> {
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:write_words:v1";
+  }
+
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        input
+            .get("input")
+            .apply(
+                MapElements.into(TypeDescriptors.strings())
+                    .via(row -> 
Preconditions.checkStateNotNull(row.getString("line"))))
+            .apply(TextIO.write().to(configuration.getFilePathPrefix()));
+
+        return PCollectionRowTuple.empty(input.getPipeline());
+      }
+    };
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  protected abstract static class Configuration {
+    public static Builder builder() {
+      return new AutoValue_WriteWordsProvider_Configuration.Builder();
+    }
+
+    @SchemaFieldDescription("Writes to output files with this prefix.")
+    public abstract String getFilePathPrefix();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setFilePathPrefix(String filePathPrefix);
+
+      public abstract Configuration build();
+    }
+  }
+}
diff --git a/sdks/standard_expansion_services.yaml 
b/sdks/standard_expansion_services.yaml
index 31a1a6343ae..ad965c8a1ee 100644
--- a/sdks/standard_expansion_services.yaml
+++ b/sdks/standard_expansion_services.yaml
@@ -26,10 +26,14 @@
 #
 # Transform identifiers listed in the `skip_transforms` field will be skipped.
 #
+#
 # Any new gradle targets added here should also be added to:
 # - sdks/python/build.gradle (as a dependency in the 
'generateExternalTransformsConfig' task)
 # - sdks/python/test-suites/xlang/build.gradle (look for 
'servicesToGenerateFrom')
 #
+# After making changes here, please run `./gradlew 
generateExternalTransformsConfig`
+# to regenerate the config file at sdks/standard_external_transforms.yaml
+#
 # Refer to sdks/python/gen_xlang_wrappers.py for more info.
 
 - gradle_target: 'sdks:java:io:expansion-service:shadowJar'

Reply via email to