This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bef5238b76b Add a STRING format to PubSub reading that interprets the
payload as utf-8 encoded. (#34301)
bef5238b76b is described below
commit bef5238b76b6fe6c3c55be0926148f6b8dc5eb95
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Mar 14 19:37:55 2025 -0700
Add a STRING format to PubSub reading that interprets the payload as utf-8
encoded. (#34301)
---
sdks/python/apache_beam/yaml/yaml_io.py | 7 +++++++
sdks/python/apache_beam/yaml/yaml_io_test.py | 22 ++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/sdks/python/apache_beam/yaml/yaml_io.py
b/sdks/python/apache_beam/yaml/yaml_io.py
index cc3f6aa3908..47c4541c924 100644
--- a/sdks/python/apache_beam/yaml/yaml_io.py
+++ b/sdks/python/apache_beam/yaml/yaml_io.py
@@ -224,6 +224,12 @@ def _create_parser(
return (
schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
lambda payload: beam.Row(payload=payload))
+ if format == 'STRING':
+ if schema:
+ raise ValueError('STRING format does not take a schema')
+ return (
+ schema_pb2.Schema(fields=[schemas.schema_field('payload', str)]),
+ lambda payload: beam.Row(payload=payload.decode('utf-8')))
elif format == 'JSON':
_validate_schema()
beam_schema = json_utils.json_schema_to_beam_schema(schema)
@@ -313,6 +319,7 @@ def read_from_pubsub(
- RAW: Produces records with a single `payload` field whose contents
are the raw bytes of the pubsub message.
+ - STRING: Like RAW, but the bytes are decoded as a UTF-8 string.
- AVRO: Parses records with a given Avro schema.
- JSON: Parses records with a given JSON schema.
diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py
b/sdks/python/apache_beam/yaml/yaml_io_test.py
index 393e31de0e6..2a6a8f16b08 100644
--- a/sdks/python/apache_beam/yaml/yaml_io_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -101,6 +101,28 @@ class YamlPubSubTest(unittest.TestCase):
result,
equal_to([beam.Row(payload=b'msg1'), beam.Row(payload=b'msg2')]))
+ def test_simple_read_string(self):
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pickle_library='cloudpickle')) as p:
+ with mock.patch('apache_beam.io.ReadFromPubSub',
+ FakeReadFromPubSub(
+ topic='my_topic',
+ messages=[PubsubMessage('äter'.encode('utf-8'),
+ {'attr': 'value1'}),
+ PubsubMessage('köttbullar'.encode('utf-8'),
+ {'attr': 'value2'})])):
+ result = p | YamlTransform(
+ '''
+ type: ReadFromPubSub
+ config:
+ topic: my_topic
+ format: STRING
+ ''')
+ assert_that(
+ result,
+ equal_to([beam.Row(payload='äter'),
+ beam.Row(payload='köttbullar')]))
+
def test_read_with_attribute(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p: