This is an automated email from the ASF dual-hosted git repository.
ccy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1a4a4f0 Don't map transforms to pubsub subscriptions unless necessary
new 0226799 Merge pull request #9146 from ostrokach/bugfix/pubsub_reader_global_scope
1a4a4f0 is described below
commit 1a4a4f0543273608e0469ca648945d772783a338
Author: Alexey Strokach <[email protected]>
AuthorDate: Tue Jul 23 16:55:03 2019 -0700
Don't map transforms to pubsub subscriptions unless necessary
---
.../runners/direct/transform_evaluator.py | 62 ++++++++--------------
1 file changed, 21 insertions(+), 41 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 29e3a48..8fa9b2d 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -19,6 +19,7 @@
from __future__ import absolute_import
+import atexit
import collections
import logging
import random
@@ -376,45 +377,11 @@ class _TestStreamEvaluator(_TransformEvaluator):
self, self.bundles, unprocessed_bundles, None, {None: hold})
-class _PubSubSubscriptionWrapper(object):
- """Wrapper for managing temporary PubSub subscriptions."""
-
- def __init__(self, project, short_topic_name, short_sub_name):
- """Initialize subscription wrapper.
-
- If sub_name is None, will create a temporary subscription to topic_name.
-
- Args:
- project: GCP project name for topic and subscription. May be None.
- Required if sub_name is None.
- short_topic_name: Valid topic name without
- 'projects/{project}/topics/' prefix. May be None.
- Required if sub_name is None.
- short_sub_name: Valid subscription name without
- 'projects/{project}/subscriptions/' prefix. May be None.
- """
- from google.cloud import pubsub
- self.sub_client = pubsub.SubscriberClient()
-
- if short_sub_name is None:
- self.sub_name = self.sub_client.subscription_path(
- project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
- topic_name = self.sub_client.topic_path(project, short_topic_name)
- self.sub_client.create_subscription(self.sub_name, topic_name)
- self._should_cleanup = True
- else:
- self.sub_name = self.sub_client.subscription_path(project, short_sub_name)
- self._should_cleanup = False
-
- def __del__(self):
- if self._should_cleanup:
- self.sub_client.delete_subscription(self.sub_name)
-
-
class _PubSubReadEvaluator(_TransformEvaluator):
"""TransformEvaluator for PubSub read."""
# A mapping of transform to _PubSubSubscriptionWrapper.
+ # TODO(BEAM-7750): Prevents garbage collection of pipeline instances.
_subscription_cache = {}
def __init__(self, evaluation_context, applied_ptransform,
@@ -428,16 +395,29 @@ class _PubSubReadEvaluator(_TransformEvaluator):
if self.source.id_label:
raise NotImplementedError(
'DirectRunner: id_label is not supported for PubSub reads')
- self._sub_name = _PubSubReadEvaluator.get_subscription(
+ self._sub_name = self.get_subscription(
self._applied_ptransform, self.source.project, self.source.topic_name,
self.source.subscription_name)
@classmethod
- def get_subscription(cls, transform, project, topic, short_sub_name):
- if transform not in cls._subscription_cache:
- wrapper = _PubSubSubscriptionWrapper(project, topic, short_sub_name)
- cls._subscription_cache[transform] = wrapper
- return cls._subscription_cache[transform].sub_name
+ def get_subscription(cls, transform, project, short_topic_name,
+ short_sub_name):
+ from google.cloud import pubsub
+
+ if short_sub_name:
+ return pubsub.SubscriberClient.subscription_path(project, short_sub_name)
+
+ if transform in cls._subscription_cache:
+ return cls._subscription_cache[transform]
+
+ sub_client = pubsub.SubscriberClient()
+ sub_name = sub_client.subscription_path(
+ project, 'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
+ topic_name = sub_client.topic_path(project, short_topic_name)
+ sub_client.create_subscription(sub_name, topic_name)
+ atexit.register(sub_client.delete_subscription, sub_name)
+ cls._subscription_cache[transform] = sub_name
+ return cls._subscription_cache[transform]
def start_bundle(self):
pass