josepaul1988 opened a new issue, #27384: URL: https://github.com/apache/airflow/issues/27384
### Apache Airflow Provider(s)

google

### Versions of Apache Airflow Providers

apache-airflow-providers-google==6.4.0

### Apache Airflow version

2.2.3

### Operating System

Container-Optimized OS, 1.22.12-gke.2300

### Deployment

Composer

### Deployment details

Composer version: composer-2.0.8-airflow-2.2.3

### What happened

**Requirement:** Acknowledge GCP Cloud PubSub messages in the callback method (`messages_callback`) of `PubSubPullOperator`/`PubSubPullSensor` by calling `PubSubHook().acknowledge()`. The business requirement is that only messages that are processed successfully get acknowledged.

According to the [documentation](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge), `acknowledge` accepts a `retry` argument, so the following call should be valid: `PubSubHook().acknowledge(subscription=SUBSCRIPTION_ID, project_id=PROJECT_ID, ack_ids=ack_ids_list, retry=Retry, timeout=10)`.

However, if I pass `retry=Retry`, the call raises the exception `__call__() got an unexpected keyword argument 'metadata'`. If I leave out the retry option, it works: `PubSubHook().acknowledge(subscription=SUBSCRIPTION_ID, project_id=PROJECT_ID, ack_ids=ack_ids_list, timeout=10)`

### What you think should happen instead

According to the [documentation](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge), the retry option is what retries the request in case of failure. Because I had to remove it to make the acknowledge operation work, I lost the ability to retry a failed acknowledgment.

> retry (Retry | _MethodDefault) – (Optional) A retry object used to retry requests. If None is specified, requests will not be retried.
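The traceback points to a likely cause: the docstring asks for "a retry object", i.e. an instance, while the call passes the `Retry` class itself. As far as can be read from `google/api_core/gapic_v1/method.py`, the client applies `retry` as a decorator (`retry(func)`) and then calls the result with the request plus a `metadata` keyword. With the class, `retry(func)` merely constructs a new `Retry` whose predicate is the RPC, and calling that object with `metadata=...` fails as in the traceback. The dependency-free sketch below mimics that call shape; `SimpleRetry`, `call_rpc` and `fake_acknowledge` are hypothetical stand-ins, not the library's code.

```python
# Dependency-free mimic of how google.api_core applies the `retry` argument.
# SimpleRetry, call_rpc and fake_acknowledge are hypothetical stand-ins.


class SimpleRetry:
    """Same shape as a retry decorator object: configured via __init__,
    then applied to a function via __call__(func)."""

    def __init__(self, predicate=None, deadline=120.0):
        self._predicate = predicate
        self._deadline = deadline

    def __call__(self, func, on_error=None):
        def retry_wrapped(*args, **kwargs):
            # A real retry object would loop with backoff here; a single
            # attempt is enough to show the call shape.
            return func(*args, **kwargs)

        return retry_wrapped


def call_rpc(target, request, retry, metadata=()):
    """Apply `retry` as a decorator, then call the result with the request
    and a `metadata` keyword, as the gapic wrapper does."""
    wrapped = retry(target) if retry is not None else target
    return wrapped(request, metadata=metadata)


def fake_acknowledge(request, metadata=()):
    return f"acked {request}"


# Instance: retry(target) returns a wrapper that accepts metadata.
ok = call_rpc(fake_acknowledge, ["ack-1"], retry=SimpleRetry())

# Class: retry(target) runs SimpleRetry.__init__ with target as predicate and
# returns a SimpleRetry object; calling it with metadata=... reaches
# SimpleRetry.__call__, which has no such parameter.
try:
    call_rpc(fake_acknowledge, ["ack-1"], retry=SimpleRetry)
    error = None
except TypeError as exc:
    error = str(exc)

print(ok)
print(error)
```

If this reading is correct, passing an instance such as `retry=Retry()` (with `Retry` imported from `google.api_core.retry`) should presumably keep retries enabled without raising the `TypeError`.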
Error stack trace:

```
PubSubHook().acknowledge(subscription=SNOW_SUBSCRIPTION,project_id=PROJECT_ID, ack_ids=ack_id_list, retry=Retry , timeout=10)

Traceback:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/pubsub.py", line 785, in execute
    ret = handle_messages(pulled_messages, context)
  File "/home/airflow/gcs/dags/snow_ticket_creator_1.py", line 70, in print_messages
    PubSubHook().acknowledge(subscription=SNOW_SUBSCRIPTION,project_id=PROJECT_ID, ack_ids=ack_id_list, retry=Retry , timeout=10)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 457, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 561, in acknowledge
    subscriber.acknowledge(
  File "/opt/python3.8/lib/python3.8/site-packages/google/pubsub_v1/services/subscriber/client.py", line 1270, in acknowledge
    rpc(
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py", line 154, in __call__
    return wrapped_func(*args, **kwargs)
TypeError: __call__() got an unexpected keyword argument 'metadata'
```

### How to reproduce

1. Create a GCP Cloud PubSub topic and subscription.
2. Publish some messages to the topic.
3. Create a DAG that connects to the subscription and pulls messages from it. Use the code below to reproduce.
```
from __future__ import annotations

import os
from datetime import datetime
import base64
import json

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
from airflow.providers.google.cloud.hooks.pubsub import PubSubHook, Retry
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = "Dev"  # os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = "gp-ct-sbox-adv-dna"  # os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "your-project-id")
DAG_ID = "SnowTicketCreator_1"
TOPIC_ID = "snow_alert_topic_jp"  # f"topic-{DAG_ID}-{ENV_ID}"
SNOW_SUBSCRIPTION = "snow_alert_subscription_jp_1"


def ack_messages(pulled_messages, context):
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode('utf-8')
        data_json_dict = json.loads(data)
        print(f"AckID: { m.ack_id }, incident_id: { data_json_dict['incident']['incident_id'] } "
              f"scoping_project_id: { data_json_dict['incident']['scoping_project_id'] } "
              f"resource_name: { data_json_dict['incident']['resource_name'] } "
              f"summary: { data_json_dict['incident']['summary'] } ")
        # Acknowledge the message (only the first one, for this reproduction)
        ack_id_list = [m.ack_id]
        if idx == 0:
            # Passing retry=Retry here raises the TypeError shown in the traceback
            PubSubHook().acknowledge(subscription=SNOW_SUBSCRIPTION, project_id=PROJECT_ID,
                                     ack_ids=ack_id_list, retry=Retry, timeout=10)
            print(f"Successfully acknowledged incident_id: { data_json_dict['incident']['incident_id'] }")
# [END howto_operator_gcp_pubsub_pull_messages_result_cmd]


with DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=airflow.utils.dates.days_ago(0),
    catchup=False,
) as dag:
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID, subscription=SNOW_SUBSCRIPTION
    )
    subscription = subscribe_task.output

    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        ack_messages=False,
        project_id=PROJECT_ID,
        messages_callback=ack_messages,
        subscription=subscription,
        max_messages=50,
    )

    subscribe_task >> pull_messages_operator
```

### Anything else

This happens only when the retry option is passed to the `acknowledge` method:

```
PubSubHook().acknowledge(subscription=SNOW_SUBSCRIPTION, project_id=PROJECT_ID, ack_ids=ack_id_list, retry=Retry, timeout=10)
```

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
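The requirement that only successfully processed messages be acknowledged can be met by collecting ack IDs inside the callback and acknowledging them together at the end, instead of acknowledging per message. A minimal sketch, assuming a hypothetical `process_incident` function for the business logic and stand-in dataclasses that mimic only the `ack_id` and `message.data` attributes the callback above reads:

```python
from __future__ import annotations

import json
from dataclasses import dataclass


@dataclass
class FakePubsubMessage:  # hypothetical stand-in for the pulled message payload
    data: bytes


@dataclass
class FakeReceivedMessage:  # hypothetical stand-in exposing ack_id and message
    ack_id: str
    message: FakePubsubMessage


def process_incident(payload: dict) -> None:
    """Hypothetical business logic; raises if the message cannot be handled."""
    if "incident" not in payload:
        raise ValueError("missing 'incident' key")


def collect_successful_ack_ids(pulled_messages) -> list[str]:
    """Return ack IDs only for messages that were processed without error."""
    ack_ids = []
    for m in pulled_messages:
        try:
            # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError
            payload = json.loads(m.message.data.decode("utf-8"))
            process_incident(payload)
        except ValueError as exc:
            # Leave the message unacknowledged so it can be redelivered.
            print(f"Skipping {m.ack_id}: {exc}")
            continue
        ack_ids.append(m.ack_id)
    return ack_ids


messages = [
    FakeReceivedMessage("ack-1", FakePubsubMessage(b'{"incident": {"incident_id": "1"}}')),
    FakeReceivedMessage("ack-2", FakePubsubMessage(b"not json")),
    FakeReceivedMessage("ack-3", FakePubsubMessage(b'{"other": 1}')),
]
ack_ids = collect_successful_ack_ids(messages)
print(ack_ids)  # ['ack-1']
```

In the DAG, the returned list would go to a single `PubSubHook().acknowledge(subscription=..., project_id=..., ack_ids=ack_ids, ...)` call, skipped when the list is empty. Messages left unacknowledged are redelivered by Pub/Sub after their ack deadline expires.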
