ephraimbuddy commented on issue #22612:
URL: https://github.com/apache/airflow/issues/22612#issuecomment-2136652960
```
[2024-05-29T06:28:38.937+0000] {process_utils.py:132} INFO - Sending
Signals.SIGTERM to group 97. PIDs of all processes in the group: [7350, 7363,
97]
[2024-05-29T06:28:38.939+0000] {process_utils.py:87} INFO - Sending the
signal Signals.SIGTERM to group 97
[2024-05-29T06:28:42.204+0000] {settings.py:425} DEBUG - Disposing DB
connection pool (PID 7350)
[2024-05-29T06:28:42.212+0000] {settings.py:425} DEBUG - Disposing DB
connection pool (PID 7363)
[2024-05-29T06:28:42.217+0000] {process_utils.py:263} INFO - Waiting up to 5
seconds for processes to exit...
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process
psutil.Process(pid=7363, status='terminated', started='06:28:38') (7363)
terminated with exit code None
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process
psutil.Process(pid=97, status='terminated', exitcode=0, started='06:18:49')
(97) terminated with exit code 0
[2024-05-29T06:28:42.237+0000] {process_utils.py:80} INFO - Process
psutil.Process(pid=7350, status='terminated', started='06:28:36') (7350)
terminated with exit code None
[2024-05-29T06:28:42.238+0000] {kubernetes_executor.py:740} INFO - Shutting
down Kubernetes executor
[2024-05-29T06:28:42.238+0000] {kubernetes_executor.py:742} DEBUG - Flushing
task_queue...
[2024-05-29T06:28:42.238+0000] {scheduler_job_runner.py:880} ERROR -
Exception when executing Executor.end on KubernetesExecutor(parallelism=32)
Traceback (most recent call last):
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 856, in
_execute
self._run_scheduler_loop()
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 989, in
_run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1071, in
_do_scheduling
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
File "/opt/airflow/airflow/utils/retries.py", line 89, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger,
**retry_kwargs):
File
"/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line
435, in __iter__
do = self.iter(retry_state=retry_state)
File
"/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line
368, in iter
result = action(retry_state)
File
"/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line
390, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in
result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in
__get_result
raise self._exception
File "/opt/airflow/airflow/utils/retries.py", line 98, in wrapped_function
return func(*args, **kwargs)
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1415, in
_schedule_all_dag_runs
callback_tuples = [(run, self._schedule_dag_run(run, session=session))
for run in dag_runs]
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1415, in
<listcomp>
callback_tuples = [(run, self._schedule_dag_run(run, session=session))
for run in dag_runs]
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1433, in
_schedule_dag_run
dag_model = DM.get_dagmodel(dag_run.dag_id, session)
File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/airflow/models/dag.py", line 3737, in get_dagmodel
return session.get(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py",
line 2853, in get
return self._get_impl(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py",
line 2975, in _get_impl
return db_load_fn(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py",
line 530, in load_on_pk_identity
session.execute(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py",
line 1720, in execute
result = compile_state_cls.orm_setup_cursor_result(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/context.py",
line 349, in orm_setup_cursor_result
return loading.instances(result, querycontext)
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py",
line 69, in instances
*[
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py",
line 70, in <listcomp>
query_entity.row_processor(context, cursor)
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/context.py",
line 2631, in row_processor
_instance = loading._instance_processor(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py",
line 796, in _instance_processor
prop.create_row_processor(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/interfaces.py",
line 658, in create_row_processor
strat.create_row_processor(
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 263, in
_exit_gracefully
sys.exit(os.EX_OK)
SystemExit: 0
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 878, in
_execute
executor.end()
File
"/opt/airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 743, in end
self._flush_task_queue()
File
"/opt/airflow/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 699, in _flush_task_queue
self.log.debug("Executor shutting down, task_queue approximate size=%d",
self.task_queue.qsize())
File "<string>", line 2, in qsize
File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 834, in
_callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 206,
in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 411,
in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 368,
in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2024-05-29T06:28:42.247+0000] {process_utils.py:132} INFO - Sending
Signals.SIGTERM to group 97. PIDs of all processes in the group: []
[2024-05-29T06:28:42.247+0000] {process_utils.py:87} INFO - Sending the
signal Signals.SIGTERM to group 97
[2024-05-29T06:28:42.247+0000] {process_utils.py:101} INFO - Sending the
signal Signals.SIGTERM to process 97 as process group is missing.
[2024-05-29T06:28:42.247+0000] {scheduler_job_runner.py:886} INFO - Exited
execute loop
[2024-05-29T06:28:42.258+0000] {cli_action_loggers.py:94} DEBUG - Calling
callbacks: []
[2024-05-29T06:28:42.260+0000] {settings.py:425} DEBUG - Disposing DB
connection pool (PID 11)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]