potiuk commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite 
Google PubSub Hook to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r328617353
 
 

 ##########
 File path: airflow/gcp/hooks/pubsub.py
 ##########
 @@ -52,254 +51,483 @@ class PubSubHook(GoogleCloudBaseHook):
 
     def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: 
Optional[str] = None) -> None:
         super().__init__(gcp_conn_id, delegate_to=delegate_to)
+        self._client = None
+
+    def get_conn(self) -> PublisherClient:
+        """
+        Retrieves connection to Google Cloud Pub/Sub.
 
-    def get_conn(self) -> Any:
+        :return: Google Cloud Pub/Sub client object.
+        :rtype: google.cloud.pubsub_v1.PublisherClient
         """
-        Returns a Pub/Sub service object.
+        if not self._client:
+            self._client = PublisherClient(
+                credentials=self._get_credentials(),
+                client_info=self.client_info
+            )
+        return self._client
 
-        :rtype: googleapiclient.discovery.Resource
+    @cached_property
+    def subscriber_client(self) -> SubscriberClient:
         """
-        http_authorized = self._authorize()
-        return build(
-            'pubsub', 'v1', http=http_authorized, cache_discovery=False)
+        Creates SubscriberClient.
 
-    def publish(self, project: str, topic: str, messages: List[Dict]) -> None:
+        :return: Google Cloud Pub/Sub client object.
+        :rtype: google.cloud.pubsub_v1.SubscriberClient
+        """
+        return SubscriberClient(
+            credentials=self._get_credentials(),
+            client_info=self.client_info
+        )
+
+    @GoogleCloudBaseHook.fallback_to_default_project_id
+    def publish(
+        self,
+        topic: str,
+        messages: List[Dict],
+        project_id: Optional[str] = None,
+    ) -> None:
         """
         Publishes messages to a Pub/Sub topic.
 
-        :param project: the GCP project ID in which to publish
-        :type project: str
         :param topic: the Pub/Sub topic to which to publish; do not
             include the ``projects/{project}/topics/`` prefix.
         :type topic: str
         :param messages: messages to publish; if the data field in a
             message is set, it should already be base64 encoded.
         :type messages: list of PubSub messages; see
             http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+        :param project_id: Optional, the GCP project ID in which to publish.
+            If set to None or missing, the default project_id from the GCP 
connection is used.
+        :type project_id: str
         """
-        body = {'messages': messages}
-        full_topic = _format_topic(project, topic)
-        request = self.get_conn().projects().topics().publish(  # pylint: 
disable=no-member
-            topic=full_topic, body=body)
+        assert project_id is not None
+        publisher = self.get_conn()
+        topic_path = PublisherClient.topic_path(project_id, topic)  # pylint: 
disable=no-member
+
+        # TODO validation of messages
+        self.log.info("Publish %d messages to topic (path) %s", len(messages), 
topic_path)
         try:
-            request.execute(num_retries=self.num_retries)
-        except HttpError as e:
-            raise PubSubException(
-                'Error publishing to topic {}'.format(full_topic), e)
+            for message in messages:
+                publisher.publish(
+                    topic=topic_path,
+                    data=message.get("data", b''),
+                    **message.get('attributes', {})
+                )
+        except GoogleAPICallError as e:
+            raise PubSubException('Error publishing to topic 
{}'.format(topic_path), e)
+
+        self.log.info("Published %d messages to topic (path) %s", 
len(messages), topic_path)
 
-    def create_topic(self, project: str, topic: str, fail_if_exists: bool = 
False) -> None:
+    # pylint: disable=too-many-arguments
+    @GoogleCloudBaseHook.fallback_to_default_project_id
+    def create_topic(
+        self,
+        topic: str,
+        project_id: Optional[str] = None,
+        fail_if_exists: bool = False,
+        labels: Optional[Dict[str, str]] = None,
+        message_storage_policy: Union[Dict, MessageStoragePolicy] = None,
+        kms_key_name: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ) -> None:
         """
         Creates a Pub/Sub topic, if it does not already exist.
 
-        :param project: the GCP project ID in which to create
-            the topic
-        :type project: str
         :param topic: the Pub/Sub topic name to create; do not
             include the ``projects/{project}/topics/`` prefix.
         :type topic: str
+        :param project_id: Optional, the GCP project ID in which to create the 
topic
 
 Review comment:
   Cool with all those params below. Looks like a fully-featured implementation
now 👍

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to