This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bef5238b76b Add a STRING format to PubSub reading that interprets the 
payload as UTF-8 encoded. (#34301)
bef5238b76b is described below

commit bef5238b76b6fe6c3c55be0926148f6b8dc5eb95
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Mar 14 19:37:55 2025 -0700

    Add a STRING format to PubSub reading that interprets the payload as UTF-8 
encoded. (#34301)
---
 sdks/python/apache_beam/yaml/yaml_io.py      |  7 +++++++
 sdks/python/apache_beam/yaml/yaml_io_test.py | 22 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/sdks/python/apache_beam/yaml/yaml_io.py 
b/sdks/python/apache_beam/yaml/yaml_io.py
index cc3f6aa3908..47c4541c924 100644
--- a/sdks/python/apache_beam/yaml/yaml_io.py
+++ b/sdks/python/apache_beam/yaml/yaml_io.py
@@ -224,6 +224,12 @@ def _create_parser(
     return (
         schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]),
         lambda payload: beam.Row(payload=payload))
+  if format == 'STRING':
+    if schema:
+      raise ValueError('STRING format does not take a schema')
+    return (
+        schema_pb2.Schema(fields=[schemas.schema_field('payload', str)]),
+        lambda payload: beam.Row(payload=payload.decode('utf-8')))
   elif format == 'JSON':
     _validate_schema()
     beam_schema = json_utils.json_schema_to_beam_schema(schema)
@@ -313,6 +319,7 @@ def read_from_pubsub(
 
         - RAW: Produces records with a single `payload` field whose contents
             are the raw bytes of the pubsub message.
+        - STRING: Like RAW, but the bytes are decoded as a UTF-8 string.
         - AVRO: Parses records with a given Avro schema.
         - JSON: Parses records with a given JSON schema.
 
diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py 
b/sdks/python/apache_beam/yaml/yaml_io_test.py
index 393e31de0e6..2a6a8f16b08 100644
--- a/sdks/python/apache_beam/yaml/yaml_io_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -101,6 +101,28 @@ class YamlPubSubTest(unittest.TestCase):
             result,
             equal_to([beam.Row(payload=b'msg1'), beam.Row(payload=b'msg2')]))
 
+  def test_simple_read_string(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      with mock.patch('apache_beam.io.ReadFromPubSub',
+                      FakeReadFromPubSub(
+                          topic='my_topic',
+                          messages=[PubsubMessage('äter'.encode('utf-8'),
+                                                  {'attr': 'value1'}),
+                                    PubsubMessage('köttbullar'.encode('utf-8'),
+                                                  {'attr': 'value2'})])):
+        result = p | YamlTransform(
+            '''
+            type: ReadFromPubSub
+            config:
+              topic: my_topic
+              format: STRING
+            ''')
+        assert_that(
+            result,
+            equal_to([beam.Row(payload='äter'),
+                      beam.Row(payload='köttbullar')]))
+
   def test_read_with_attribute(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p:

Reply via email to