josepaul1988 opened a new issue, #27384:
URL: https://github.com/apache/airflow/issues/27384

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==6.4.0
   
   
   
   ### Apache Airflow version
   
   2.2.3
   
   ### Operating System
   
   Container-Optimized OS,1.22.12-gke.2300      
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   Composer version: composer-2.0.8-airflow-2.2.3 
   
   
   
   ### What happened
   
   **Requirement:** Acknowledge GCP Cloud PubSub messages in the callback
(`messages_callback`) of PubSubPullOperator/PubSubPullSensor by calling
`PubSubHook().acknowledge()`. The business requirement is that only messages
that are successfully processed should be acknowledged.
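
   To make the requirement concrete, here is a minimal sketch of a callback that collects the ack IDs of messages that were processed without error and acknowledges only those. The helper name `ack_successful`, the `process` and `acknowledge` callables, and the stand-in messages built with `SimpleNamespace` are assumptions for illustration; in the real DAG, `acknowledge` would wrap `PubSubHook().acknowledge(...)`.

```python
import json
from types import SimpleNamespace


def ack_successful(pulled_messages, process, acknowledge):
    """Process each message and acknowledge only those that did not raise."""
    ack_ids = []
    for received in pulled_messages:
        try:
            payload = json.loads(received.message.data.decode("utf-8"))
            process(payload)
        except Exception as exc:
            # Leave the message unacknowledged so that it can be redelivered.
            print(f"Skipping ack for {received.ack_id}: {exc}")
            continue
        ack_ids.append(received.ack_id)
    if ack_ids:
        acknowledge(ack_ids)
    return ack_ids


def _fake_message(ack_id, data):
    # Hypothetical stand-in exposing only .ack_id and .message.data.
    return SimpleNamespace(ack_id=ack_id, message=SimpleNamespace(data=data))


messages = [
    _fake_message("a1", b'{"incident": {"incident_id": "1"}}'),
    _fake_message("a2", b"not json"),  # fails to parse, so it is not acknowledged
]
acked = []
result = ack_successful(
    messages,
    process=lambda payload: payload["incident"]["incident_id"],
    acknowledge=acked.extend,
)
```

   In the DAG, `acknowledge` could be a small function that forwards the collected IDs as `ack_ids` together with the subscription and project ID.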
   
   According to the
[documentation](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge),
the following call should be valid:
   
`PubSubHook().acknowledge(subscription=SUBSCRIPTION_ID, project_id=PROJECT_ID, ack_ids=ack_ids_list, retry=Retry, timeout=10)`
   
   However, if I pass `retry=Retry`, it throws the exception
`__call__() got an unexpected keyword argument 'metadata'`. The same call
without the `retry` option works:
   
`PubSubHook().acknowledge(subscription=SUBSCRIPTION_ID, project_id=PROJECT_ID, ack_ids=ack_ids_list, timeout=10)`
   
   
   
   ### What you think should happen instead
   
   According to the
[documentation](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge),
the `retry` option is needed to retry the request in case of failure. Because
I had to remove the `retry` option to make the acknowledge operation work, I
lost the ability to retry a failed acknowledgment.
   
   > retry (Retry | _MethodDefault) – (Optional) A retry object used to retry 
requests. If None is specified, requests will not be retried.
   
   Error StackTrace:
   
   ```
   PubSubHook().acknowledge(subscription=SNOW_SUBSCRIPTION, project_id=PROJECT_ID, ack_ids=ack_id_list, retry=Retry, timeout=10)
   
   Traceback:
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/pubsub.py", line 785, in execute
       ret = handle_messages(pulled_messages, context)
     File "/home/airflow/gcs/dags/snow_ticket_creator_1.py", line 70, in print_messages
       PubSubHook().acknowledge(subscription=SNOW_SUBSCRIPTION, project_id=PROJECT_ID, ack_ids=ack_id_list, retry=Retry, timeout=10)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 457, in inner_wrapper
       return func(self, *args, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 561, in acknowledge
       subscriber.acknowledge(
     File "/opt/python3.8/lib/python3.8/site-packages/google/pubsub_v1/services/subscriber/client.py", line 1270, in acknowledge
       rpc(
     File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py", line 154, in __call__
       return wrapped_func(*args, **kwargs)
   TypeError: __call__() got an unexpected keyword argument 'metadata'
   ```
   
   ### How to reproduce
   
   1. Create a GCP Cloud PubSub topic and subscription.
   2. Publish some messages to the topic.
   3. Create a DAG that connects to the subscription and pulls messages from
it. Use the code below to reproduce.
   
   ```
   from __future__ import annotations
   
   import os
   from datetime import datetime
   import base64
   
   import airflow
   from airflow import DAG
   import json
   from airflow.operators.bash import BashOperator
   from airflow.providers.google.cloud.operators.pubsub import (
       PubSubCreateSubscriptionOperator,
       PubSubPullOperator,
   )
   from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
   from airflow.providers.google.cloud.hooks.pubsub import PubSubHook, Retry
   from airflow.utils.trigger_rule import TriggerRule
   
   ENV_ID = "Dev"  # os.environ.get("SYSTEM_TESTS_ENV_ID")
   PROJECT_ID = "gp-ct-sbox-adv-dna"  # os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "your-project-id")
   DAG_ID = "SnowTicketCreator_1"
   TOPIC_ID = "snow_alert_topic_jp"  # f"topic-{DAG_ID}-{ENV_ID}"
   SNOW_SUBSCRIPTION = "snow_alert_subscription_jp_1"
   
   
   def ack_messages(pulled_messages, context):
       for idx, m in enumerate(pulled_messages):
           # Each message body is a UTF-8 encoded JSON document.
           data = m.message.data.decode("utf-8")
           data_json_dict = json.loads(data)
           print(
               f"AckID: {m.ack_id}, incident_id: {data_json_dict['incident']['incident_id']} "
               f"scoping_project_id: {data_json_dict['incident']['scoping_project_id']} "
               f"resource_name: {data_json_dict['incident']['resource_name']} "
               f"summary: {data_json_dict['incident']['summary']} "
           )
           # Acknowledge the message (only the first one in this reproduction).
           ack_id_list = [m.ack_id]
           if idx == 0:
               # Passing retry=Retry here triggers the reported TypeError.
               PubSubHook().acknowledge(
                   subscription=SNOW_SUBSCRIPTION,
                   project_id=PROJECT_ID,
                   ack_ids=ack_id_list,
                   retry=Retry,
                   timeout=10,
               )
               print(f"Successfully acknowledged incident_id: {data_json_dict['incident']['incident_id']}")
   
   
   with DAG(
       DAG_ID,
       schedule_interval="@once",  # Override to match your needs
       start_date=airflow.utils.dates.days_ago(0),
       catchup=False,
   ) as dag:
   
       subscribe_task = PubSubCreateSubscriptionOperator(
           task_id="subscribe_task",
           project_id=PROJECT_ID,
           topic=TOPIC_ID,
           subscription=SNOW_SUBSCRIPTION,
       )
   
       subscription = subscribe_task.output
   
       pull_messages_operator = PubSubPullOperator(
           task_id="pull_messages_operator",
           ack_messages=False,
           project_id=PROJECT_ID,
           messages_callback=ack_messages,
           subscription=subscription,
           max_messages=50,
       )
   
       subscribe_task >> pull_messages_operator
   ```
   
   ### Anything else
   
   This happens only when the `retry` option is passed to the `acknowledge`
method.
   
   ```
   PubSubHook().acknowledge(subscription=SNOW_SUBSCRIPTION, project_id=PROJECT_ID, ack_ids=ack_id_list, retry=Retry, timeout=10)
   ```
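   
   One possible explanation, offered as an assumption and not a confirmed diagnosis: `retry=Retry` passes the `Retry` class itself, while the documented parameter is a retry object. If the client applies the retry by calling `retry(rpc)`, passing the class would construct an instance instead of returning a wrapped function, and the later call with `metadata=...` would then reach that instance's `__call__`. The sketch below imitates this mechanism with a hypothetical stand-in class `SketchRetry` and helper `call_rpc`; neither is part of google-api-core or Airflow.

```python
import functools


class SketchRetry:
    """Hypothetical stand-in for a retry decorator object (not the real Retry)."""

    def __init__(self, predicate=None, attempts=3):
        self.predicate = predicate
        self.attempts = attempts

    def __call__(self, func, on_error=None):
        # Return a wrapper that retries func on ConnectionError.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for _ in range(self.attempts):
                try:
                    return func(*args, **kwargs)
                except ConnectionError as exc:
                    last_exc = exc
            raise last_exc

        return wrapper


def call_rpc(rpc, retry, request, metadata=()):
    """Imitate a client that decorates the RPC with `retry`, then invokes it."""
    wrapped = retry(rpc)  # instance: wraps rpc; class: builds a new instance
    return wrapped(request, metadata=metadata)


def fake_acknowledge(request, metadata=()):
    # Hypothetical RPC that just reports how many ack IDs it received.
    return f"acked {len(request['ack_ids'])}"


request = {"ack_ids": ["a1"]}

# Passing an instance works: the RPC is wrapped and then called.
ok = call_rpc(fake_acknowledge, SketchRetry(), request)

# Passing the class fails: `metadata` reaches SketchRetry.__call__ instead.
try:
    call_rpc(fake_acknowledge, SketchRetry, request)
    error = None
except TypeError as exc:
    error = str(exc)
```

   If this is what happens here, passing an instance such as `retry=Retry()` would be the thing to try; that has not been verified in this report.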
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

