Gollum999 opened a new issue, #31945:
URL: https://github.com/apache/airflow/issues/31945

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We started seeing errors like this in one of our DAGs:
   
   ```
   2023-06-15 14:01:08.791 | ERROR    | airflow.task | Task failed with exception
   Traceback (most recent call last):
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1015, in _commit_impl
       self.engine.dialect.do_commit(self.connection)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 685, in do_commit
       dbapi_connection.commit()
   psycopg2.errors.IdleInTransactionSessionTimeout: terminating connection due to idle-in-transaction timeout
   server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/conda/envs/production/lib/python3.9/contextlib.py", line 126, in __exit__
       next(self.gen)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py", line 36, in create_session
       session.commit()
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1435, in commit
       self._transaction.commit(_to_root=self.future)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 836, in commit
       trans.commit()
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2378, in commit
       self._do_commit()
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2568, in _do_commit
       self._connection_commit_impl()
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2539, in _connection_commit_impl
       self.connection._commit_impl()
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1017, in _commit_impl
       self._handle_dbapi_exception(e, None, None, None, None)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception
       util.raise_(
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
       raise exception
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1015, in _commit_impl
       self.engine.dialect.do_commit(self.connection)
     File "/opt/conda/envs/production/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 685, in do_commit
       dbapi_connection.commit()
   sqlalchemy.exc.InternalError: (psycopg2.errors.IdleInTransactionSessionTimeout) terminating connection due to idle-in-transaction timeout
   server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
   
   (Background on this error at: https://sqlalche.me/e/14/2j85)
   2023-06-15 14:01:08.796 | INFO     | airflow.task | Marking task as FAILED. dag_id=test_transaction_timeout, task_id=taskflow_True__sleep_60__inlets_True__outlets_True, execution_date=20230615T183240, start_date=20230615T190008, end_date=20230615T190108
   ```
   
   This error consistently shows up _after_ the body of the task is done executing.  Presumably it is failing when Airflow tries to update the task state to "success" in the metadata DB.  The error also only shows up on tasks that run longer than 5 minutes, which matches our Postgres idle-transaction setting (`idle_in_transaction_session_timeout = 300000`, i.e. 5 minutes).
   
   After much troubleshooting, I tracked this down to the fact that we had 
started adding 
[`inlets`](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html)
 to tasks in that DAG.  Removing `inlets` and re-running the broken tasks 
resolves the error.
   
   My theory is that something around this feature is not properly cleaning up 
its DB connection, leaving a transaction open that eventually causes the 
connection to get killed by the DB backend.  SQLAlchemy must be keeping that 
zombie connection in its connection pool, which is then re-used when some 
internal Airflow method uses `create_session`/`@provide_session`.
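   The pooling behavior behind this theory can be sketched with a small SQLAlchemy example (sqlite stands in for Postgres here; this is an illustration of the pool mechanics, not Airflow's actual code):
   
   ```python
   from sqlalchemy import create_engine
   from sqlalchemy.pool import QueuePool
   
   # Illustration only: SQLAlchemy's QueuePool hands previously checked-in
   # connections back to later callers, so a connection whose server side was
   # killed mid-transaction would be handed, as-is, to the next
   # `create_session()` user.
   engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=1, max_overflow=0)
   
   conn1 = engine.connect()
   raw1 = conn1.connection.dbapi_connection  # the underlying DBAPI connection
   conn1.close()  # checked back into the pool, not actually closed
   
   conn2 = engine.connect()
   raw2 = conn2.connection.dbapi_connection
   conn2.close()
   
   assert raw1 is raw2  # the same DBAPI connection was reused
   ```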
   
   ### What you think should happen instead
   
   Tasks should never get a DB connection that was not properly terminated.
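   Even when the leak itself can't be prevented at the source, SQLAlchemy's `pool_pre_ping` engine option is the usual guard against handing out dead pooled connections (it masks rather than fixes the leaked transaction).  A minimal sketch, again using sqlite as a stand-in and simulating the server-side kill by closing the raw connection:
   
   ```python
   from sqlalchemy import create_engine, text
   from sqlalchemy.pool import QueuePool
   
   # Sketch, not Airflow's actual configuration: with pool_pre_ping=True,
   # SQLAlchemy tests each pooled connection on checkout and transparently
   # replaces dead ones instead of raising on first use.
   engine = create_engine("sqlite://", poolclass=QueuePool,
                          pool_size=1, max_overflow=0, pool_pre_ping=True)
   
   conn = engine.connect()
   raw = conn.connection.dbapi_connection
   conn.close()  # connection goes back into the pool
   raw.close()   # simulate the server killing the session out from under us
   
   # The pre-ping detects the dead connection and swaps in a fresh one.
   with engine.connect() as conn:
       result = conn.execute(text("select 1")).scalar()
   assert result == 1
   ```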
   
   ### How to reproduce
   
   First edit `postgresql.conf` to set `idle_in_transaction_session_timeout = 
60000`, then run the following DAG:
   
   ```python
   #!/usr/bin/env python3
   import datetime
   import logging
   import time
   
   from airflow.datasets import Dataset
   from airflow.decorators import dag, task
   
   
   logger = logging.getLogger(__name__)
   
   ds_in = Dataset('in')
   ds_out = Dataset('out')
   
   
   @dag(
       schedule='@daily',
       start_date=datetime.datetime(2023, 6, 15),
       default_args={
           'retries': 0,
       },
   )
   def test_transaction_timeout():
       """
       With PostgreSQL backend, set `idle_in_transaction_session_timeout = 60000` in `postgresql.conf`
       """
       def make_task(sleep_time, use_inlets, use_outlets):
           task_id = f'sleep_{sleep_time}__inlets_{use_inlets}__outlets_{use_outlets}'
   
           @task(task_id=task_id)
           def test_task():
               logger.info('hello')
               time.sleep(sleep_time)
               logger.info('world')
   
           if use_inlets:
               test_task = test_task.override(inlets=[ds_in])
           if use_outlets:
               test_task = test_task.override(outlets=[ds_out])
   
           return test_task()
   
       for sleep_time in [30, 60, 90]:
           for use_inlets in [True, False]:
               for use_outlets in [True, False]:
                   make_task(sleep_time, use_inlets, use_outlets)
   ```
   
   Running this DAG should look like:
   
   ![Example output of above DAG](https://i.imgur.com/TWt9acA.png)
   
   ### Operating System
   
   CentOS Stream 8
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Airflow 2.5.1
   Reproduced in self-hosted deployment and locally in standalone mode
   Postgres DB backend (v12.11)
   
   ### Anything else
   
   This affects both `@task`-decorated functions and classic Operators.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
