This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch release-2.62.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.62.0 by this push:
new dfdfd0a577b Improve existing Python multi-lang SchemaTransform examples (#33361)
dfdfd0a577b is described below
commit dfdfd0a577ba8f4ef094a001c9b03722fccb5d49
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Dec 19 18:10:13 2024 +0000
Improve existing Python multi-lang SchemaTransform examples (#33361)
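* improve python multi-lang examples: make --expansion_service_port optional
  (defaulting to the examples:multi-language:shadowJar expansion service) and
  add a `drop` option (list of words to drop) to the ExtractWords SchemaTransform
* minor adjustments: refactor anonymous SchemaTransforms into named classes,
  make the providers' Configuration classes public, and include the transform
  identifier in the "unknown fields" error raised by external.py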
---
.../multi-language/python/wordcount_external.py | 52 ++++++++--------
.../schematransforms/ExtractWordsProvider.java | 72 +++++++++++++++-------
.../schematransforms/JavaCountProvider.java | 52 ++++++++--------
.../schematransforms/WriteWordsProvider.java | 34 ++++++----
sdks/python/apache_beam/transforms/external.py | 3 +-
5 files changed, 126 insertions(+), 87 deletions(-)
diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py
index 580c0269d36..7298d81c1b4 100644
--- a/examples/multi-language/python/wordcount_external.py
+++ b/examples/multi-language/python/wordcount_external.py
@@ -18,8 +18,8 @@
import logging
import apache_beam as beam
-from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import
ExternalTransformProvider
from apache_beam.typehints.row_type import RowTypeConstraint
"""A Python multi-language pipeline that counts words using multiple Java
SchemaTransforms.
@@ -60,39 +60,35 @@ $ python wordcount_external.py \
--expansion_service_port <PORT>
"""
-# Original Java transform is in ExtractWordsProvider.java
EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
-# Original Java transform is in JavaCountProvider.java
COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
-# Original Java transform is in WriteWordsProvider.java
WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
def run(input_path, output_path, expansion_service_port, pipeline_args):
pipeline_options = PipelineOptions(pipeline_args)
- # Discover and get external transforms from this expansion service
- provider = ExternalTransformProvider("localhost:" + expansion_service_port)
- # Get transforms with identifiers, then use them as you would a regular
- # native PTransform
- Extract = provider.get_urn(EXTRACT_IDENTIFIER)
- Count = provider.get_urn(COUNT_IDENTIFIER)
- Write = provider.get_urn(WRITE_IDENTIFIER)
-
with beam.Pipeline(options=pipeline_options) as p:
- lines = p | 'Read' >> ReadFromText(input_path)
-
- words = (lines
- | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
- | 'Extract Words' >> Extract())
- word_counts = words | 'Count Words' >> Count()
- formatted_words = (
- word_counts
- | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
- row.word, row.count))).with_output_types(
- RowTypeConstraint.from_fields([('line', str)])))
-
- formatted_words | 'Write' >> Write(file_path_prefix=output_path)
+ expansion_service = BeamJarExpansionService(
+ "examples:multi-language:shadowJar")
+ if expansion_service_port:
+ expansion_service = "localhost:" + expansion_service_port
+
+ provider = ExternalTransformProvider(expansion_service)
+ # Retrieve portable transforms
+ Extract = provider.get_urn(EXTRACT_IDENTIFIER)
+ Count = provider.get_urn(COUNT_IDENTIFIER)
+ Write = provider.get_urn(WRITE_IDENTIFIER)
+
+ _ = (p
+ | 'Read' >> beam.io.ReadFromText(input_path)
+ | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
+ | 'Extract Words' >> Extract(drop=["king", "palace"])
+ | 'Count Words' >> Count()
+ | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
+ row.word, row.count))).with_output_types(
+ RowTypeConstraint.from_fields([('line', str)]))
+ | 'Write' >> Write(file_path_prefix=output_path))
if __name__ == '__main__':
@@ -110,8 +106,10 @@ if __name__ == '__main__':
help='Output file')
parser.add_argument('--expansion_service_port',
dest='expansion_service_port',
- required=True,
- help='Expansion service port')
+ required=False,
+ help='Expansion service port. If left empty, the '
+ 'existing multi-language examples service will '
+ 'be used by default.')
known_args, pipeline_args = parser.parse_known_args()
run(known_args.input, known_args.output, known_args.expansion_service_port,
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
index 724dbce276f..b7224ecec6b 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -21,9 +21,12 @@ import static org.apache.beam.examples.multilanguage.schematransforms.ExtractWor
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -36,7 +39,6 @@ import org.apache.beam.sdk.values.Row;
/** Splits a line into separate words and returns each word. */
@AutoService(SchemaTransformProvider.class)
public class ExtractWordsProvider extends
TypedSchemaTransformProvider<Configuration> {
- public static final Schema OUTPUT_SCHEMA =
Schema.builder().addStringField("word").build();
@Override
public String identifier() {
@@ -45,32 +47,60 @@ public class ExtractWordsProvider extends TypedSchemaTransformProvider<Configura
@Override
protected SchemaTransform from(Configuration configuration) {
- return new SchemaTransform() {
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- return PCollectionRowTuple.of(
- "output",
- input.get("input").apply(ParDo.of(new
ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
- }
- };
+ return new ExtractWordsTransform(configuration);
}
- static class ExtractWordsFn extends DoFn<Row, Row> {
- @ProcessElement
- public void processElement(@Element Row element, OutputReceiver<Row>
receiver) {
- // Split the line into words.
- String line = Preconditions.checkStateNotNull(element.getString("line"));
- String[] words = line.split("[^\\p{L}]+", -1);
+ static class ExtractWordsTransform extends SchemaTransform {
+ private static final Schema OUTPUT_SCHEMA =
Schema.builder().addStringField("word").build();
+ private final List<String> drop;
- for (String word : words) {
- if (!word.isEmpty()) {
- receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word",
word).build());
- }
- }
+ ExtractWordsTransform(Configuration configuration) {
+ this.drop = configuration.getDrop();
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ return PCollectionRowTuple.of(
+ "output",
+ input
+ .getSinglePCollection()
+ .apply(
+ ParDo.of(
+ new DoFn<Row, Row>() {
+ @ProcessElement
+ public void process(@Element Row element,
OutputReceiver<Row> receiver) {
+ // Split the line into words.
+ String line =
Preconditions.checkStateNotNull(element.getString("line"));
+ String[] words = line.split("[^\\p{L}]+", -1);
+ Arrays.stream(words)
+ .filter(w -> !drop.contains(w))
+ .forEach(
+ word ->
+ receiver.output(
+ Row.withSchema(OUTPUT_SCHEMA)
+ .withFieldValue("word", word)
+ .build()));
+ }
+ }))
+ .setRowSchema(OUTPUT_SCHEMA));
}
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
- protected abstract static class Configuration {}
+ public abstract static class Configuration {
+ public static Builder builder() {
+ return new AutoValue_ExtractWordsProvider_Configuration.Builder();
+ }
+
+ @SchemaFieldDescription("List of words to drop.")
+ public abstract List<String> getDrop();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setDrop(List<String> foo);
+
+ public abstract Configuration build();
+ }
+ }
}
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
index cabea594ae1..90d02d92c3c 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -44,35 +44,37 @@ public class JavaCountProvider extends TypedSchemaTransformProvider<Configuratio
@Override
protected SchemaTransform from(Configuration configuration) {
- return new SchemaTransform() {
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- Schema outputSchema =
-
Schema.builder().addStringField("word").addInt64Field("count").build();
+ return new JavaCountTransform();
+ }
+
+ static class JavaCountTransform extends SchemaTransform {
+ static final Schema OUTPUT_SCHEMA =
+ Schema.builder().addStringField("word").addInt64Field("count").build();
- PCollection<Row> wordCounts =
- input
- .get("input")
- .apply(Count.perElement())
- .apply(
- MapElements.into(TypeDescriptors.rows())
- .via(
- kv ->
- Row.withSchema(outputSchema)
- .withFieldValue(
- "word",
- Preconditions.checkStateNotNull(
- kv.getKey().getString("word")))
- .withFieldValue("count", kv.getValue())
- .build()))
- .setRowSchema(outputSchema);
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PCollection<Row> wordCounts =
+ input
+ .get("input")
+ .apply(Count.perElement())
+ .apply(
+ MapElements.into(TypeDescriptors.rows())
+ .via(
+ kv ->
+ Row.withSchema(OUTPUT_SCHEMA)
+ .withFieldValue(
+ "word",
+ Preconditions.checkStateNotNull(
+ kv.getKey().getString("word")))
+ .withFieldValue("count", kv.getValue())
+ .build()))
+ .setRowSchema(OUTPUT_SCHEMA);
- return PCollectionRowTuple.of("output", wordCounts);
- }
- };
+ return PCollectionRowTuple.of("output", wordCounts);
+ }
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
- protected abstract static class Configuration {}
+ public abstract static class Configuration {}
}
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
index 0b2017c5587..faf9590a7f1 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -42,24 +42,32 @@ public class WriteWordsProvider extends TypedSchemaTransformProvider<Configurati
@Override
protected SchemaTransform from(Configuration configuration) {
- return new SchemaTransform() {
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- input
- .get("input")
- .apply(
- MapElements.into(TypeDescriptors.strings())
- .via(row ->
Preconditions.checkStateNotNull(row.getString("line"))))
- .apply(TextIO.write().to(configuration.getFilePathPrefix()));
+ return new WriteWordsTransform(configuration);
+ }
+
+ static class WriteWordsTransform extends SchemaTransform {
+ private final String filePathPrefix;
+
+ WriteWordsTransform(Configuration configuration) {
+ this.filePathPrefix = configuration.getFilePathPrefix();
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ input
+ .get("input")
+ .apply(
+ MapElements.into(TypeDescriptors.strings())
+ .via(row ->
Preconditions.checkStateNotNull(row.getString("line"))))
+ .apply(TextIO.write().to(filePathPrefix));
- return PCollectionRowTuple.empty(input.getPipeline());
- }
- };
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
- protected abstract static class Configuration {
+ public abstract static class Configuration {
public static Builder builder() {
return new AutoValue_WriteWordsProvider_Configuration.Builder();
}
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index fb37a8fd974..9ca5886f4cc 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -239,7 +239,8 @@ class ExplicitSchemaTransformPayloadBuilder(SchemaTransformPayloadBuilder):
extra = set(py_value.keys()) - set(row_type._fields)
if extra:
raise ValueError(
- f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
+ f"Transform '{self.identifier()}' was configured with unknown "
+ f"fields: {extra}. Valid fields: {set(row_type._fields)}")
return row_type(
*[
dict_to_row_recursive(