This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch release-2.62.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.62.0 by this push:
new 9d37811e5e5 Revert [Improve existing Python multi-lang SchemaTransform examples (#33361)]
9d37811e5e5 is described below
commit 9d37811e5e50cbf46611e2668ea55e8bb327f8b1
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Dec 19 13:55:51 2024 -0500
Revert [Improve existing Python multi-lang SchemaTransform examples (#33361)]
---
.../multi-language/python/wordcount_external.py | 52 ++++++++--------
.../schematransforms/ExtractWordsProvider.java | 72 +++++++---------------
.../schematransforms/JavaCountProvider.java | 52 ++++++++--------
.../schematransforms/WriteWordsProvider.java | 34 ++++------
sdks/python/apache_beam/transforms/external.py | 3 +-
5 files changed, 87 insertions(+), 126 deletions(-)
diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py
index 7298d81c1b4..580c0269d36 100644
--- a/examples/multi-language/python/wordcount_external.py
+++ b/examples/multi-language/python/wordcount_external.py
@@ -18,8 +18,8 @@
import logging
import apache_beam as beam
+from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import
ExternalTransformProvider
from apache_beam.typehints.row_type import RowTypeConstraint
"""A Python multi-language pipeline that counts words using multiple Java
SchemaTransforms.
@@ -60,35 +60,39 @@ $ python wordcount_external.py \
--expansion_service_port <PORT>
"""
+# Original Java transform is in ExtractWordsProvider.java
EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
+# Original Java transform is in JavaCountProvider.java
COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
+# Original Java transform is in WriteWordsProvider.java
WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
def run(input_path, output_path, expansion_service_port, pipeline_args):
pipeline_options = PipelineOptions(pipeline_args)
+ # Discover and get external transforms from this expansion service
+ provider = ExternalTransformProvider("localhost:" + expansion_service_port)
+ # Get transforms with identifiers, then use them as you would a regular
+ # native PTransform
+ Extract = provider.get_urn(EXTRACT_IDENTIFIER)
+ Count = provider.get_urn(COUNT_IDENTIFIER)
+ Write = provider.get_urn(WRITE_IDENTIFIER)
+
with beam.Pipeline(options=pipeline_options) as p:
- expansion_service = BeamJarExpansionService(
- "examples:multi-language:shadowJar")
- if expansion_service_port:
- expansion_service = "localhost:" + expansion_service_port
-
- provider = ExternalTransformProvider(expansion_service)
- # Retrieve portable transforms
- Extract = provider.get_urn(EXTRACT_IDENTIFIER)
- Count = provider.get_urn(COUNT_IDENTIFIER)
- Write = provider.get_urn(WRITE_IDENTIFIER)
-
- _ = (p
- | 'Read' >> beam.io.ReadFromText(input_path)
- | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
- | 'Extract Words' >> Extract(drop=["king", "palace"])
- | 'Count Words' >> Count()
- | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
- row.word, row.count))).with_output_types(
- RowTypeConstraint.from_fields([('line', str)]))
- | 'Write' >> Write(file_path_prefix=output_path))
+ lines = p | 'Read' >> ReadFromText(input_path)
+
+ words = (lines
+ | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
+ | 'Extract Words' >> Extract())
+ word_counts = words | 'Count Words' >> Count()
+ formatted_words = (
+ word_counts
+ | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
+ row.word, row.count))).with_output_types(
+ RowTypeConstraint.from_fields([('line', str)])))
+
+ formatted_words | 'Write' >> Write(file_path_prefix=output_path)
if __name__ == '__main__':
@@ -106,10 +110,8 @@ if __name__ == '__main__':
help='Output file')
parser.add_argument('--expansion_service_port',
dest='expansion_service_port',
- required=False,
- help='Expansion service port. If left empty, the '
- 'existing multi-language examples service will '
- 'be used by default.')
+ required=True,
+ help='Expansion service port')
known_args, pipeline_args = parser.parse_known_args()
run(known_args.input, known_args.output, known_args.expansion_service_port,
diff --git
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
index b7224ecec6b..724dbce276f 100644
---
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
+++
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -21,12 +21,9 @@ import static
org.apache.beam.examples.multilanguage.schematransforms.ExtractWor
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
-import java.util.Arrays;
-import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -39,6 +36,7 @@ import org.apache.beam.sdk.values.Row;
/** Splits a line into separate words and returns each word. */
@AutoService(SchemaTransformProvider.class)
public class ExtractWordsProvider extends
TypedSchemaTransformProvider<Configuration> {
+ public static final Schema OUTPUT_SCHEMA =
Schema.builder().addStringField("word").build();
@Override
public String identifier() {
@@ -47,60 +45,32 @@ public class ExtractWordsProvider extends
TypedSchemaTransformProvider<Configura
@Override
protected SchemaTransform from(Configuration configuration) {
- return new ExtractWordsTransform(configuration);
+ return new SchemaTransform() {
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ return PCollectionRowTuple.of(
+ "output",
+ input.get("input").apply(ParDo.of(new
ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
+ }
+ };
}
- static class ExtractWordsTransform extends SchemaTransform {
- private static final Schema OUTPUT_SCHEMA =
Schema.builder().addStringField("word").build();
- private final List<String> drop;
+ static class ExtractWordsFn extends DoFn<Row, Row> {
+ @ProcessElement
+ public void processElement(@Element Row element, OutputReceiver<Row>
receiver) {
+ // Split the line into words.
+ String line = Preconditions.checkStateNotNull(element.getString("line"));
+ String[] words = line.split("[^\\p{L}]+", -1);
- ExtractWordsTransform(Configuration configuration) {
- this.drop = configuration.getDrop();
- }
-
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- return PCollectionRowTuple.of(
- "output",
- input
- .getSinglePCollection()
- .apply(
- ParDo.of(
- new DoFn<Row, Row>() {
- @ProcessElement
- public void process(@Element Row element,
OutputReceiver<Row> receiver) {
- // Split the line into words.
- String line =
Preconditions.checkStateNotNull(element.getString("line"));
- String[] words = line.split("[^\\p{L}]+", -1);
- Arrays.stream(words)
- .filter(w -> !drop.contains(w))
- .forEach(
- word ->
- receiver.output(
- Row.withSchema(OUTPUT_SCHEMA)
- .withFieldValue("word", word)
- .build()));
- }
- }))
- .setRowSchema(OUTPUT_SCHEMA));
+ for (String word : words) {
+ if (!word.isEmpty()) {
+ receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word",
word).build());
+ }
+ }
}
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
- public abstract static class Configuration {
- public static Builder builder() {
- return new AutoValue_ExtractWordsProvider_Configuration.Builder();
- }
-
- @SchemaFieldDescription("List of words to drop.")
- public abstract List<String> getDrop();
-
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder setDrop(List<String> foo);
-
- public abstract Configuration build();
- }
- }
+ protected abstract static class Configuration {}
}
diff --git
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
index 90d02d92c3c..cabea594ae1 100644
---
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
+++
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -44,37 +44,35 @@ public class JavaCountProvider extends
TypedSchemaTransformProvider<Configuratio
@Override
protected SchemaTransform from(Configuration configuration) {
- return new JavaCountTransform();
- }
-
- static class JavaCountTransform extends SchemaTransform {
- static final Schema OUTPUT_SCHEMA =
- Schema.builder().addStringField("word").addInt64Field("count").build();
+ return new SchemaTransform() {
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ Schema outputSchema =
+
Schema.builder().addStringField("word").addInt64Field("count").build();
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- PCollection<Row> wordCounts =
- input
- .get("input")
- .apply(Count.perElement())
- .apply(
- MapElements.into(TypeDescriptors.rows())
- .via(
- kv ->
- Row.withSchema(OUTPUT_SCHEMA)
- .withFieldValue(
- "word",
- Preconditions.checkStateNotNull(
- kv.getKey().getString("word")))
- .withFieldValue("count", kv.getValue())
- .build()))
- .setRowSchema(OUTPUT_SCHEMA);
+ PCollection<Row> wordCounts =
+ input
+ .get("input")
+ .apply(Count.perElement())
+ .apply(
+ MapElements.into(TypeDescriptors.rows())
+ .via(
+ kv ->
+ Row.withSchema(outputSchema)
+ .withFieldValue(
+ "word",
+ Preconditions.checkStateNotNull(
+ kv.getKey().getString("word")))
+ .withFieldValue("count", kv.getValue())
+ .build()))
+ .setRowSchema(outputSchema);
- return PCollectionRowTuple.of("output", wordCounts);
- }
+ return PCollectionRowTuple.of("output", wordCounts);
+ }
+ };
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
- public abstract static class Configuration {}
+ protected abstract static class Configuration {}
}
diff --git
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
index faf9590a7f1..0b2017c5587 100644
---
a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
+++
b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -42,32 +42,24 @@ public class WriteWordsProvider extends
TypedSchemaTransformProvider<Configurati
@Override
protected SchemaTransform from(Configuration configuration) {
- return new WriteWordsTransform(configuration);
- }
-
- static class WriteWordsTransform extends SchemaTransform {
- private final String filePathPrefix;
-
- WriteWordsTransform(Configuration configuration) {
- this.filePathPrefix = configuration.getFilePathPrefix();
- }
-
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- input
- .get("input")
- .apply(
- MapElements.into(TypeDescriptors.strings())
- .via(row ->
Preconditions.checkStateNotNull(row.getString("line"))))
- .apply(TextIO.write().to(filePathPrefix));
+ return new SchemaTransform() {
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ input
+ .get("input")
+ .apply(
+ MapElements.into(TypeDescriptors.strings())
+ .via(row ->
Preconditions.checkStateNotNull(row.getString("line"))))
+ .apply(TextIO.write().to(configuration.getFilePathPrefix()));
- return PCollectionRowTuple.empty(input.getPipeline());
- }
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
+ };
}
@DefaultSchema(AutoValueSchema.class)
@AutoValue
- public abstract static class Configuration {
+ protected abstract static class Configuration {
public static Builder builder() {
return new AutoValue_WriteWordsProvider_Configuration.Builder();
}
diff --git a/sdks/python/apache_beam/transforms/external.py
b/sdks/python/apache_beam/transforms/external.py
index 9ca5886f4cc..fb37a8fd974 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -239,8 +239,7 @@ class
ExplicitSchemaTransformPayloadBuilder(SchemaTransformPayloadBuilder):
extra = set(py_value.keys()) - set(row_type._fields)
if extra:
raise ValueError(
- f"Transform '{self.identifier()}' was configured with unknown "
- f"fields: {extra}. Valid fields: {set(row_type._fields)}")
+ f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
return row_type(
*[
dict_to_row_recursive(