zchunhai opened a new issue, #41992:
URL: https://github.com/apache/airflow/issues/41992

   ### Apache Airflow version
   
   2.10.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   [2024-09-04T16:08:33.735+0800] {kubernetes_executor.py:356} INFO - Changing 
state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0', 
task_id='table_counts-child-3', 
run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1, 
map_index=-1), None, 
'7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg', 
'airflow', '41210824') to None
   [2024-09-04T16:08:33.741+0800] {kubernetes_executor.py:441} INFO - Deleted 
pod: TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0', 
task_id='table_counts-child-3', 
run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1, 
map_index=-1) in namespace airflow
   [2024-09-04T16:08:33.744+0800] {kubernetes_executor.py:360} ERROR - 
Exception: None is not a valid TaskInstanceState when attempting to change 
state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0', 
task_id='table_counts-child-3', 
run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1, 
map_index=-1), None, 
'7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg', 
'airflow', '41210824') to None, re-queueing.
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 358, in sync
       self._change_state(key, state, pod_name, namespace)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 456, in _change_state
       state = TaskInstanceState(state)
     File "/usr/local/lib/python3.10/enum.py", line 385, in __call__
       return cls.__new__(cls, value)
     File "/usr/local/lib/python3.10/enum.py", line 710, in __new__
       raise ve_exc
   ValueError: None is not a valid TaskInstanceState
   [2024-09-04T16:08:33.746+0800] {kubernetes_executor.py:356} INFO - Changing 
state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0', 
task_id='table_counts-child-3', 
run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1, 
map_index=-1), None, 
'7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg', 
'airflow', '41210824') to None
   [2024-09-04T16:08:33.747+0800] {scheduler_job_runner.py:260} INFO - Exiting 
gracefully upon receiving signal 15
   [2024-09-04T16:08:34.750+0800] {process_utils.py:132} INFO - Sending 
Signals.SIGTERM to group 2540. PIDs of all processes in the group: [20146, 
20284, 2540]
   [2024-09-04T16:08:34.750+0800] {process_utils.py:87} INFO - Sending the 
signal Signals.SIGTERM to group 2540
   [2024-09-04T16:08:37.565+0800] {process_utils.py:266} INFO - Waiting up to 5 
seconds for processes to exit...
   [2024-09-04T16:08:37.572+0800] {process_utils.py:80} INFO - Process 
psutil.Process(pid=20146, status='terminated', started='16:08:33') (20146) 
terminated with exit code None
   [2024-09-04T16:08:37.586+0800] {process_utils.py:266} INFO - Waiting up to 5 
seconds for processes to exit...
   [2024-09-04T16:08:37.586+0800] {process_utils.py:80} INFO - Process 
psutil.Process(pid=20284, status='terminated', started='16:08:34') (20284) 
terminated with exit code None
   [2024-09-04T16:08:37.613+0800] {process_utils.py:80} INFO - Process 
psutil.Process(pid=2540, status='terminated', exitcode=0, started='16:04:42') 
(2540) terminated with exit code 0
   [2024-09-04T16:08:37.615+0800] {scheduler_job_runner.py:1001} ERROR - 
Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 358, in sync
       self._change_state(key, state, pod_name, namespace)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 440, in _change_state
       self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
 line 416, in delete_pod
       body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py",
 line 58, in __init__
       local_vars_configuration = Configuration()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
 line 126, in __init__
       self.debug = False
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
 line 271, in debug
       logger.setLevel(logging.WARNING)
     File "/usr/local/lib/python3.10/logging/__init__.py", line 1453, in 
setLevel
       self.manager._clear_cache()
     File "/usr/local/lib/python3.10/logging/__init__.py", line 1412, in 
_clear_cache
       logger._cache.clear()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 263, in _exit_gracefully
       sys.exit(os.EX_OK)
   SystemExit: 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 984, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1127, in _run_scheduler_loop
       executor.heartbeat()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py", 
line 58, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py",
 line 247, in heartbeat
       self.sync()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 368, in sync
       self.result_queue.task_done()
     File "<string>", line 2, in task_done
     File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in 
_callmethod
       conn.send((self._id, methodname, args, kwds))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, 
in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, 
in _send_bytes
       self._send(header + buf)
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, 
in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   [2024-09-04T16:08:37.619+0800] {kubernetes_executor.py:695} INFO - Shutting 
down Kubernetes executor
   [2024-09-04T16:08:37.620+0800] {scheduler_job_runner.py:1008} ERROR - 
Exception when executing Executor.end on KubernetesExecutor(parallelism=32)
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 358, in sync
       self._change_state(key, state, pod_name, namespace)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 440, in _change_state
       self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
 line 416, in delete_pod
       body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py",
 line 58, in __init__
       local_vars_configuration = Configuration()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
 line 126, in __init__
       self.debug = False
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
 line 271, in debug
       logger.setLevel(logging.WARNING)
     File "/usr/local/lib/python3.10/logging/__init__.py", line 1453, in 
setLevel
       self.manager._clear_cache()
     File "/usr/local/lib/python3.10/logging/__init__.py", line 1412, in 
_clear_cache
       logger._cache.clear()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 263, in _exit_gracefully
       sys.exit(os.EX_OK)
   SystemExit: 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 984, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1127, in _run_scheduler_loop
       executor.heartbeat()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py", 
line 58, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py",
 line 247, in heartbeat
       self.sync()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 368, in sync
       self.result_queue.task_done()
     File "<string>", line 2, in task_done
     File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in 
_callmethod
       conn.send((self._id, methodname, args, kwds))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, 
in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, 
in _send_bytes
       self._send(header + buf)
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, 
in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1006, in _execute
       executor.end()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 698, in end
       self._flush_task_queue()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 654, in _flush_task_queue
       self.log.debug("Executor shutting down, task_queue approximate size=%d", 
self.task_queue.qsize())
     File "<string>", line 2, in qsize
     File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in 
_callmethod
       conn.send((self._id, methodname, args, kwds))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, 
in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, 
in _send_bytes
       self._send(header + buf)
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, 
in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   [2024-09-04T16:08:37.624+0800] {process_utils.py:132} INFO - Sending 
Signals.SIGTERM to group 2540. PIDs of all processes in the group: []
   [2024-09-04T16:08:37.625+0800] {process_utils.py:87} INFO - Sending the 
signal Signals.SIGTERM to group 2540
   [2024-09-04T16:08:37.625+0800] {process_utils.py:101} INFO - Sending the 
signal Signals.SIGTERM to process 2540 as process group is missing.
   [2024-09-04T16:08:37.626+0800] {scheduler_job_runner.py:1014} INFO - Exited 
execute loop
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 358, in sync
       self._change_state(key, state, pod_name, namespace)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 440, in _change_state
       self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
 line 416, in delete_pod
       body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py",
 line 58, in __init__
       local_vars_configuration = Configuration()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
 line 126, in __init__
       self.debug = False
     File 
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
 line 271, in debug
       logger.setLevel(logging.WARNING)
     File "/usr/local/lib/python3.10/logging/__init__.py", line 1453, in 
setLevel
       self.manager._clear_cache()
     File "/usr/local/lib/python3.10/logging/__init__.py", line 1412, in 
_clear_cache
       logger._cache.clear()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 263, in _exit_gracefully
       sys.exit(os.EX_OK)
   SystemExit: 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 
62, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 
115, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
 line 59, in scheduler
       run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
 line 62, in <lambda>
       callback=lambda: _run_scheduler_job(args),
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
 line 48, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", 
line 97, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/job.py", line 
421, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/job.py", line 
450, in execute_job
       ret = execute_callable()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 984, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1127, in _run_scheduler_loop
       executor.heartbeat()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py", 
line 58, in wrapper
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py",
 line 247, in heartbeat
       self.sync()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
 line 368, in sync
       self.result_queue.task_done()
     File "<string>", line 2, in task_done
     File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in 
_callmethod
       conn.send((self._id, methodname, args, kwds))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, 
in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, 
in _send_bytes
       self._send(header + buf)
     File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, 
in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Use the Kubernetes scheduler of Airflow to execute a DAG that includes a 
PythonOperator.
   
   
   ### Operating System
   
   Centos 7.9
   
   ### Versions of Apache Airflow Providers
   
   apache_airflow_providers_cncf_kubernetes-7.8.0
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   https://github.com/open-metadata/openmetadata-helm-charts
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to