liferoad commented on code in PR #36027:
URL: https://github.com/apache/beam/pull/36027#discussion_r2319087592


##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -541,11 +546,67 @@ def is_bounded(self):
     return False
 
 
-# TODO(BEAM-27443): Remove in favor of a proper WriteToPubSub transform.
+class _PubSubWriteDoFn(DoFn):
+  """DoFn for writing messages to Cloud Pub/Sub.
+  
+  This DoFn handles both streaming and batch modes by buffering messages
+  and publishing them in batches to optimize performance.
+  """
+  BUFFER_SIZE_ELEMENTS = 100
+  FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
+
+  def __init__(self, transform):
+    self.project = transform.project
+    self.short_topic_name = transform.topic_name
+    self.id_label = transform.id_label
+    self.timestamp_attribute = transform.timestamp_attribute
+    self.with_attributes = transform.with_attributes
+
+    # TODO(https://github.com/apache/beam/issues/18939): Add support for
+    # id_label and timestamp_attribute.
+    if transform.id_label:
+      raise NotImplementedError('id_label is not supported for PubSub writes')
+    if transform.timestamp_attribute:
+      raise NotImplementedError(
+          'timestamp_attribute is not supported for PubSub writes')
+
+  def start_bundle(self):
+    self._buffer = []
+
+  def process(self, elem):
+    self._buffer.append(elem)
+    if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
+      self._flush()
+
+  def finish_bundle(self):
+    self._flush()
+
+  def _flush(self):
+    if not self._buffer:
+      return
+
+    from google.cloud import pubsub
+    import time
+
+    pub_client = pubsub.PublisherClient()
+    topic = pub_client.topic_path(self.project, self.short_topic_name)
+
+    # The elements in buffer are already serialized bytes from the previous
+    # transforms
+    futures = [pub_client.publish(topic, elem) for elem in self._buffer]
+
+    timer_start = time.time()
+    for future in futures:
+      remaining = self.FLUSH_TIMEOUT_SECS - (time.time() - timer_start)
+      future.result(remaining)

Review Comment:
   I added the timeout exception, which should trigger Dataflow to retry for 
batch jobs. The idea is to avoid any stuckness when publishing the messages. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to