asemelianov opened a new issue, #45597:
URL: https://github.com/apache/airflow/issues/45597
### Apache Airflow version
2.10.4
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
We use the official Airflow Helm chart with the CeleryKubernetesExecutor and csi-s3 configured. Our scheduler restarts at irregular intervals, and sometimes it does not come back up at all. According to the logs, the problem is with the csi-s3 driver: it unmounts the directory holding the logs, and until the scheduler service is restarted manually it hangs with the error below. Only the scheduler is affected; the other services have no problems.
```
--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
    if self.shouldRollover(record):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in shouldRollover
    pos = self.stream.tell()
          ^^^^^^^^^^^^^^^^^^
OSError: [Errno 107] Transport endpoint is not connected
Call stack:
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 62, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 983, in _execute
    self.processor_agent.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 172, in start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 247, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 489, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 667, in _run_parsing_loop
    self.collect_results()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1196, in collect_results
    self._collect_results_from_processor(processor)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1146, in _collect_results_from_processor
    self.log.error(
Message: 'Processor for %s exited with return code %s.'
Arguments: ('/opt/airflow/dags/repo/prod/ozon/ozon_dag.py', 1)
--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
    if self.shouldRollover(record):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in shouldRollover
    pos = self.stream.tell()
          ^^^^^^^^^^^^^^^^^^
OSError: [Errno 107] Transport endpoint is not connected
Call stack:
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 62, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 983, in _execute
    self.processor_agent.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 172, in start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 247, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 489, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 667, in _run_parsing_loop
    self.collect_results()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1196, in collect_results
    self._collect_results_from_processor(processor)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1146, in _collect_results_from_processor
    self.log.error(
Message: 'Processor for %s exited with return code %s.'
Arguments: ('/opt/airflow/dags/repo/prod/utils/dag_status.py', 1)
[2025-01-11T22:44:51.127+0000] {process_utils.py:132} INFO - Sending 15 to group 55. PIDs of all processes in the group: [55]
[2025-01-11T22:44:51.127+0000] {process_utils.py:87} INFO - Sending the signal 15 to group 55
--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
    if self.shouldRollover(record):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in shouldRollover
    pos = self.stream.tell()
          ^^^^^^^^^^^^^^^^^^
OSError: [Errno 107] Transport endpoint is not connected
Call stack:
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 62, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 983, in _execute
    self.processor_agent.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 172, in start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 247, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 489, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 573, in _run_parsing_loop
    ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
  File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 1136, in wait
    ready = selector.select(timeout)
  File "/usr/local/lib/python3.12/selectors.py", line 415, in select
    fd_event_list = self._selector.poll(timeout)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 465, in _exit_gracefully
    self.log.info("Exiting gracefully upon receiving signal %s", signum)
Message: 'Exiting gracefully upon receiving signal %s'
Arguments: (15,)
[2025-01-11T22:44:51.500+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=55, status='terminated', exitcode=0, started='2025-01-09 12:30:47') (55) terminated with exit code 0
[2025-01-11T22:44:51.501+0000] {kubernetes_executor.py:760} INFO - Shutting down Kubernetes executor
[2025-01-11T22:44:51.501+0000] {scheduler_job_runner.py:1011} ERROR - Exception when executing Executor.end on CeleryKubernetesExecutor(parallelism=32)
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 987, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1176, in _run_scheduler_loop
    time.sleep(min(self._scheduler_idle_sleep_time, next_event or 0))
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 263, in _exit_gracefully
    sys.exit(os.EX_OK)
SystemExit: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1009, in _execute
    executor.end()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_kubernetes_executor.py", line 254, in end
    self.kubernetes_executor.end()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 763, in end
    self._flush_task_queue()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 719, in _flush_task_queue
    self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
                                                                             ^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in qsize
  File "/usr/local/lib/python3.12/multiprocessing/managers.py", line 820, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 427, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 384, in _send
    n = write(self._handle, buf)
        ^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
[2025-01-11T22:44:51.521+0000] {process_utils.py:132} INFO - Sending 15 to group 55. PIDs of all processes in the group: []
[2025-01-11T22:44:51.521+0000] {process_utils.py:87} INFO - Sending the signal 15 to group 55
[2025-01-11T22:44:51.521+0000] {process_utils.py:101} INFO - Sending the signal 15 to process 55 as process group is missing.
[2025-01-11T22:44:51.521+0000] {scheduler_job_runner.py:1017} INFO - Exited execute loop
INFO: detected pid 1, running init handler
```
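The repeating `--- Logging error ---` block is standard `RotatingFileHandler` behavior: `emit()` calls `shouldRollover()`, which calls `stream.tell()` on the log file, and when the csi-s3 (FUSE) mount backing that file disappears, `tell()` raises `OSError: [Errno 107] Transport endpoint is not connected`. Logging routes the failure through `Handler.handleError()` instead of crashing, so the scheduler keeps running while every record fails. A minimal stdlib-only sketch of that mechanism (no Airflow involved; `StaleMountStream` is a made-up stand-in for a file on a dead mount):

```python
import errno
import io
import logging.handlers
import sys
import tempfile


class StaleMountStream(io.StringIO):
    """Stand-in for a log file on a disconnected csi-s3/FUSE mount."""

    def tell(self):
        raise OSError(errno.ENOTCONN, "Transport endpoint is not connected")


with tempfile.NamedTemporaryFile(suffix=".log") as tmp:
    handler = logging.handlers.RotatingFileHandler(tmp.name, maxBytes=1024)
    handler.stream.close()
    handler.stream = StaleMountStream()  # the mount dies out from under the handler

    log = logging.getLogger("demo")
    log.propagate = False
    log.addHandler(handler)

    # Capture stderr to show what logging does with the failing record.
    captured = io.StringIO()
    old_stderr, sys.stderr = sys.stderr, captured
    try:
        # emit() -> shouldRollover() -> stream.tell() raises OSError(107);
        # handleError() prints the "--- Logging error ---" report to stderr
        # and the process keeps running -- exactly the pattern in the logs above.
        log.error("Processor for %s exited with return code %s.", "some_dag.py", 1)
    finally:
        sys.stderr = old_stderr

print("--- Logging error ---" in captured.getvalue())  # → True
```

This explains why the logging errors themselves are non-fatal; the actual hang comes later, when the shutdown path hits the `BrokenPipeError` while the log volume is still disconnected.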
### What you think should happen instead?
_No response_
### How to reproduce
We launch Airflow, tasks execute, and after some time the scheduler service restarts. Sometimes it comes back after the restart and continues working; other times it fails to restart and the service stops launching new tasks.
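As a possible stopgap while the driver issue is open (assuming, as the logs suggest, that the root cause is the csi-s3 FUSE mount going stale): any filesystem operation on a disconnected FUSE mount point fails with the same errno 107 (`ENOTCONN`), so a small probe can tell a healthy log volume from a dead one. This is a hypothetical helper, not part of Airflow, and the default path is just an example:

```python
import errno
import os


def log_volume_is_healthy(path: str = "/opt/airflow/logs") -> bool:
    """Return False if `path` sits on a disconnected FUSE/csi-s3 mount."""
    try:
        os.stat(path)
        return True
    except OSError as exc:
        # Errno 107 (ENOTCONN, "Transport endpoint is not connected") is what
        # a stale FUSE mount returns for operations on the mount point.
        if exc.errno == errno.ENOTCONN:
            return False
        raise


print(log_volume_is_healthy("/"))  # "/" is always mounted, so this prints True
```

Wired into an exec liveness probe on the scheduler container, a check like this could make Kubernetes restart the pod automatically instead of leaving it hung until a manual restart; it works around the symptom, not the csi-s3 driver itself.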
### Operating System
Debian GNU/Linux 12 (bookworm)
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==9.0.0
apache-airflow-providers-celery==3.8.3
apache-airflow-providers-cncf-kubernetes==9.0.1
apache-airflow-providers-common-compat==1.2.1
apache-airflow-providers-common-io==1.4.2
apache-airflow-providers-common-sql==1.19.0
apache-airflow-providers-docker==3.14.0
apache-airflow-providers-elasticsearch==5.5.2
apache-airflow-providers-fab==1.5.0
apache-airflow-providers-ftp==3.11.1
apache-airflow-providers-google==10.25.0
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-hashicorp==3.8.0
apache-airflow-providers-http==4.13.2
apache-airflow-providers-imap==3.7.0
apache-airflow-providers-microsoft-azure==11.0.0
apache-airflow-providers-mysql==5.7.3
apache-airflow-providers-odbc==4.8.0
apache-airflow-providers-openlineage==1.13.0
apache-airflow-providers-postgres==5.13.1
apache-airflow-providers-redis==3.8.0
apache-airflow-providers-sendgrid==3.6.0
apache-airflow-providers-sftp==4.11.1
apache-airflow-providers-slack==8.9.1
apache-airflow-providers-smtp==1.8.0
apache-airflow-providers-snowflake==5.8.0
apache-airflow-providers-sqlite==3.9.0
apache-airflow-providers-ssh==3.14.0
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
k8s
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]