scwhittle commented on code in PR #36027:
URL: https://github.com/apache/beam/pull/36027#discussion_r2318091327
##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########

@@ -541,11 +546,67 @@ def is_bounded(self):
     return False
-# TODO(BEAM-27443): Remove in favor of a proper WriteToPubSub transform.
+class _PubSubWriteDoFn(DoFn):
+  """DoFn for writing messages to Cloud Pub/Sub.
+
+  This DoFn handles both streaming and batch modes by buffering messages
+  and publishing them in batches to optimize performance.
+  """
+  BUFFER_SIZE_ELEMENTS = 100
+  FLUSH_TIMEOUT_SECS = BUFFER_SIZE_ELEMENTS * 0.5
+
+  def __init__(self, transform):
+    self.project = transform.project
+    self.short_topic_name = transform.topic_name
+    self.id_label = transform.id_label
+    self.timestamp_attribute = transform.timestamp_attribute
+    self.with_attributes = transform.with_attributes
+
+    # TODO(https://github.com/apache/beam/issues/18939): Add support for
+    # id_label and timestamp_attribute.
+    if transform.id_label:
+      raise NotImplementedError('id_label is not supported for PubSub writes')
+    if transform.timestamp_attribute:
+      raise NotImplementedError(
+          'timestamp_attribute is not supported for PubSub writes')
+
+  def start_bundle(self):
+    self._buffer = []
+
+  def process(self, elem):
+    self._buffer.append(elem)
+    if len(self._buffer) >= self.BUFFER_SIZE_ELEMENTS:
+      self._flush()
+
+  def finish_bundle(self):
+    self._flush()
+
+  def _flush(self):
+    if not self._buffer:
+      return
+
+    from google.cloud import pubsub
+    import time
+
+    pub_client = pubsub.PublisherClient()
+    topic = pub_client.topic_path(self.project, self.short_topic_name)
+
+    # The elements in buffer are already serialized bytes from the previous
+    # transforms
+    futures = [pub_client.publish(topic, elem) for elem in self._buffer]
+
+    timer_start = time.time()
+    for future in futures:
+      remaining = self.FLUSH_TIMEOUT_SECS - (time.time() - timer_start)
+      future.result(remaining)

Review Comment (on the line "pub_client = pubsub.PublisherClient()"):
   There might be a benefit to making this client long-lived so it reuses channels etc. instead of reconnecting on every flush. It can be created in the setup method.
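A minimal sketch of the suggested change, assuming the _PubSubWriteDoFn shown in the diff above (setup is the standard Beam DoFn lifecycle hook, called once per DoFn instance before any bundle is processed):

    from apache_beam import DoFn

    class _PubSubWriteDoFn(DoFn):
      # __init__ / start_bundle / process / finish_bundle as in the diff.

      def setup(self):
        # Create the publisher once per DoFn instance so its gRPC channel
        # is reused across bundles instead of reconnecting on every flush.
        from google.cloud import pubsub
        self._pub_client = pubsub.PublisherClient()

      def _flush(self):
        if not self._buffer:
          return
        # Reuse the long-lived client instead of constructing one per flush.
        topic = self._pub_client.topic_path(
            self.project, self.short_topic_name)
        futures = [
            self._pub_client.publish(topic, elem) for elem in self._buffer
        ]
        # ... wait on the futures as before ...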
Review Comment (on the line "future.result(remaining)"):
   Why is there a flush timeout? Completing processing without waiting for all of the messages to be accepted by Pub/Sub could lead to data loss.
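To illustrate the data-loss concern, here is a sketch of a _flush that waits for every publish to be acknowledged with no deadline, reusing the long-lived self._pub_client from the previous sketch. A slow or failed publish then fails the bundle so the runner can retry it, instead of the bundle completing while messages are still unacknowledged. This is an illustration of the concern, not the PR's final code:

      def _flush(self):
        if not self._buffer:
          return
        topic = self._pub_client.topic_path(
            self.project, self.short_topic_name)
        futures = [
            self._pub_client.publish(topic, elem) for elem in self._buffer
        ]
        for future in futures:
          # result() with no timeout blocks until Pub/Sub acknowledges the
          # message, and re-raises any publish error so finish_bundle cannot
          # complete while messages are still outstanding.
          future.result()
        # Clear the buffer only after every message has been acknowledged.
        self._buffer = []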