vchiapaikeo opened a new issue, #26278: URL: https://github.com/apache/airflow/issues/26278
### Apache Airflow Provider(s)

google

### Versions of Apache Airflow Providers

8.3.0

### Apache Airflow version

2.3.4

### Operating System

Debian 11

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### What happened

During routine CloudSQL maintenance, a GCSObjectExistenceSensor failed with the `sqlalchemy.exc.OperationalError` shown below. While this is somewhat expected and can be mitigated with top-level Airflow `retries` and `retry_delay` policies, we would like the ability to define a more rigorous retry policy on the sensor itself. This ability already exists for certain GCP operators, for example the [DataprocCreateClusterOperator](https://github.com/apache/airflow/blob/eb03959e437e11891b8c3696b76f664a991a37a4/airflow/providers/google/cloud/operators/dataproc.py#L468), but it is not comprehensive. The GCSObjectExistenceSensor is a place where its absence is particularly felt, because these sensors run for a long time and need to be robust.
```
[2022-08-31, 16:12:28 UTC] {base_job.py:229} ERROR - LocalTaskJob heartbeat got an exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3141, in _wrap_pool_connect
    return fn()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 301, in connect
    return _ConnectionFairy._checkout(self)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 755, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
    rec = pool._do_get()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/impl.py", line 259, in _do_get
    return self._create_connection()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
    return _ConnectionRecord(self)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
    self.__connect(first_connect_check=True)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
    connection = pool._invoke_creator(self)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 583, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/home/airflow/.local/lib/python3.9/site-packages/MySQLdb/__init__.py", line 123, in Connect
    return Connection(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/MySQLdb/connections.py", line 185, in __init__
    super().__init__(*args, **kwargs2)
MySQLdb.OperationalError: (2003, "Can't connect to MySQL server on 'XYZ:3306' (111)")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 201, in heartbeat
    session.merge(self)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2872, in merge
    return self._merge(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2950, in _merge
    merged = self.get(mapper.class_, key[1], identity_token=key[2])
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2695, in get
    return self._get_impl(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2796, in _get_impl
    return db_load_fn(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 531, in load_on_pk_identity
    session.execute(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1669, in execute
    conn = self._connection_for_bind(bind, close_with_result=True)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1519, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 747, in _connection_for_bind
    conn = bind.connect()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3095, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 91, in __init__
    else engine.raw_connection()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3174, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3144, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2003, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 3141, in _wrap_pool_connect
    return fn()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 301, in connect
    return _ConnectionFairy._checkout(self)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 755, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
    rec = pool._do_get()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/impl.py", line 259, in _do_get
    return self._create_connection()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
    return _ConnectionRecord(self)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
    self.__connect(first_connect_check=True)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
    connection = pool._invoke_creator(self)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 583, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/home/airflow/.local/lib/python3.9/site-packages/MySQLdb/__init__.py", line 123, in Connect
    return Connection(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/MySQLdb/connections.py", line 185, in __init__
    super().__init__(*args, **kwargs2)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2003, "Can't connect to MySQL server on 'XYZ:3306' (111)")
(Background on this error at: http://sqlalche.me/e/14/e3q8)
[2022-08-31, 16:12:45 UTC] {gcs.py:81} INFO - Sensor checks existence of : my-bucket-name, vchiapaikeo/trigger.file
```

### What you think should happen instead

Currently, the sensor calls [hook.exists()](https://github.com/apache/airflow/blob/eb03959e437e11891b8c3696b76f664a991a37a4/airflow/providers/google/cloud/sensors/gcs.py#L87) to perform its check. That function uses the [underlying blob resource's exists](https://github.com/apache/airflow/blob/eb03959e437e11891b8c3696b76f664a991a37a4/airflow/providers/google/cloud/hooks/gcs.py#L544-L555) method to check whether the object exists. The google-cloud-storage library's `Blob.exists` method [accepts a retry object](https://github.com/googleapis/python-storage/blob/282e3b605e62c4f8d93debff20d40afbf2e718f5/google/cloud/storage/blob.py#L636), and if none is passed it falls back to the [DEFAULT_RETRY constant](https://github.com/googleapis/python-storage/blob/282e3b605e62c4f8d93debff20d40afbf2e718f5/google/cloud/storage/retry.py#L52).
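For context, google-cloud-storage's `DEFAULT_RETRY` is essentially exponential backoff applied to transient API errors. Below is a simplified, self-contained sketch of those semantics. The names (`TransientError`, `retry_call`) are illustrative only, not the real google-api-core API, which additionally jitters the delay and takes a predicate deciding which exceptions are retryable:

```python
import time


class TransientError(Exception):
    """Stand-in for the transient API errors (e.g. 429/503 responses)
    that DEFAULT_RETRY treats as retryable."""


def retry_call(fn, *, initial=1.0, multiplier=2.0, maximum=60.0,
               deadline=120.0, sleep=time.sleep):
    """Retry `fn` on TransientError with exponential backoff: wait
    `initial` seconds, doubling each attempt up to `maximum`, and give
    up (re-raising) once the total wait would exceed `deadline`."""
    delay = initial
    elapsed = 0.0
    while True:
        try:
            return fn()
        except TransientError:
            if elapsed + delay > deadline:
                raise  # budget exhausted: surface the last error
            sleep(delay)
            elapsed += delay
            delay = min(delay * multiplier, maximum)
```

A call like `retry_call(lambda: blob_exists_check())` would then absorb brief outages such as the CloudSQL failover above instead of failing the task outright.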
We should allow the retry object to be overridden by exposing a `retry` parameter on the GCSObjectExistenceSensor, which the sensor's `poke` implementation can then pass through via the `hook.exists()` call. Something like:

```py
def poke(self, context: "Context") -> bool:
    self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
    hook = GCSHook(
        gcp_conn_id=self.google_cloud_conn_id,
        delegate_to=self.delegate_to,
        impersonation_chain=self.impersonation_chain,
    )
    return hook.exists(self.bucket, self.object, retry=self.retry)
```

To maintain backwards compatibility, the parameter should default to the same retry that google-cloud-storage uses: https://github.com/googleapis/python-storage/blob/282e3b605e62c4f8d93debff20d40afbf2e718f5/google/cloud/storage/retry.py#L52

### How to reproduce

A simple DAG to reproduce. Once the DAG is running, initiate a DB failover.

```py
import datetime

from airflow import DAG
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

DEFAULT_TASK_ARGS = {
    "owner": "xyz",
    "start_date": "2022-09-01",
    "retries": 3,
    "retry_delay": 300,
    "project_id": "my-project-id",
}

sensor_count = 50

with DAG(
    schedule_interval="@daily",
    max_active_runs=1,
    max_active_tasks=sensor_count,
    catchup=False,
    dag_id="test_gcs_sensor",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    for i in range(sensor_count):
        _ = GCSObjectExistenceSensor(
            bucket="my-bucket-name",
            object="vchiapaikeo/trigger.file",
            retry_delay=datetime.timedelta(seconds=60),
            start_date=datetime.datetime(2021, 4, 16, 0, 0),
            task_id=f"test_sensor_{i}",
            poke_interval=300,
        )
```

### Anything else

This occurs only when certain conditions are met: it needs a DB failover or some other connectivity event.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
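To make the proposed plumbing concrete, here is a minimal stand-alone sketch of how a `retry` parameter could thread from the sensor's constructor through to `hook.exists()`. The classes below are hypothetical stand-ins for illustration only, not the real Airflow or google-cloud-storage implementations:

```python
# Stand-in for google.cloud.storage.retry.DEFAULT_RETRY, used only so the
# sketch runs without the real library installed.
DEFAULT_RETRY = "DEFAULT_RETRY"


class FakeGCSHook:
    """Stand-in for Airflow's GCSHook: records the retry it was given."""

    def exists(self, bucket, object, retry=DEFAULT_RETRY):
        self.last_retry = retry
        return True  # pretend the object exists


class GCSObjectExistenceSensorSketch:
    """Sketch of the proposed change: accept `retry` and forward it."""

    def __init__(self, bucket, object, retry=DEFAULT_RETRY):
        self.bucket = bucket
        self.object = object
        # Defaulting to DEFAULT_RETRY preserves today's behavior for
        # callers that do not pass a retry.
        self.retry = retry

    def poke(self, hook):
        # The real poke() would construct a GCSHook itself; the hook is
        # injected here only to keep the sketch self-contained.
        return hook.exists(self.bucket, self.object, retry=self.retry)
```

A user who wants a more aggressive policy during sensing would then pass their own `google.api_core.retry.Retry` instance as `retry=` when instantiating the sensor, while everyone else keeps the library default.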
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
