This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch release-2.62.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.62.0 by this push:
     new 9d37811e5e5 Revert [Improve existing Python multi-lang SchemaTransform examples (#33361)]
9d37811e5e5 is described below

commit 9d37811e5e50cbf46611e2668ea55e8bb327f8b1
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Dec 19 13:55:51 2024 -0500

    Revert [Improve existing Python multi-lang SchemaTransform examples (#33361)]
---
 .../multi-language/python/wordcount_external.py    | 52 ++++++++--------
 .../schematransforms/ExtractWordsProvider.java     | 72 +++++++---------------
 .../schematransforms/JavaCountProvider.java        | 52 ++++++++--------
 .../schematransforms/WriteWordsProvider.java       | 34 ++++------
 sdks/python/apache_beam/transforms/external.py     |  3 +-
 5 files changed, 87 insertions(+), 126 deletions(-)

diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py
index 7298d81c1b4..580c0269d36 100644
--- a/examples/multi-language/python/wordcount_external.py
+++ b/examples/multi-language/python/wordcount_external.py
@@ -18,8 +18,8 @@
 import logging
 
 import apache_beam as beam
+from apache_beam.io import ReadFromText
 from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.transforms.external import BeamJarExpansionService
 from apache_beam.transforms.external_transform_provider import 
ExternalTransformProvider
 from apache_beam.typehints.row_type import RowTypeConstraint
 """A Python multi-language pipeline that counts words using multiple Java 
SchemaTransforms.
@@ -60,35 +60,39 @@ $ python wordcount_external.py \
       --expansion_service_port <PORT>
 """
 
+# Original Java transform is in ExtractWordsProvider.java
 EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
+# Original Java transform is in JavaCountProvider.java
 COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
+# Original Java transform is in WriteWordsProvider.java
 WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
 
 
 def run(input_path, output_path, expansion_service_port, pipeline_args):
     pipeline_options = PipelineOptions(pipeline_args)
 
+    # Discover and get external transforms from this expansion service
+    provider = ExternalTransformProvider("localhost:" + expansion_service_port)
+    # Get transforms with identifiers, then use them as you would a regular
+    # native PTransform
+    Extract = provider.get_urn(EXTRACT_IDENTIFIER)
+    Count = provider.get_urn(COUNT_IDENTIFIER)
+    Write = provider.get_urn(WRITE_IDENTIFIER)
+
     with beam.Pipeline(options=pipeline_options) as p:
-        expansion_service = BeamJarExpansionService(
-            "examples:multi-language:shadowJar")
-        if expansion_service_port:
-            expansion_service = "localhost:" + expansion_service_port
-
-        provider = ExternalTransformProvider(expansion_service)
-        # Retrieve portable transforms
-        Extract = provider.get_urn(EXTRACT_IDENTIFIER)
-        Count = provider.get_urn(COUNT_IDENTIFIER)
-        Write = provider.get_urn(WRITE_IDENTIFIER)
-
-        _ = (p
-             | 'Read' >> beam.io.ReadFromText(input_path)
-             | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
-             | 'Extract Words' >> Extract(drop=["king", "palace"])
-             | 'Count Words' >> Count()
-             | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
-                 row.word, row.count))).with_output_types(
-                     RowTypeConstraint.from_fields([('line', str)]))
-             | 'Write' >> Write(file_path_prefix=output_path))
+        lines = p | 'Read' >> ReadFromText(input_path)
+
+        words = (lines
+                 | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
+                 | 'Extract Words' >> Extract())
+        word_counts = words | 'Count Words' >> Count()
+        formatted_words = (
+            word_counts
+            | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
+                row.word, row.count))).with_output_types(
+                    RowTypeConstraint.from_fields([('line', str)])))
+
+        formatted_words | 'Write' >> Write(file_path_prefix=output_path)
 
 
 if __name__ == '__main__':
@@ -106,10 +110,8 @@ if __name__ == '__main__':
                         help='Output file')
     parser.add_argument('--expansion_service_port',
                         dest='expansion_service_port',
-                        required=False,
-                        help='Expansion service port. If left empty, the '
-                        'existing multi-language examples service will '
-                        'be used by default.')
+                        required=True,
+                        help='Expansion service port')
     known_args, pipeline_args = parser.parse_known_args()
 
     run(known_args.input, known_args.output, known_args.expansion_service_port,
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
index b7224ecec6b..724dbce276f 100644
--- 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -21,12 +21,9 @@ import static 
org.apache.beam.examples.multilanguage.schematransforms.ExtractWor
 
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
-import java.util.Arrays;
-import java.util.List;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -39,6 +36,7 @@ import org.apache.beam.sdk.values.Row;
 /** Splits a line into separate words and returns each word. */
 @AutoService(SchemaTransformProvider.class)
 public class ExtractWordsProvider extends 
TypedSchemaTransformProvider<Configuration> {
+  public static final Schema OUTPUT_SCHEMA = 
Schema.builder().addStringField("word").build();
 
   @Override
   public String identifier() {
@@ -47,60 +45,32 @@ public class ExtractWordsProvider extends 
TypedSchemaTransformProvider<Configura
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new ExtractWordsTransform(configuration);
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        return PCollectionRowTuple.of(
+            "output",
+            input.get("input").apply(ParDo.of(new 
ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
+      }
+    };
   }
 
-  static class ExtractWordsTransform extends SchemaTransform {
-    private static final Schema OUTPUT_SCHEMA = 
Schema.builder().addStringField("word").build();
-    private final List<String> drop;
+  static class ExtractWordsFn extends DoFn<Row, Row> {
+    @ProcessElement
+    public void processElement(@Element Row element, OutputReceiver<Row> 
receiver) {
+      // Split the line into words.
+      String line = Preconditions.checkStateNotNull(element.getString("line"));
+      String[] words = line.split("[^\\p{L}]+", -1);
 
-    ExtractWordsTransform(Configuration configuration) {
-      this.drop = configuration.getDrop();
-    }
-
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      return PCollectionRowTuple.of(
-          "output",
-          input
-              .getSinglePCollection()
-              .apply(
-                  ParDo.of(
-                      new DoFn<Row, Row>() {
-                        @ProcessElement
-                        public void process(@Element Row element, 
OutputReceiver<Row> receiver) {
-                          // Split the line into words.
-                          String line = 
Preconditions.checkStateNotNull(element.getString("line"));
-                          String[] words = line.split("[^\\p{L}]+", -1);
-                          Arrays.stream(words)
-                              .filter(w -> !drop.contains(w))
-                              .forEach(
-                                  word ->
-                                      receiver.output(
-                                          Row.withSchema(OUTPUT_SCHEMA)
-                                              .withFieldValue("word", word)
-                                              .build()));
-                        }
-                      }))
-              .setRowSchema(OUTPUT_SCHEMA));
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", 
word).build());
+        }
+      }
     }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  public abstract static class Configuration {
-    public static Builder builder() {
-      return new AutoValue_ExtractWordsProvider_Configuration.Builder();
-    }
-
-    @SchemaFieldDescription("List of words to drop.")
-    public abstract List<String> getDrop();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setDrop(List<String> foo);
-
-      public abstract Configuration build();
-    }
-  }
+  protected abstract static class Configuration {}
 }
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
index 90d02d92c3c..cabea594ae1 100644
--- 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -44,37 +44,35 @@ public class JavaCountProvider extends 
TypedSchemaTransformProvider<Configuratio
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new JavaCountTransform();
-  }
-
-  static class JavaCountTransform extends SchemaTransform {
-    static final Schema OUTPUT_SCHEMA =
-        Schema.builder().addStringField("word").addInt64Field("count").build();
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        Schema outputSchema =
+            
Schema.builder().addStringField("word").addInt64Field("count").build();
 
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      PCollection<Row> wordCounts =
-          input
-              .get("input")
-              .apply(Count.perElement())
-              .apply(
-                  MapElements.into(TypeDescriptors.rows())
-                      .via(
-                          kv ->
-                              Row.withSchema(OUTPUT_SCHEMA)
-                                  .withFieldValue(
-                                      "word",
-                                      Preconditions.checkStateNotNull(
-                                          kv.getKey().getString("word")))
-                                  .withFieldValue("count", kv.getValue())
-                                  .build()))
-              .setRowSchema(OUTPUT_SCHEMA);
+        PCollection<Row> wordCounts =
+            input
+                .get("input")
+                .apply(Count.perElement())
+                .apply(
+                    MapElements.into(TypeDescriptors.rows())
+                        .via(
+                            kv ->
+                                Row.withSchema(outputSchema)
+                                    .withFieldValue(
+                                        "word",
+                                        Preconditions.checkStateNotNull(
+                                            kv.getKey().getString("word")))
+                                    .withFieldValue("count", kv.getValue())
+                                    .build()))
+                .setRowSchema(outputSchema);
 
-      return PCollectionRowTuple.of("output", wordCounts);
-    }
+        return PCollectionRowTuple.of("output", wordCounts);
+      }
+    };
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  public abstract static class Configuration {}
+  protected abstract static class Configuration {}
 }
diff --git 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
index faf9590a7f1..0b2017c5587 100644
--- 
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
+++ 
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -42,32 +42,24 @@ public class WriteWordsProvider extends 
TypedSchemaTransformProvider<Configurati
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new WriteWordsTransform(configuration);
-  }
-
-  static class WriteWordsTransform extends SchemaTransform {
-    private final String filePathPrefix;
-
-    WriteWordsTransform(Configuration configuration) {
-      this.filePathPrefix = configuration.getFilePathPrefix();
-    }
-
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      input
-          .get("input")
-          .apply(
-              MapElements.into(TypeDescriptors.strings())
-                  .via(row -> 
Preconditions.checkStateNotNull(row.getString("line"))))
-          .apply(TextIO.write().to(filePathPrefix));
+    return new SchemaTransform() {
+      @Override
+      public PCollectionRowTuple expand(PCollectionRowTuple input) {
+        input
+            .get("input")
+            .apply(
+                MapElements.into(TypeDescriptors.strings())
+                    .via(row -> 
Preconditions.checkStateNotNull(row.getString("line"))))
+            .apply(TextIO.write().to(configuration.getFilePathPrefix()));
 
-      return PCollectionRowTuple.empty(input.getPipeline());
-    }
+        return PCollectionRowTuple.empty(input.getPipeline());
+      }
+    };
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  public abstract static class Configuration {
+  protected abstract static class Configuration {
     public static Builder builder() {
       return new AutoValue_WriteWordsProvider_Configuration.Builder();
     }
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 9ca5886f4cc..fb37a8fd974 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -239,8 +239,7 @@ class 
ExplicitSchemaTransformPayloadBuilder(SchemaTransformPayloadBuilder):
         extra = set(py_value.keys()) - set(row_type._fields)
         if extra:
           raise ValueError(
-              f"Transform '{self.identifier()}' was configured with unknown "
-              f"fields: {extra}. Valid fields: {set(row_type._fields)}")
+              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
         return row_type(
             *[
                 dict_to_row_recursive(

Reply via email to