This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cad999e305 fix(pubsub): handle pubsub message attributes correctly in 
write  (#36140)
3cad999e305 is described below

commit 3cad999e3050863b0d90f5ce82ef65b5a20412f8
Author: liferoad <[email protected]>
AuthorDate: Thu Sep 18 10:10:58 2025 -0400

    fix(pubsub): handle pubsub message attributes correctly in write  (#36140)
    
    * fix(pubsub): handle pubsub message attributes correctly in write operation
    
    Modify the PubSub write operation to properly deserialize protobuf messages 
and handle attributes when publishing. This ensures messages with attributes 
are published correctly rather than being treated as raw bytes.
    
    * fix(pubsub): replace NotImplementedError with warnings for unsupported 
features
    
    Change raising NotImplementedError to logging warnings when id_label or 
timestamp_attribute are used in PubSub writes, as these features are not yet 
supported. Update tests to verify warning messages instead of exception 
handling.
    
    * check pipelines when raising errors
    
    * lint
    
    * lint
    
    * fix tests
    
    * fix(pubsub): improve runner detection and error messaging
    
    Add more robust runner detection logic to handle DirectRunner variants and 
test runners. Include detailed debug logging and error messages to help 
troubleshoot unsupported PubSub write scenarios.
    
    * test(pubsub): increase test timeout durations for reliability
    
    Increase TEST_PIPELINE_DURATION_MS from 8 to 10 minutes and 
MESSAGE_MATCHER_TIMEOUT_S from 5 to 10 minutes to account for potential delays 
in test environment
    
    * fix lint
    
    * fix(pubsub): handle None runner case and improve debug logging
    
    Move debug logging outside error condition and log at debug level instead 
of warning
    
    * use output_labels_supported
---
 .github/trigger_files/beam_PostCommit_Python.json  |  2 +-
 sdks/python/apache_beam/io/gcp/pubsub.py           | 88 +++++++++++++++++++---
 .../apache_beam/io/gcp/pubsub_integration_test.py  |  4 +-
 3 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python.json
index 8675e953506..1fa29a890c2 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "modification": 28
+  "modification": 29
 }
 
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py 
b/sdks/python/apache_beam/io/gcp/pubsub.py
index 281827db034..59eadee5538 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -414,6 +414,7 @@ class WriteToPubSub(PTransform):
     self.project, self.topic_name = parse_topic(topic)
     self.full_topic = topic
     self._sink = _PubSubSink(topic, id_label, timestamp_attribute)
+    self.pipeline_options = None  # Will be set during expand()
 
   @staticmethod
   def message_to_proto_str(element: PubsubMessage) -> bytes:
@@ -429,6 +430,9 @@ class WriteToPubSub(PTransform):
     return msg._to_proto_str(for_publish=True)
 
   def expand(self, pcoll):
+    # Store pipeline options for use in DoFn
+    self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None
+
     if self.with_attributes:
       pcoll = pcoll | 'ToProtobufX' >> ParDo(
           _AddMetricsAndMap(
@@ -564,11 +568,65 @@ class _PubSubWriteDoFn(DoFn):
 
     # TODO(https://github.com/apache/beam/issues/18939): Add support for
     # id_label and timestamp_attribute.
-    if transform.id_label:
-      raise NotImplementedError('id_label is not supported for PubSub writes')
-    if transform.timestamp_attribute:
-      raise NotImplementedError(
-          'timestamp_attribute is not supported for PubSub writes')
+    # Only raise errors for DirectRunner or batch pipelines
+    pipeline_options = transform.pipeline_options
+    output_labels_supported = True
+
+    if pipeline_options:
+      from apache_beam.options.pipeline_options import StandardOptions
+
+      # Check if using DirectRunner
+      try:
+        # Get runner from pipeline options
+        all_options = pipeline_options.get_all_options()
+        runner_name = all_options.get('runner', StandardOptions.DEFAULT_RUNNER)
+
+        # Check if it's a DirectRunner variant
+        if (runner_name is None or
+            (runner_name in StandardOptions.LOCAL_RUNNERS or 'DirectRunner'
+             in str(runner_name) or 'TestDirectRunner' in str(runner_name))):
+          output_labels_supported = False
+      except Exception:
+        # If we can't determine runner, assume DirectRunner for safety
+        output_labels_supported = False
+
+      # Check if in batch mode (not streaming)
+      standard_options = pipeline_options.view_as(StandardOptions)
+      if not standard_options.streaming:
+        output_labels_supported = False
+    else:
+      # If no pipeline options available, fall back to original behavior
+      output_labels_supported = False
+
+    # Log debug information for troubleshooting
+    import logging
+    runner_info = getattr(
+        pipeline_options, 'runner',
+        'None') if pipeline_options else 'No options'
+    streaming_info = 'Unknown'
+    if pipeline_options:
+      try:
+        standard_options = pipeline_options.view_as(StandardOptions)
+        streaming_info = 'streaming=%s' % standard_options.streaming
+      except Exception:
+        streaming_info = 'streaming=unknown'
+
+    logging.debug(
+        'PubSub unsupported feature check: runner=%s, %s',
+        runner_info,
+        streaming_info)
+
+    if not output_labels_supported:
+
+      if transform.id_label:
+        raise NotImplementedError(
+            f'id_label is not supported for PubSub writes with DirectRunner '
+            f'or in batch mode (runner={runner_info}, {streaming_info})')
+      if transform.timestamp_attribute:
+        raise NotImplementedError(
+            f'timestamp_attribute is not supported for PubSub writes with '
+            f'DirectRunner or in batch mode '
+            f'(runner={runner_info}, {streaming_info})')
 
   def setup(self):
     from google.cloud import pubsub
@@ -593,11 +651,21 @@ class _PubSubWriteDoFn(DoFn):
 
     import time
 
-    # The elements in buffer are already serialized bytes from the previous
-    # transforms
-    futures = [
-        self._pub_client.publish(self._topic, elem) for elem in self._buffer
-    ]
+    # The elements in buffer are serialized protobuf bytes from the previous
+    # transforms. We need to deserialize them to extract data and attributes.
+    futures = []
+    for elem in self._buffer:
+      # Deserialize the protobuf to get the original PubsubMessage
+      pubsub_msg = PubsubMessage._from_proto_str(elem)
+
+      # Publish with the correct data and attributes
+      if self.with_attributes and pubsub_msg.attributes:
+        future = self._pub_client.publish(
+            self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+      else:
+        future = self._pub_client.publish(self._topic, pubsub_msg.data)
+
+      futures.append(future)
 
     timer_start = time.time()
     for future in futures:
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py 
b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index c88f4af2016..8387fe734fc 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -44,10 +44,10 @@ OUTPUT_SUB = 'psit_subscription_output'
 
 # How long TestXXXRunner will wait for pubsub_it_pipeline to run before
 # cancelling it.
-TEST_PIPELINE_DURATION_MS = 8 * 60 * 1000
+TEST_PIPELINE_DURATION_MS = 10 * 60 * 1000
 # How long PubSubMessageMatcher will wait for the correct set of messages to
 # appear.
-MESSAGE_MATCHER_TIMEOUT_S = 5 * 60
+MESSAGE_MATCHER_TIMEOUT_S = 10 * 60
 
 
 class PubSubIntegrationTest(unittest.TestCase):

Reply via email to