scwhittle commented on code in PR #36027:
URL: https://github.com/apache/beam/pull/36027#discussion_r2318091327


##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -541,11 +546,67 @@ def is_bounded(self):
     return False
 
 
-# TODO(BEAM-27443): Remove in favor of a proper WriteToPubSub transform.
+class _PubSubWriteDoFn(DoFn):
+  """DoFn for writing messages to Cloud Pub/Sub.
+  
+  This DoFn handles both streaming and batch modes by buffering messages
+  and publishing them in batches to optimize performance.
+  """
+  BUFFER_SIZE_ELEMENTS = 100
+  FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
+
+  def __init__(self, transform):
+    self.project = transform.project
+    self.short_topic_name = transform.topic_name
+    self.id_label = transform.id_label
+    self.timestamp_attribute = transform.timestamp_attribute
+    self.with_attributes = transform.with_attributes
+
+    # TODO(https://github.com/apache/beam/issues/18939): Add support for
+    # id_label and timestamp_attribute.
+    if transform.id_label:
+      raise NotImplementedError('id_label is not supported for PubSub writes')
+    if transform.timestamp_attribute:
+      raise NotImplementedError(
+          'timestamp_attribute is not supported for PubSub writes')
+
+  def start_bundle(self):
+    self._buffer = []
+
+  def process(self, elem):
+    self._buffer.append(elem)
+    if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
+      self._flush()
+
+  def finish_bundle(self):
+    self._flush()
+
+  def _flush(self):
+    if not self._buffer:
+      return
+
+    from google.cloud import pubsub
+    import time
+
+    pub_client = pubsub.PublisherClient()

Review Comment:
   There might be a benefit to making this client long-lived so that it reuses 
channels etc. instead of reconnecting on every flush. It can be created in the 
DoFn's `setup` method.
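
A minimal sketch of that suggestion, not the PR's actual code. To stay runnable without Beam or google-cloud-pubsub installed, it is a plain class whose method names mirror the DoFn lifecycle. The class name `LongLivedClientWriteFn` and the `client_factory` argument are hypothetical; in the real DoFn, `setup` would presumably call `pubsub.PublisherClient()` directly.

```python
class LongLivedClientWriteFn:
  """Plain-Python stand-in for _PubSubWriteDoFn (hypothetical name)."""
  BUFFER_SIZE_ELEMENTS = 100

  def __init__(self, project, topic_name, client_factory):
    self.project = project
    self.short_topic_name = topic_name
    # Assumption: client_factory is an injection point for this sketch only.
    # The real DoFn would construct pubsub.PublisherClient() in setup().
    self._client_factory = client_factory
    self._pub_client = None
    self._topic = None
    self._buffer = []

  def setup(self):
    # Runs once per DoFn instance, so the client (and its channels) is
    # shared by every bundle and every flush instead of being rebuilt.
    self._pub_client = self._client_factory()
    self._topic = self._pub_client.topic_path(
        self.project, self.short_topic_name)

  def start_bundle(self):
    self._buffer = []

  def process(self, elem):
    self._buffer.append(elem)
    if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
      self._flush()

  def finish_bundle(self):
    self._flush()

  def _flush(self):
    if not self._buffer:
      return
    # Reuse the long-lived client; nothing is constructed per flush.
    futures = [
        self._pub_client.publish(self._topic, elem) for elem in self._buffer
    ]
    for future in futures:
      future.result()
    self._buffer = []
```

Creating the client in `setup` also keeps it out of `__init__`, so the client object never has to be serialized along with the DoFn.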



##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -541,11 +546,67 @@ def is_bounded(self):
     return False
 
 
-# TODO(BEAM-27443): Remove in favor of a proper WriteToPubSub transform.
+class _PubSubWriteDoFn(DoFn):
+  """DoFn for writing messages to Cloud Pub/Sub.
+  
+  This DoFn handles both streaming and batch modes by buffering messages
+  and publishing them in batches to optimize performance.
+  """
+  BUFFER_SIZE_ELEMENTS = 100
+  FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
+
+  def __init__(self, transform):
+    self.project = transform.project
+    self.short_topic_name = transform.topic_name
+    self.id_label = transform.id_label
+    self.timestamp_attribute = transform.timestamp_attribute
+    self.with_attributes = transform.with_attributes
+
+    # TODO(https://github.com/apache/beam/issues/18939): Add support for
+    # id_label and timestamp_attribute.
+    if transform.id_label:
+      raise NotImplementedError('id_label is not supported for PubSub writes')
+    if transform.timestamp_attribute:
+      raise NotImplementedError(
+          'timestamp_attribute is not supported for PubSub writes')
+
+  def start_bundle(self):
+    self._buffer = []
+
+  def process(self, elem):
+    self._buffer.append(elem)
+    if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
+      self._flush()
+
+  def finish_bundle(self):
+    self._flush()
+
+  def _flush(self):
+    if not self._buffer:
+      return
+
+    from google.cloud import pubsub
+    import time
+
+    pub_client = pubsub.PublisherClient()
+    topic = pub_client.topic_path(self.project, self.short_topic_name)
+
+    # The elements in buffer are already serialized bytes from the previous
+    # transforms
+    futures = [pub_client.publish(topic, elem) for elem in self._buffer]
+
+    timer_start = time.time()
+    for future in futures:
+      remaining = self.FLUSH_TIMEOUT_SECS - (time.time() - timer_start)
+      future.result(remaining)

Review Comment:
   Why is there a flush timeout? Completing processing without waiting for all 
of the messages to be consumed by Pub/Sub could lead to data loss.
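
One way to address this, sketched under assumptions rather than taken from the PR: wait on every publish future with no timeout and let any failure propagate, so the bundle fails and can be retried instead of finishing with unpublished messages. The helper name `publish_and_wait` is hypothetical; `pub_client` is assumed to behave like `pubsub.PublisherClient`, whose `publish` returns a future with a blocking `result()`.

```python
def publish_and_wait(pub_client, topic, buffer):
  """Publishes every element and blocks until each publish has completed.

  Hypothetical helper for illustration. `buffer` is a list of serialized
  messages and is cleared only after all publishes succeed.
  """
  futures = [pub_client.publish(topic, elem) for elem in buffer]
  for future in futures:
    # No timeout: block until the publish succeeds or fails. An exception
    # raised here propagates to the caller, so the bundle is not treated
    # as successfully processed while messages are still outstanding.
    future.result()
  # Reached only if every publish succeeded.
  buffer.clear()
```

If a bound on waiting is still wanted, the point of the comment would suggest treating an expired timeout as a bundle failure rather than as completion.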



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

