mik-laj commented on a change in pull request #6096: [AIRFLOW-5477] Rewrite 
Google PubSub Hook to Google Cloud Python
URL: https://github.com/apache/airflow/pull/6096#discussion_r324422317
 
 

 ##########
 File path: airflow/gcp/hooks/pubsub.py
 ##########
 @@ -259,29 +443,57 @@ def pull(
             return if no messages are available. Otherwise, the request will
             block for an undisclosed, but bounded period of time
         :type return_immediately: bool
+        :param retry: (Optional) A retry object used to retry requests.
+            If None is specified, requests will not be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: (Optional) The amount of time, in seconds, to wait for 
the request
+            to complete. Note that if retry is specified, the timeout applies 
to each
+            individual attempt.
+        :type timeout: float
+        :param metadata: (Optional) Additional metadata that is provided to 
the method.
+        :type metadata: Sequence[Tuple[str, str]]]
         :return: A list of Pub/Sub ReceivedMessage objects each containing
             an ``ackId`` property and a ``message`` property, which includes
             the base64-encoded message content. See
             
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull#ReceivedMessage
         """
-        service = self.get_conn()
-        full_subscription = _format_subscription(project, subscription)
-        body = {
-            'maxMessages': max_messages,
-            'returnImmediately': return_immediately
-        }
+        subscriber = self.subscriber_client
+        subscription_path = SubscriberClient.subscription_path(project, 
subscription)  # noqa E501 # pylint: disable=no-member,line-too-long
+
+        self.log.info("Pulling mex %d messages from subscription (path) %s", 
max_messages, subscription_path)
         try:
-            response = service.projects().subscriptions().pull(  # pylint: 
disable=no-member
-                subscription=full_subscription, 
body=body).execute(num_retries=self.num_retries)
-            return response.get('receivedMessages', [])
+            # pylint: disable=no-member
+            response = subscriber.pull(
+                subscription=subscription_path,
+                max_messages=max_messages,
+                return_immediately=return_immediately,
+                retry=retry,
+                timeout=timeout,
+                metadata=metadata,
+            )
+            result = getattr(response, 'received_messages', [])
+            self.log.info("Pulled %d messages from subscription (path) %s", 
len(result),
+                          subscription_path)
+            return result
         except HttpError as e:
             raise PubSubException(
                 'Error pulling messages from subscription {}'.format(
-                    full_subscription), e)
+                    subscription_path), e)
+        except GoogleAPICallError as e:
 
 Review comment:
   This part of code is not covered by unit test. Can you complete missing unit 
tests?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to