Gollum999 opened a new issue, #31945:
URL: https://github.com/apache/airflow/issues/31945
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
We started seeing errors like this in one of our DAGs:
``` 2023-06-15 14:01:08.791 | ERROR | airflow.task | Task failed with
exception
Traceback (most recent call last):
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 1015, in _commit_impl
self.engine.dialect.do_commit(self.connection)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/default.py",
line 685, in do_commit
dbapi_connection.commit()
psycopg2.errors.IdleInTransactionSessionTimeout: terminating connection due
to idle-in-transaction timeout
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/conda/envs/production/lib/python3.9/contextlib.py", line 126,
in __exit__
next(self.gen)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
line 36, in create_session
session.commit()
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 1435, in commit
self._transaction.commit(_to_root=self.future)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py",
line 836, in commit
trans.commit()
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 2378, in commit
self._do_commit()
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 2568, in _do_commit
self._connection_commit_impl()
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 2539, in _connection_commit_impl
self.connection._commit_impl()
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 1017, in _commit_impl
self._handle_dbapi_exception(e, None, None, None, None)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 2043, in _handle_dbapi_exception
util.raise_(
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py",
line 207, in raise_
raise exception
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py",
line 1015, in _commit_impl
self.engine.dialect.do_commit(self.connection)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/default.py",
line 685, in do_commit
dbapi_connection.commit()
sqlalchemy.exc.InternalError:
(psycopg2.errors.IdleInTransactionSessionTimeout) terminating connection due to
idle-in-transaction timeout
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
(Background on this error at: https://sqlalche.me/e/14/2j85)
2023-06-15 14:01:08.796 | INFO | airflow.task | Marking task as FAILED.
dag_id=test_transaction_timeout,
task_id=taskflow_True__sleep_60__inlets_True__outlets_True,
execution_date=20230615T183240, start_date=20230615T190008,
end_date=20230615T190108
```
This error consistently shows up _after_ the body of the task is done
executing. Presumably it is failing when Airflow is trying to update the task
state to "success" in the metadata DB. The error also only shows up on Tasks
that run longer than 5 minutes, which corresponds to how we have configured
Postgres regarding idle transactions (`idle_in_transaction_session_timeout =
300000`).
After much troubleshooting, I tracked this down to the fact that we had
started adding
[`inlets`](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html)
to tasks in that DAG. Removing `inlets` and re-running the broken tasks
resolves the error.
My theory is that something around this feature is not properly cleaning up
its DB connection, leaving a transaction open that eventually causes the
connection to get killed by the DB backend. SQLAlchemy must be keeping that
zombie connection in its connection pool, which is then re-used when some
internal Airflow method uses `create_session`/`@provide_session`.
### What you think should happen instead
Tasks should never get a DB connection that was not properly terminated.
### How to reproduce
First edit `postgresql.conf` to set `idle_in_transaction_session_timeout =
60000`, then run the following DAG:
```
#!/usr/bin/env python3
import datetime
import logging
import time
from airflow.datasets import Dataset
from airflow.decorators import dag, task
logger = logging.getLogger(__name__)
ds_in = Dataset('in')
ds_out = Dataset('out')
@dag(
schedule='@daily',
start_date=datetime.datetime(2023, 6, 15),
default_args={
'retries': 0,
},
)
def test_transaction_timeout():
"""
With PostgreSQL backend, set `idle_in_transaction_session_timeout =
60000` in `postgresql.conf`
"""
def make_task(sleep_time, use_inlets, use_outlets):
task_id =
f'sleep_{sleep_time}__inlets_{use_inlets}__outlets_{use_outlets}'
@task(task_id=task_id)
def test_task():
logger.info('hello')
time.sleep(sleep_time)
logger.info('world')
if use_inlets:
test_task = test_task.override(inlets=[ds_in])
if use_outlets:
test_task = test_task.override(outlets=[ds_out])
return test_task()
for sleep_time in [30, 60, 90]:
for use_inlets in [True, False]:
for use_outlets in [True, False]:
make_task(sleep_time, use_inlets, use_outlets)
```
Running this DAG should look like:

### Operating System
CentOS Stream 8
### Versions of Apache Airflow Providers
N/A
### Deployment
Other
### Deployment details
Airflow 2.5.1
Reproduced in self-hosted deployment and locally in standalone mode
Postgres DB backend (v12.11)
### Anything else
This affects both `@task` decorated functions as well as classic Operators.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]