syun64 commented on PR #13265:
URL: https://github.com/apache/airflow/pull/13265#issuecomment-1487492931
Actually, after some more debugging, it looks like this issue isn't specific
to the processing of executor events.
Here is an error traceback from when the scheduler was fetching
`active_runs_of_dags`, which put the scheduler into a bad state.
```
Traceback (most recent call last):
  File "/airflow/jobs/scheduler_job.py", line 759, in _execute
    self._run_scheduler_loop()
  File "/airflow/jobs/scheduler_job.py", line 885, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/airflow/jobs/scheduler_job.py", line 958, in _do_scheduling
    self._start_queued_dagruns(session)
  File "/airflow/jobs/scheduler_job.py", line 1213, in _start_queued_dagruns
    DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
  File "/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/airflow/models/dagrun.py", line 277, in active_runs_of_dags
    query = query.filter(cls.dag_id.in_(list(set(dag_ids))))
  File "/airflow/jobs/scheduler_job.py", line 1213, in <genexpr>
    DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
  File "/sqlalchemy/orm/query.py", line 2900, in __iter__
    result = self._iter()
  File "/sqlalchemy/orm/query.py", line 2915, in _iter
    result = self.session.execute(
  File "/sqlalchemy/orm/session.py", line 1714, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq
```
This failure happened as soon as the application came up, and I'm now
wondering if it is also related to the connection pool unintentionally being
shared across forked processes... To continue my investigation,
I'm testing disabling connection pooling on my Airflow cluster (by turning off
`sql_alchemy_pool_enabled`) to see if I can still observe this error.
If changing that flag does resolve the issue, I wonder if that means
there are still some edge cases that make pooling unsafe with the
Celery executor (unless we do the converse and launch all tasks with a new Python
interpreter).
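
As a rough illustration of the fork hazard described above, here is a minimal sketch of a pool that notices when it is being used from a different process than the one that created it. This is not Airflow's or SQLAlchemy's code: `PidAwarePool`, `checkout`, and `checkin` are hypothetical names, and `sqlite3` stands in for a real networked database only so the sketch runs anywhere.

```python
import os
import sqlite3


class PidAwarePool:
    """Toy connection pool that refuses to reuse connections across a fork.

    Hypothetical illustration only; not how Airflow or SQLAlchemy pool.
    """

    def __init__(self, factory):
        self._factory = factory        # callable returning a new connection
        self._owner_pid = os.getpid()  # process that owns the idle connections
        self._idle = []

    def _reset_if_forked(self):
        # A forked child inherits the parent's open connections. If both
        # processes talk over the same socket, replies can get mixed up,
        # so the child forgets the inherited connections and starts fresh.
        if os.getpid() != self._owner_pid:
            self._idle.clear()
            self._owner_pid = os.getpid()

    def checkout(self):
        self._reset_if_forked()
        return self._idle.pop() if self._idle else self._factory()

    def checkin(self, conn):
        self._reset_if_forked()
        self._idle.append(conn)


if __name__ == "__main__":
    pool = PidAwarePool(lambda: sqlite3.connect(":memory:"))
    conn = pool.checkout()
    pool.checkin(conn)
    # Same process: the idle connection is handed back out again.
    print(pool.checkout() is conn)
```

Disabling pooling altogether sidesteps the same problem more bluntly: every checkout opens a new connection, so nothing is left idle for a child process to inherit.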