TobKed commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite
Google PubSub Hook to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r324696829
##########
File path: airflow/gcp/hooks/pubsub.py
##########
@@ -177,40 +293,88 @@ def create_subscription(
:param fail_if_exists: if set, raise an exception if the topic
already exists
:type fail_if_exists: bool
+ :param push_config: If push delivery is used with this subscription,
+ this field is used to configure it. An empty ``pushConfig``
signifies
+ that the subscriber will pull and ack messages using API methods.
+ :type push_config: Union[Dict, google.cloud.pubsub_v1.types.PushConfig]
+ :param retain_acked_messages: Indicates whether to retain acknowledged
+ messages. If true, then messages are not expunged from the
subscription's
+ backlog, even if they are acknowledged, until they fall out of the
+ ``message_retention_duration`` window. This must be true if you
would
+ like to Seek to a timestamp.
+ :type retain_acked_messages: bool
+ :param message_retention_duration: How long to retain unacknowledged
messages
+ in the subscription's backlog, from the moment a message is
published. If
+ ``retain_acked_messages`` is true, then this also configures the
+ retention of acknowledged messages, and thus configures how far
back in
+ time a ``Seek`` can be done. Defaults to 7 days. Cannot be more
than 7
+ days or less than 10 minutes.
+ :type message_retention_duration: Union[Dict,
google.cloud.pubsub_v1.types.Duration]
+ :param labels: Client-assigned labels; see
+ https://cloud.google.com/pubsub/docs/labels
+ :type labels: Dict[str, str]
+ :param retry: (Optional) A retry object used to retry requests.
+ If None is specified, requests will not be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: (Optional) The amount of time, in seconds, to wait for
the request
+ to complete. Note that if retry is specified, the timeout applies
to each
+ individual attempt.
+ :type timeout: float
+ :param metadata: (Optional) Additional metadata that is provided to
the method.
+ :type metadata: Sequence[Tuple[str, str]]]
:return: subscription name which will be the system-generated value if
the ``subscription`` parameter is not supplied
:rtype: str
"""
- service = self.get_conn()
- full_topic = _format_topic(topic_project, topic)
+ subscriber = self.subscriber_client
+
if not subscription:
subscription = 'sub-{}'.format(uuid4())
if not subscription_project:
subscription_project = topic_project
- full_subscription = _format_subscription(subscription_project,
- subscription)
- body = {
- 'topic': full_topic,
- 'ackDeadlineSeconds': ack_deadline_secs
- }
+
+ # Add airflow-version label to the subscription
+ labels = labels or {}
+ labels['airflow-version'] = 'v' + version.replace('.',
'-').replace('+', '-')
+
+ # pylint: disable=no-member
+ subscription_path =
SubscriberClient.subscription_path(subscription_project, subscription)
+ topic_path = SubscriberClient.topic_path(topic_project, topic)
+
+ self.log.info("Creating subscription (path) %s for topic (path) %a",
subscription_path, topic_path)
try:
- service.projects().subscriptions().create( # pylint:
disable=no-member
- name=full_subscription,
body=body).execute(num_retries=self.num_retries)
- except HttpError as e:
- # Status code 409 indicates that the subscription already exists.
- if str(e.resp['status']) == '409':
- message = 'Subscription already exists: {}'.format(
- full_subscription)
- self.log.warning(message)
- if fail_if_exists:
- raise PubSubException(message)
- else:
- raise PubSubException(
- 'Error creating subscription {}'.format(full_subscription),
- e)
+ subscriber.create_subscription(
+ name=subscription_path,
+ topic=topic_path,
+ push_config=push_config,
+ ack_deadline_seconds=ack_deadline_secs,
+ retain_acked_messages=retain_acked_messages,
+ message_retention_duration=message_retention_duration,
+ labels=labels,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ except AlreadyExists:
+ message = 'Subscription already exists:
{}'.format(subscription_path)
+ self.log.warning(message)
+ if fail_if_exists:
+ raise PubSubException(message)
+ except GoogleAPICallError as e:
Review comment:
I've added a test.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services