gemini-code-assist[bot] commented on code in PR #39079:
URL: https://github.com/apache/beam/pull/39079#discussion_r3464409151


##########
sdks/python/apache_beam/runners/direct/transform_evaluator.py:
##########
@@ -577,13 +578,45 @@ def finish_bundle(self):
         self, self.bundles, unprocessed_bundles, None, {None: self.watermark})
 
 
+class _PubSubSubscriberClient(object):
+  """SubscriberClient state cached for one DirectRunner Pub/Sub read."""
+  def __init__(self, client):
+    self.client = client
+    self._temporary_subscription = None
+    self._closed = False
+
+  def set_temporary_subscription(self, subscription):
+    self._temporary_subscription = subscription
+
+  def close(self):
+    if self._closed:
+      return
+    self._closed = True
+
+    try:
+      if self._temporary_subscription:
+        self.client.delete_subscription(
+            subscription=self._temporary_subscription)
+    except Exception:
+      _LOGGER.warning(
+          'Failed to delete temporary Pub/Sub subscription %s',
+          self._temporary_subscription,
+          exc_info=True)
+
+    try:
+      self.client.close()
+    except Exception:
+      _LOGGER.warning(
+          'Failed to close Pub/Sub subscriber client', exc_info=True)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   During interpreter shutdown, module-level globals like `_LOGGER` can be 
cleared and set to `None`. Since `close()` is called from a finalizer during 
garbage collection (which can happen during shutdown), check if `_LOGGER` is 
not `None` before calling `_LOGGER.warning` to avoid raising an 
`AttributeError`.
   
   ```suggestion
       try:
         if self._temporary_subscription:
           self.client.delete_subscription(
               subscription=self._temporary_subscription)
       except Exception:
         if _LOGGER is not None:
           _LOGGER.warning(
               'Failed to delete temporary Pub/Sub subscription %s',
               self._temporary_subscription,
               exc_info=True)
   
       try:
         self.client.close()
       except Exception:
         if _LOGGER is not None:
           _LOGGER.warning(
               'Failed to close Pub/Sub subscriber client', exc_info=True)
   ```



##########
sdks/python/apache_beam/runners/direct/transform_evaluator.py:
##########
@@ -577,13 +578,45 @@ def finish_bundle(self):
         self, self.bundles, unprocessed_bundles, None, {None: self.watermark})
 
 
+class _PubSubSubscriberClient(object):
+  """SubscriberClient state cached for one DirectRunner Pub/Sub read."""
+  def __init__(self, client):
+    self.client = client
+    self._temporary_subscription = None
+    self._closed = False

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Add a per-client lock `self._lock` to serialize subscription creation for 
each transform without holding the global class-level lock during network calls.
   
   ```suggestion
     def __init__(self, client):
       self.client = client
       self._temporary_subscription = None
       self._closed = False
       self._lock = threading.Lock()
   ```



##########
sdks/python/apache_beam/runners/direct/transform_evaluator.py:
##########
@@ -627,18 +660,35 @@ def get_subscription(
     if short_sub_name:
       return pubsub.SubscriberClient.subscription_path(project, short_sub_name)
 
-    if transform in cls._subscription_cache:
+    with cls._subscriber_client_cache_lock:
+      if transform in cls._subscription_cache:
+        return cls._subscription_cache[transform]
+
+      subscriber_client = cls._get_subscriber_client_state_unlocked(transform)
+      sub_client = subscriber_client.client
+      sub_name = sub_client.subscription_path(
+          sub_project,
+          'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
+      topic_name = sub_client.topic_path(project, short_topic_name)
+      sub_client.create_subscription(name=sub_name, topic=topic_name)
+      subscriber_client.set_temporary_subscription(sub_name)
+      cls._subscription_cache[transform] = sub_name
       return cls._subscription_cache[transform]

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Holding the global class-level lock `_subscriber_client_cache_lock` during 
the synchronous network call `sub_client.create_subscription(...)` can block 
all other threads trying to read from or initialize other Pub/Sub sources.
   
   We can optimize this by releasing the global lock before making the network 
call, and using a per-transform lock (`subscriber_client._lock`) to serialize 
subscription creation for the same transform.
   
   ```suggestion
       with cls._subscriber_client_cache_lock:
         subscriber_client = cls._get_subscriber_client_state_unlocked(transform)
   
       with subscriber_client._lock:
         with cls._subscriber_client_cache_lock:
           if transform in cls._subscription_cache:
             return cls._subscription_cache[transform]
   
         sub_client = subscriber_client.client
         sub_name = sub_client.subscription_path(
             sub_project,
             'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
         topic_name = sub_client.topic_path(project, short_topic_name)
         sub_client.create_subscription(name=sub_name, topic=topic_name)
         subscriber_client.set_temporary_subscription(sub_name)
         with cls._subscriber_client_cache_lock:
           cls._subscription_cache[transform] = sub_name
           return sub_name
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to