asemelianov opened a new issue, #45597:
URL: https://github.com/apache/airflow/issues/45597

   ### Apache Airflow version
   
   2.10.4
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
We use the official Airflow Helm chart with the CeleryKubernetes executor and csi-s3 configured. Our scheduler restarts at irregular intervals, and sometimes it does not come back up at all. According to the logs, the problem is with the csi-s3 driver: it unmounts the log directory, and until the scheduler service is restarted manually it hangs with an error. We only see this problem with the scheduler; the other services are unaffected.
   
   ```
   --- Logging error ---
   Traceback (most recent call last):
     File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
       if self.shouldRollover(record):
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in 
shouldRollover
       pos = self.stream.tell()
             ^^^^^^^^^^^^^^^^^^
   OSError: [Errno 107] Transport endpoint is not connected
   Call stack:
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 
62, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
116, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 59, in scheduler
       run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 62, in <lambda>
       callback=lambda: _run_scheduler_job(args),
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 48, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
421, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
450, in execute_job
       ret = execute_callable()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 983, in _execute
       self.processor_agent.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 172, in start
       process.start()
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in 
start
       self._popen = self._Popen(self)
     File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in 
_Popen
       return Popen(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, 
in __init__
       self._launch(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, 
in _launch
       code = process_obj._bootstrap(parent_sentinel=child_r)
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in 
_bootstrap
       self.run()
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in 
run
       self._target(*self._args, **self._kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 247, in _run_processor_manager
       processor_manager.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 489, in start
       return self._run_parsing_loop()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 667, in _run_parsing_loop
       self.collect_results()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 1196, in collect_results
       self._collect_results_from_processor(processor)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 1146, in _collect_results_from_processor
       self.log.error(
   Message: 'Processor for %s exited with return code %s.'
   Arguments: ('/opt/airflow/dags/repo/prod/ozon/ozon_dag.py', 1)
   --- Logging error ---
   Traceback (most recent call last):
     File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
       if self.shouldRollover(record):
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in 
shouldRollover
       pos = self.stream.tell()
             ^^^^^^^^^^^^^^^^^^
   OSError: [Errno 107] Transport endpoint is not connected
   Call stack:
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 
62, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
116, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 59, in scheduler
       run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 62, in <lambda>
       callback=lambda: _run_scheduler_job(args),
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 48, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
421, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
450, in execute_job
       ret = execute_callable()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 983, in _execute
       self.processor_agent.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 172, in start
       process.start()
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in 
start
       self._popen = self._Popen(self)
     File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in 
_Popen
       return Popen(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, 
in __init__
       self._launch(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, 
in _launch
       code = process_obj._bootstrap(parent_sentinel=child_r)
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in 
_bootstrap
       self.run()
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in 
run
       self._target(*self._args, **self._kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 247, in _run_processor_manager
       processor_manager.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 489, in start
       return self._run_parsing_loop()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 667, in _run_parsing_loop
       self.collect_results()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 1196, in collect_results
       self._collect_results_from_processor(processor)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 1146, in _collect_results_from_processor
       self.log.error(
   Message: 'Processor for %s exited with return code %s.'
   Arguments: ('/opt/airflow/dags/repo/prod/utils/dag_status.py', 1)
   [2025-01-11T22:44:51.127+0000] {process_utils.py:132} INFO - Sending 15 to 
group 55. PIDs of all processes in the group: [55]
   [2025-01-11T22:44:51.127+0000] {process_utils.py:87} INFO - Sending the 
signal 15 to group 55
   --- Logging error ---
   Traceback (most recent call last):
     File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
       if self.shouldRollover(record):
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in 
shouldRollover
       pos = self.stream.tell()
             ^^^^^^^^^^^^^^^^^^
   OSError: [Errno 107] Transport endpoint is not connected
   Call stack:
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 
62, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
116, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 59, in scheduler
       run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 62, in <lambda>
       callback=lambda: _run_scheduler_job(args),
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py",
 line 48, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
421, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
450, in execute_job
       ret = execute_callable()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 983, in _execute
       self.processor_agent.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 172, in start
       process.start()
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in 
start
       self._popen = self._Popen(self)
     File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in 
_Popen
       return Popen(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, 
in __init__
       self._launch(process_obj)
     File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, 
in _launch
       code = process_obj._bootstrap(parent_sentinel=child_r)
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in 
_bootstrap
       self.run()
     File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in 
run
       self._target(*self._args, **self._kwargs)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 247, in _run_processor_manager
       processor_manager.start()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 489, in start
       return self._run_parsing_loop()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 573, in _run_parsing_loop
       ready = multiprocessing.connection.wait(self.waitables.keys(), 
timeout=poll_time)
     File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 1136, 
in wait
       ready = selector.select(timeout)
     File "/usr/local/lib/python3.12/selectors.py", line 415, in select
       fd_event_list = self._selector.poll(timeout)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py",
 line 465, in _exit_gracefully
       self.log.info("Exiting gracefully upon receiving signal %s", signum)
   Message: 'Exiting gracefully upon receiving signal %s'
   Arguments: (15,)
   [2025-01-11T22:44:51.500+0000] {process_utils.py:80} INFO - Process 
psutil.Process(pid=55, status='terminated', exitcode=0, started='2025-01-09 
12:30:47') (55) terminated with exit code 0
   [2025-01-11T22:44:51.501+0000] {kubernetes_executor.py:760} INFO - Shutting 
down Kubernetes executor
   [2025-01-11T22:44:51.501+0000] {scheduler_job_runner.py:1011} ERROR - 
Exception when executing Executor.end on 
CeleryKubernetesExecutor(parallelism=32)
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 987, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1176, in _run_scheduler_loop
       time.sleep(min(self._scheduler_idle_sleep_time, next_event or 0))
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 263, in _exit_gracefully
       sys.exit(os.EX_OK)
   SystemExit: 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1009, in _execute
       executor.end()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_kubernetes_executor.py",
 line 254, in end
       self.kubernetes_executor.end()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 763, in end
       self._flush_task_queue()
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 719, in _flush_task_queue
       self.log.debug("Executor shutting down, task_queue approximate size=%d", 
self.task_queue.qsize())
                                                                                
^^^^^^^^^^^^^^^^^^^^^^^
     File "<string>", line 2, in qsize
     File "/usr/local/lib/python3.12/multiprocessing/managers.py", line 820, in 
_callmethod
       conn.send((self._id, methodname, args, kwds))
     File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 206, 
in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 427, 
in _send_bytes
       self._send(header + buf)
     File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 384, 
in _send
       n = write(self._handle, buf)
           ^^^^^^^^^^^^^^^^^^^^^^^^
   BrokenPipeError: [Errno 32] Broken pipe
   [2025-01-11T22:44:51.521+0000] {process_utils.py:132} INFO - Sending 15 to 
group 55. PIDs of all processes in the group: []
   [2025-01-11T22:44:51.521+0000] {process_utils.py:87} INFO - Sending the 
signal 15 to group 55
   [2025-01-11T22:44:51.521+0000] {process_utils.py:101} INFO - Sending the 
signal 15 to process 55 as process group is missing.
   [2025-01-11T22:44:51.521+0000] {scheduler_job_runner.py:1017} INFO - Exited 
execute loop
   INFO: detected pid 1, running init handler
   ```
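   The `OSError: [Errno 107] Transport endpoint is not connected` above is the classic symptom of a stale FUSE mount: any file access on the csi-s3 volume fails once the driver drops the mount, which is why the logging handler's `shouldRollover` check blows up. As a temporary mitigation until the pod is restarted, a small health check along these lines (the function name `mount_is_healthy` and the probe wiring are our own sketch, not part of Airflow or csi-s3) could be run as a Kubernetes livenessProbe against the log volume so Kubernetes restarts the scheduler automatically instead of leaving it hung:

   ```python
   import errno
   import os
   import sys


   def mount_is_healthy(path: str) -> bool:
       """Return False if ``path`` sits on a stale/disconnected mount.

       A stale FUSE mount (e.g. csi-s3 after the driver restarts) raises
       OSError with errno 107 (ENOTCONN, "Transport endpoint is not
       connected") on any access attempt; EIO is a similar broken-mount
       symptom. Other OSErrors (e.g. a missing path) are re-raised so
       they are not silently treated as a mount failure.
       """
       try:
           os.stat(path)
           return True
       except OSError as exc:
           if exc.errno in (errno.ENOTCONN, errno.EIO):
               return False
           raise


   if __name__ == "__main__":
       # Exit non-zero when the mount is stale so a livenessProbe
       # (exec probe calling this script) fails and the pod restarts.
       log_dir = sys.argv[1] if len(sys.argv) > 1 else "/opt/airflow/logs"
       sys.exit(0 if mount_is_healthy(log_dir) else 1)
   ```

   This only works around the symptom; the underlying csi-s3 remount behavior still needs fixing, but it avoids the scheduler hanging indefinitely with logging errors.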
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
We launch Airflow and tasks execute normally; after some time the scheduler 
service restarts. Sometimes it restarts and continues working afterwards; other 
times it fails to come back up and the service stops launching new tasks.
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==9.0.0
   apache-airflow-providers-celery==3.8.3
   apache-airflow-providers-cncf-kubernetes==9.0.1
   apache-airflow-providers-common-compat==1.2.1
   apache-airflow-providers-common-io==1.4.2
   apache-airflow-providers-common-sql==1.19.0
   apache-airflow-providers-docker==3.14.0
   apache-airflow-providers-elasticsearch==5.5.2
   apache-airflow-providers-fab==1.5.0
   apache-airflow-providers-ftp==3.11.1
   apache-airflow-providers-google==10.25.0
   apache-airflow-providers-grpc==3.6.0
   apache-airflow-providers-hashicorp==3.8.0
   apache-airflow-providers-http==4.13.2
   apache-airflow-providers-imap==3.7.0
   apache-airflow-providers-microsoft-azure==11.0.0
   apache-airflow-providers-mysql==5.7.3
   apache-airflow-providers-odbc==4.8.0
   apache-airflow-providers-openlineage==1.13.0
   apache-airflow-providers-postgres==5.13.1
   apache-airflow-providers-redis==3.8.0
   apache-airflow-providers-sendgrid==3.6.0
   apache-airflow-providers-sftp==4.11.1
   apache-airflow-providers-slack==8.9.1
   apache-airflow-providers-smtp==1.8.0
   apache-airflow-providers-snowflake==5.8.0
   apache-airflow-providers-sqlite==3.9.0
   apache-airflow-providers-ssh==3.14.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   k8s
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

