TobKed commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite 
Google PubSub Hook to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r329463926
 
 

 ##########
 File path: airflow/gcp/hooks/pubsub.py
 ##########
 @@ -52,254 +51,483 @@ class PubSubHook(GoogleCloudBaseHook):
 
     def __init__(self, gcp_conn_id: str = 'google_cloud_default', delegate_to: 
Optional[str] = None) -> None:
         super().__init__(gcp_conn_id, delegate_to=delegate_to)
+        self._client = None
+
+    def get_conn(self) -> PublisherClient:
+        """
+        Retrieves connection to Google Cloud Pub/Sub.
 
-    def get_conn(self) -> Any:
+        :return: Google Cloud Pub/Sub client object.
+        :rtype: google.cloud.pubsub_v1.PublisherClient
         """
-        Returns a Pub/Sub service object.
+        if not self._client:
+            self._client = PublisherClient(
+                credentials=self._get_credentials(),
+                client_info=self.client_info
+            )
+        return self._client
 
-        :rtype: googleapiclient.discovery.Resource
+    @cached_property
+    def subscriber_client(self) -> SubscriberClient:
         """
-        http_authorized = self._authorize()
-        return build(
-            'pubsub', 'v1', http=http_authorized, cache_discovery=False)
+        Creates SubscriberClient.
 
-    def publish(self, project: str, topic: str, messages: List[Dict]) -> None:
+        :return: Google Cloud Pub/Sub client object.
+        :rtype: google.cloud.pubsub_v1.SubscriberClient
+        """
+        return SubscriberClient(
+            credentials=self._get_credentials(),
+            client_info=self.client_info
+        )
+
+    @GoogleCloudBaseHook.fallback_to_default_project_id
+    def publish(
+        self,
+        topic: str,
+        messages: List[Dict],
+        project_id: Optional[str] = None,
+    ) -> None:
         """
         Publishes messages to a Pub/Sub topic.
 
-        :param project: the GCP project ID in which to publish
-        :type project: str
         :param topic: the Pub/Sub topic to which to publish; do not
             include the ``projects/{project}/topics/`` prefix.
         :type topic: str
         :param messages: messages to publish; if the data field in a
             message is set, it should already be base64 encoded.
         :type messages: list of PubSub messages; see
             http://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
+        :param project_id: Optional, the GCP project ID in which to publish.
+            If set to None or missing, the default project_id from the GCP 
connection is used.
+        :type project_id: str
         """
-        body = {'messages': messages}
-        full_topic = _format_topic(project, topic)
-        request = self.get_conn().projects().topics().publish(  # pylint: 
disable=no-member
-            topic=full_topic, body=body)
+        assert project_id is not None
+        publisher = self.get_conn()
+        topic_path = PublisherClient.topic_path(project_id, topic)  # pylint: 
disable=no-member
+
+        # TODO validation of messages
 
 Review comment:
   I think this comment should not be present here.
   I have separate PR for message validation waiting in queue:
   https://github.com/PolideaInternal/airflow/pull/304/commits

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to