potiuk commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite
Google PubSub Hook to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r328618023
##########
File path: airflow/gcp/hooks/pubsub.py
##########
@@ -52,254 +51,483 @@ class PubSubHook(GoogleCloudBaseHook):
def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: Optional[str] = None) -> None:
super().__init__(gcp_conn_id, delegate_to=delegate_to)
+ self._client = None
+
+ def get_conn(self) -> PublisherClient:
+ """
+ Retrieves connection to Google Cloud Pub/Sub.
- def get_conn(self) -> Any:
+ :return: Google Cloud Pub/Sub client object.
+ :rtype: google.cloud.pubsub_v1.PublisherClient
"""
- Returns a Pub/Sub service object.
+ if not self._client:
+ self._client = PublisherClient(
+ credentials=self._get_credentials(),
+ client_info=self.client_info
+ )
+ return self._client
- :rtype: googleapiclient.discovery.Resource
+ @cached_property
+ def subscriber_client(self) -> SubscriberClient:
"""
- http_authorized = self._authorize()
- return build(
- 'pubsub', 'v1', http=http_authorized, cache_discovery=False)
+ Creates SubscriberClient.
- def publish(self, project: str, topic: str, messages: List[Dict]) -> None:
+ :return: Google Cloud Pub/Sub client object.
+ :rtype: google.cloud.pubsub_v1.SubscriberClient
+ """
+ return SubscriberClient(
+ credentials=self._get_credentials(),
+ client_info=self.client_info
+ )
+
+ @GoogleCloudBaseHook.fallback_to_default_project_id
+ def publish(
+ self,
+ topic: str,
+ messages: List[Dict],
+ project_id: Optional[str] = None,
+ ) -> None:
"""
Publishes messages to a Pub/Sub topic.
- :param project: the GCP project ID in which to publish
- :type project: str
:param topic: the Pub/Sub topic to which to publish; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
:param messages: messages to publish; if the data field in a
message is set, it should already be base64 encoded.
:type messages: list of PubSub messages; see
http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+ :param project_id: Optional, the GCP project ID in which to publish.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: str
"""
- body = {'messages': messages}
- full_topic = _format_topic(project, topic)
- request = self.get_conn().projects().topics().publish( # pylint: disable=no-member
- topic=full_topic, body=body)
+ assert project_id is not None
+ publisher = self.get_conn()
+ topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member
+
+ # TODO validation of messages
+ self.log.info("Publish %d messages to topic (path) %s", len(messages), topic_path)
try:
- request.execute(num_retries=self.num_retries)
- except HttpError as e:
- raise PubSubException(
- 'Error publishing to topic {}'.format(full_topic), e)
+ for message in messages:
+ publisher.publish(
+ topic=topic_path,
+ data=message.get("data", b''),
+ **message.get('attributes', {})
+ )
+ except GoogleAPICallError as e:
+ raise PubSubException('Error publishing to topic {}'.format(topic_path), e)
+
+ self.log.info("Published %d messages to topic (path) %s", len(messages), topic_path)
- def create_topic(self, project: str, topic: str, fail_if_exists: bool =
False) -> None:
+ # pylint: disable=too-many-arguments
+ @GoogleCloudBaseHook.fallback_to_default_project_id
+ def create_topic(
+ self,
+ topic: str,
+ project_id: Optional[str] = None,
+ fail_if_exists: bool = False,
+ labels: Optional[Dict[str, str]] = None,
+ message_storage_policy: Union[Dict, MessageStoragePolicy] = None,
+ kms_key_name: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> None:
"""
Creates a Pub/Sub topic, if it does not already exist.
- :param project: the GCP project ID in which to create
- the topic
- :type project: str
:param topic: the Pub/Sub topic name to create; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
+ :param project_id: Optional, the GCP project ID in which to create the
topic
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: str
:param fail_if_exists: if set, raise an exception if the topic
already exists
:type fail_if_exists: bool
+ :param labels: Client-assigned labels; see
+ https://cloud.google.com/pubsub/docs/labels
+ :type labels: Dict[str, str]
+ :param message_storage_policy: Policy constraining the set
+ of Google Cloud Platform regions where messages published to
+ the topic may be stored. If not present, then no constraints
+ are in effect.
+ :type message_storage_policy:
+ Union[Dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]
+ :param kms_key_name: The resource name of the Cloud KMS CryptoKey
+ to be used to protect access to messages published on this topic.
+ The expected format is
+ ``projects/*/locations/*/keyRings/*/cryptoKeys/*``.
+ :type kms_key_name: str
+ :param retry: (Optional) A retry object used to retry requests.
+ If None is specified, requests will not be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: (Optional) The amount of time, in seconds, to wait for
the request
+ to complete. Note that if retry is specified, the timeout applies
to each
+ individual attempt.
+ :type timeout: float
+ :param metadata: (Optional) Additional metadata that is provided to
the method.
+ :type metadata: Sequence[Tuple[str, str]]]
"""
- service = self.get_conn()
- full_topic = _format_topic(project, topic)
+ assert project_id is not None
+ publisher = self.get_conn()
+ topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member
+
+ # Add airflow-version label to the topic
+ labels = labels or {}
+ labels['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-')
+
+ self.log.info("Creating topic (path) %s", topic_path)
try:
- service.projects().topics().create( # pylint: disable=no-member
- name=full_topic, body={}).execute(num_retries=self.num_retries)
- except HttpError as e:
- # Status code 409 indicates that the topic already exists.
- if str(e.resp['status']) == '409':
- message = 'Topic already exists: {}'.format(full_topic)
- self.log.warning(message)
- if fail_if_exists:
- raise PubSubException(message)
- else:
- raise PubSubException(
- 'Error creating topic {}'.format(full_topic), e)
-
- def delete_topic(self, project: str, topic: str, fail_if_not_exists: bool = False) -> None:
+ # pylint: disable=no-member
+ publisher.create_topic(
+ name=topic_path,
+ labels=labels,
+ message_storage_policy=message_storage_policy,
+ kms_key_name=kms_key_name,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ except AlreadyExists:
+ self.log.warning('Topic already exists: %s', topic)
+ if fail_if_exists:
+ raise PubSubException('Topic already exists: {}'.format(topic))
+ except GoogleAPICallError as e:
+ raise PubSubException('Error creating topic {}'.format(topic), e)
+
+ self.log.info("Created topic (path) %s", topic_path)
+
+ @GoogleCloudBaseHook.fallback_to_default_project_id
+ def delete_topic(
+ self,
+ topic: str,
+ project_id: Optional[str] = None,
+ fail_if_not_exists: bool = False,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> None:
"""
Deletes a Pub/Sub topic if it exists.
- :param project: the GCP project ID in which to delete the topic
- :type project: str
:param topic: the Pub/Sub topic name to delete; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
+ :param project_id: Optional, the GCP project ID in which to delete the
topic.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: str
:param fail_if_not_exists: if set, raise an exception if the topic
does not exist
:type fail_if_not_exists: bool
+ :param retry: (Optional) A retry object used to retry requests.
+ If None is specified, requests will not be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: (Optional) The amount of time, in seconds, to wait for
the request
+ to complete. Note that if retry is specified, the timeout applies
to each
+ individual attempt.
+ :type timeout: float
+ :param metadata: (Optional) Additional metadata that is provided to
the method.
+ :type metadata: Sequence[Tuple[str, str]]]
"""
- service = self.get_conn()
- full_topic = _format_topic(project, topic)
+ assert project_id is not None
+ publisher = self.get_conn()
+ topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member
+
+ self.log.info("Deleting topic (path) %s", topic_path)
try:
- service.projects().topics().delete( # pylint: disable=no-member
- topic=full_topic).execute(num_retries=self.num_retries)
- except HttpError as e:
- # Status code 409 indicates that the topic was not found
- if str(e.resp['status']) == '404':
- message = 'Topic does not exist: {}'.format(full_topic)
- self.log.warning(message)
- if fail_if_not_exists:
- raise PubSubException(message)
- else:
- raise PubSubException(
- 'Error deleting topic {}'.format(full_topic), e)
+ # pylint: disable=no-member
+ publisher.delete_topic(
+ topic=topic_path,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ except NotFound:
+ self.log.warning('Topic does not exist: %s', topic_path)
+ if fail_if_not_exists:
+ raise PubSubException('Topic does not exist: {}'.format(topic_path))
+ except GoogleAPICallError as e:
+ raise PubSubException('Error deleting topic {}'.format(topic), e)
+ self.log.info("Deleted topic (path) %s", topic_path)
+ # pylint: disable=too-many-arguments
+ @GoogleCloudBaseHook.fallback_to_default_project_id
def create_subscription(
self,
- topic_project: str,
topic: str,
+ project_id: Optional[str] = None,
subscription: Optional[str] = None,
- subscription_project: Optional[str] = None,
+ subscription_project_id: Optional[str] = None,
ack_deadline_secs: int = 10,
fail_if_exists: bool = False,
+ push_config: Optional[Union[Dict, PushConfig]] = None,
Review comment:
Same here 👍
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services