EugeneChung opened a new issue, #39016:
URL: https://github.com/apache/airflow/issues/39016
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.7.3
### What happened?
I'm using `KubernetesPodOperator` from
`apache-airflow-providers-cncf-kubernetes==7.9.0`.
The current flow of `execute_sync()` is as follows:
1. The pod is created and `pod` is set.
2. `xcom_push` is called.
3. `remote_pod` is set by `find_pod`.
4. ...
5. `cleanup(pod, remote_pod)` is called.
When the Airflow DB is unreachable, `execute_sync()` can fail because
`xcom_push()` fails.
`cleanup()` is then called before `remote_pod` has been set properly. It
passes `remote_pod` to `patch_already_checked()`, which fails because
`remote_pod` is incomplete.
As a result, the pod created in step 1 is never cleaned up and remains.
I think `cleanup()` should consider the `pod` instance first and treat
`remote_pod` as optional.
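The proposal could look roughly like the sketch below. It is only an illustration and not the provider's code: `pick_pod_for_cleanup`, the `patch` callback, and the `SimpleNamespace` stand-ins for pod objects are all hypothetical, and the real signature of `patch_already_checked()` is not shown in this issue.

```python
from types import SimpleNamespace


def pick_pod_for_cleanup(pod, remote_pod=None):
    """Hypothetical helper: choose which pod object cleanup should act on."""
    # Use remote_pod only if it actually carries metadata; otherwise fall
    # back to the pod object that was set when the pod was created.
    if getattr(remote_pod, "metadata", None) is not None:
        return remote_pod
    return pod


def cleanup(pod, remote_pod=None, patch=None):
    """Sketch of a cleanup that tolerates a missing or incomplete remote_pod."""
    target = pick_pod_for_cleanup(pod, remote_pod)
    if getattr(target, "metadata", None) is None:
        return None  # no pod was created, nothing to clean up
    if patch is not None:
        # `patch` stands in for patch_already_checked() (assumption).
        patch(target)
    return target.metadata.name


# Usage: remote_pod was never set because an earlier step failed.
created = SimpleNamespace(metadata=SimpleNamespace(name="demo-pod", namespace="default"))
patched = []
cleanup(created, remote_pod=None, patch=patched.append)
```

With this shape, a failure between pod creation and `find_pod` still leaves cleanup with enough information (the name and namespace on `pod`) to act on the pod that was created.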
```
MySQLdb.OperationalError: (2005, "Unknown MySQL server host '...' (-3)")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 605, in execute_sync
    ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/xcom.py", line 219, in set
    dag_run_id = session.query(DagRun.id).filter_by(dag_id=dag_id, run_id=run_id).scalar()
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2893, in scalar
    ret = self.one()
          ^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2870, in one
    return self._iter().one()
           ^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 2916, in _iter
    result = self.session.execute(
             ^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1716, in execute
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1555, in _connection_for_bind
    return self._transaction._connection_for_bind(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 750, in _connection_for_bind
    conn = bind.connect()
           ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/future/engine.py", line 406, in connect
    return super(Engine, self).connect()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3325, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
         ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3404, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3374, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2208, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 3371, in _wrap_pool_connect
    return fn()
           ^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 327, in connect
    return _ConnectionFairy._checkout(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 894, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 493, in checkout
    rec = pool._do_get()
          ^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 273, in _create_connection
    return _ConnectionRecord(self)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 388, in __init__
    self.__connect()
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 690, in __connect
    with util.safe_reraise():
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 686, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/create.py", line 574, in connect
    return dialect.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/__init__.py", line 121, in Connect
    return Connection(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/MySQLdb/connections.py", line 193, in __init__
    super().__init__(*args, **kwargs2)
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2005, "Unknown MySQL server host '...' (-3)")
(Background on this error at: https://sqlalche.me/e/14/e3q8)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 865, in patch_already_checked
    name=pod.metadata.name,
         ^^^^^^^^^^^^
...
```
### What you think should happen instead?
_No response_
### How to reproduce
It's not easy to reproduce, but whenever `execute_sync()` fails in
`xcom_push`, the problem occurs.
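Since a real reproduction needs a DB outage at the right moment, the control flow can be simulated without Airflow or Kubernetes. The sketch below is only a stand-in: `FakeOperator` and its methods are hypothetical and mimic just the ordering described above, it assumes cleanup runs in a `finally` block, and it assumes `remote_pod` is still `None` when the failure happens.

```python
from types import SimpleNamespace


class FakeOperator:
    """Stand-in that mimics only the control flow described in this issue."""

    def __init__(self):
        self.pod = None
        self.remote_pod = None  # assumption: unset means None

    def xcom_push(self):
        # Simulates the metadata DB being unreachable.
        raise ConnectionError("simulated DB outage")

    def patch_already_checked(self, pod):
        # Mirrors the attribute access at the end of the traceback.
        return pod.metadata.name

    def cleanup(self, pod, remote_pod):
        # Uses remote_pod unconditionally, as described in the report.
        self.patch_already_checked(remote_pod)

    def execute_sync(self):
        try:
            self.pod = SimpleNamespace(metadata=SimpleNamespace(name="demo-pod"))
            self.xcom_push()  # fails before remote_pod is assigned
            self.remote_pod = self.pod
        finally:
            self.cleanup(self.pod, self.remote_pod)


def reproduce():
    """Run the simulated flow and return the operator and the cleanup error."""
    op = FakeOperator()
    try:
        op.execute_sync()
    except AttributeError as err:
        return op, err
    return op, None
```

Calling `reproduce()` returns the error raised inside cleanup, chained to the simulated DB failure, while `op.pod` still refers to the created pod that nothing cleaned up.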
### Operating System
N/A
### Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==7.9.0
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)