This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch release-2.62.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.62.0 by this push:
     new dfdfd0a577b Improve existing Python multi-lang SchemaTransform 
examples (#33361)
dfdfd0a577b is described below

commit dfdfd0a577ba8f4ef094a001c9b03722fccb5d49
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Dec 19 18:10:13 2024 +0000

    Improve existing Python multi-lang SchemaTransform examples (#33361)
    
    * improve python multi-lang examples
    
    * minor adjustments
---
 .../multi-language/python/wordcount_external.py    | 52 ++++++++--------
 .../schematransforms/ExtractWordsProvider.java     | 72 +++++++++++++++-------
 .../schematransforms/JavaCountProvider.java        | 52 ++++++++--------
 .../schematransforms/WriteWordsProvider.java       | 34 ++++++----
 sdks/python/apache_beam/transforms/external.py     |  3 +-
 5 files changed, 126 insertions(+), 87 deletions(-)

diff --git a/examples/multi-language/python/wordcount_external.py 
b/examples/multi-language/python/wordcount_external.py
index 580c0269d36..7298d81c1b4 100644
--- a/examples/multi-language/python/wordcount_external.py
+++ b/examples/multi-language/python/wordcount_external.py
@@ -18,8 +18,8 @@
 import logging
 
 import apache_beam as beam
-from apache_beam.io import ReadFromText
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.transforms.external import BeamJarExpansionService
 from apache_beam.transforms.external_transform_provider import 
ExternalTransformProvider
 from apache_beam.typehints.row_type import RowTypeConstraint
 """A Python multi-language pipeline that counts words using multiple Java 
SchemaTransforms.
@@ -60,39 +60,35 @@ $ python wordcount_external.py \
       --expansion_service_port <PORT>
 """
 
-# Original Java transform is in ExtractWordsProvider.java
 EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
-# Original Java transform is in JavaCountProvider.java
 COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
-# Original Java transform is in WriteWordsProvider.java
 WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
 
 
 def run(input_path, output_path, expansion_service_port, pipeline_args):
     pipeline_options = PipelineOptions(pipeline_args)
 
-    # Discover and get external transforms from this expansion service
-    provider = ExternalTransformProvider("localhost:" + expansion_service_port)
-    # Get transforms with identifiers, then use them as you would a regular
-    # native PTransform
-    Extract = provider.get_urn(EXTRACT_IDENTIFIER)
-    Count = provider.get_urn(COUNT_IDENTIFIER)
-    Write = provider.get_urn(WRITE_IDENTIFIER)
-
     with beam.Pipeline(options=pipeline_options) as p:
-        lines = p | 'Read' >> ReadFromText(input_path)
-
-        words = (lines
-                 | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
-                 | 'Extract Words' >> Extract())
-        word_counts = words | 'Count Words' >> Count()
-        formatted_words = (
-            word_counts
-            | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
-                row.word, row.count))).with_output_types(
-                    RowTypeConstraint.from_fields([('line', str)])))
-
-        formatted_words | 'Write' >> Write(file_path_prefix=output_path)
+        expansion_service = BeamJarExpansionService(
+            "examples:multi-language:shadowJar")
+        if expansion_service_port:
+            expansion_service = "localhost:" + expansion_service_port
+
+        provider = ExternalTransformProvider(expansion_service)
+        # Retrieve portable transforms
+        Extract = provider.get_urn(EXTRACT_IDENTIFIER)
+        Count = provider.get_urn(COUNT_IDENTIFIER)
+        Write = provider.get_urn(WRITE_IDENTIFIER)
+
+        _ = (p
+             | 'Read' >> beam.io.ReadFromText(input_path)
+             | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
+             | 'Extract Words' >> Extract(drop=["king", "palace"])
+             | 'Count Words' >> Count()
+             | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
+                 row.word, row.count))).with_output_types(
+                     RowTypeConstraint.from_fields([('line', str)]))
+             | 'Write' >> Write(file_path_prefix=output_path))
 
 
 if __name__ == '__main__':
@@ -110,8 +106,10 @@ if __name__ == '__main__':
                         help='Output file')
     parser.add_argument('--expansion_service_port',
                         dest='expansion_service_port',
-                        required=True,
-                        help='Expansion service port')
+                        required=False,
+                        help='Expansion service port. If left empty, the '
+                        'existing multi-language examples service will '
+                        'be used by default.')
     known_args, pipeline_args = parser.parse_known_args()
 
     run(known_args.input, known_args.output, known_args.expansion_service_port,
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
index 724dbce276f..b7224ecec6b 100644
--- 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -21,9 +21,12 @@ import static 
org.apache.beam.examples.multilanguage.schematransforms.ExtractWor
 
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -36,7 +39,6 @@ import org.apache.beam.sdk.values.Row;
 /** Splits a line into separate words and returns each word. */
 @AutoService(SchemaTransformProvider.class)
 public class ExtractWordsProvider extends 
TypedSchemaTransformProvider<Configuration> {
-  public static final Schema OUTPUT_SCHEMA = 
Schema.builder().addStringField("word").build();
 
   @Override
   public String identifier() {
@@ -45,32 +47,60 @@ public class ExtractWordsProvider extends 
TypedSchemaTransformProvider<Configura
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        return PCollectionRowTuple.of(
-            "output",
-            input.get("input").apply(ParDo.of(new 
ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
-      }
-    };
+    return new ExtractWordsTransform(configuration);
   }
 
-  static class ExtractWordsFn extends DoFn<Row, Row> {
-    @ProcessElement
-    public void processElement(@Element Row element, OutputReceiver<Row> 
receiver) {
-      // Split the line into words.
-      String line = Preconditions.checkStateNotNull(element.getString("line"));
-      String[] words = line.split("[^\\p{L}]+", -1);
+  static class ExtractWordsTransform extends SchemaTransform {
+    private static final Schema OUTPUT_SCHEMA = 
Schema.builder().addStringField("word").build();
+    private final List<String> drop;
 
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", 
word).build());
-        }
-      }
+    ExtractWordsTransform(Configuration configuration) {
+      this.drop = configuration.getDrop();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      return PCollectionRowTuple.of(
+          "output",
+          input
+              .getSinglePCollection()
+              .apply(
+                  ParDo.of(
+                      new DoFn<Row, Row>() {
+                        @ProcessElement
+                        public void process(@Element Row element, 
OutputReceiver<Row> receiver) {
+                          // Split the line into words.
+                          String line = 
Preconditions.checkStateNotNull(element.getString("line"));
+                          String[] words = line.split("[^\\p{L}]+", -1);
+                          Arrays.stream(words)
+                              .filter(w -> !drop.contains(w))
+                              .forEach(
+                                  word ->
+                                      receiver.output(
+                                          Row.withSchema(OUTPUT_SCHEMA)
+                                              .withFieldValue("word", word)
+                                              .build()));
+                        }
+                      }))
+              .setRowSchema(OUTPUT_SCHEMA));
     }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  protected abstract static class Configuration {}
+  public abstract static class Configuration {
+    public static Builder builder() {
+      return new AutoValue_ExtractWordsProvider_Configuration.Builder();
+    }
+
+    @SchemaFieldDescription("List of words to drop.")
+    public abstract List<String> getDrop();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setDrop(List<String> foo);
+
+      public abstract Configuration build();
+    }
+  }
 }
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
index cabea594ae1..90d02d92c3c 100644
--- 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -44,35 +44,37 @@ public class JavaCountProvider extends 
TypedSchemaTransformProvider<Configuratio
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        Schema outputSchema =
-            
Schema.builder().addStringField("word").addInt64Field("count").build();
+    return new JavaCountTransform();
+  }
+
+  static class JavaCountTransform extends SchemaTransform {
+    static final Schema OUTPUT_SCHEMA =
+        Schema.builder().addStringField("word").addInt64Field("count").build();
 
-        PCollection<Row> wordCounts =
-            input
-                .get("input")
-                .apply(Count.perElement())
-                .apply(
-                    MapElements.into(TypeDescriptors.rows())
-                        .via(
-                            kv ->
-                                Row.withSchema(outputSchema)
-                                    .withFieldValue(
-                                        "word",
-                                        Preconditions.checkStateNotNull(
-                                            kv.getKey().getString("word")))
-                                    .withFieldValue("count", kv.getValue())
-                                    .build()))
-                .setRowSchema(outputSchema);
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> wordCounts =
+          input
+              .get("input")
+              .apply(Count.perElement())
+              .apply(
+                  MapElements.into(TypeDescriptors.rows())
+                      .via(
+                          kv ->
+                              Row.withSchema(OUTPUT_SCHEMA)
+                                  .withFieldValue(
+                                      "word",
+                                      Preconditions.checkStateNotNull(
+                                          kv.getKey().getString("word")))
+                                  .withFieldValue("count", kv.getValue())
+                                  .build()))
+              .setRowSchema(OUTPUT_SCHEMA);
 
-        return PCollectionRowTuple.of("output", wordCounts);
-      }
-    };
+      return PCollectionRowTuple.of("output", wordCounts);
+    }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  protected abstract static class Configuration {}
+  public abstract static class Configuration {}
 }
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
index 0b2017c5587..faf9590a7f1 100644
--- 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -42,24 +42,32 @@ public class WriteWordsProvider extends 
TypedSchemaTransformProvider<Configurati
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        input
-            .get("input")
-            .apply(
-                MapElements.into(TypeDescriptors.strings())
-                    .via(row -> 
Preconditions.checkStateNotNull(row.getString("line"))))
-            .apply(TextIO.write().to(configuration.getFilePathPrefix()));
+    return new WriteWordsTransform(configuration);
+  }
+
+  static class WriteWordsTransform extends SchemaTransform {
+    private final String filePathPrefix;
+
+    WriteWordsTransform(Configuration configuration) {
+      this.filePathPrefix = configuration.getFilePathPrefix();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      input
+          .get("input")
+          .apply(
+              MapElements.into(TypeDescriptors.strings())
+                  .via(row -> 
Preconditions.checkStateNotNull(row.getString("line"))))
+          .apply(TextIO.write().to(filePathPrefix));
 
-        return PCollectionRowTuple.empty(input.getPipeline());
-      }
-    };
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  protected abstract static class Configuration {
+  public abstract static class Configuration {
     public static Builder builder() {
       return new AutoValue_WriteWordsProvider_Configuration.Builder();
     }
diff --git a/sdks/python/apache_beam/transforms/external.py 
b/sdks/python/apache_beam/transforms/external.py
index fb37a8fd974..9ca5886f4cc 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -239,7 +239,8 @@ class 
ExplicitSchemaTransformPayloadBuilder(SchemaTransformPayloadBuilder):
         extra = set(py_value.keys()) - set(row_type._fields)
         if extra:
           raise ValueError(
-              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
+              f"Transform '{self.identifier()}' was configured with unknown "
+              f"fields: {extra}. Valid fields: {set(row_type._fields)}")
         return row_type(
             *[
                 dict_to_row_recursive(

Reply via email to