zchunhai opened a new issue, #41992:
URL: https://github.com/apache/airflow/issues/41992
### Apache Airflow version
2.10.0
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
```
[2024-09-04T16:08:33.735+0800] {kubernetes_executor.py:356} INFO - Changing
state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0',
task_id='table_counts-child-3',
run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1,
map_index=-1), None,
'7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg',
'airflow', '41210824') to None
[2024-09-04T16:08:33.741+0800] {kubernetes_executor.py:441} INFO - Deleted
pod: TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0',
task_id='table_counts-child-3',
run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1,
map_index=-1) in namespace airflow
[2024-09-04T16:08:33.744+0800] {kubernetes_executor.py:360} ERROR -
Exception: None is not a valid TaskInstanceState when attempting to change
state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0',
task_id='table_counts-child-3',
run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1,
map_index=-1), None,
'7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg',
'airflow', '41210824') to None, re-queueing.
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 358, in sync
self._change_state(key, state, pod_name, namespace)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py",
line 97, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 456, in _change_state
state = TaskInstanceState(state)
File "/usr/local/lib/python3.10/enum.py", line 385, in __call__
return cls.__new__(cls, value)
File "/usr/local/lib/python3.10/enum.py", line 710, in __new__
raise ve_exc
ValueError: None is not a valid TaskInstanceState
[2024-09-04T16:08:33.746+0800] {kubernetes_executor.py:356} INFO - Changing
state of (TaskInstanceKey(dag_id='7402a0ed-4746-44b6-9a91-5e1682f32cf0',
task_id='table_counts-child-3',
run_id='scheduled__2024-09-03T08:03:23.150918+00:00', try_number=1,
map_index=-1), None,
'7402a0ed-4746-44b6-9a91-5e1682f32cf0-table-counts-child-3-p3lreggg',
'airflow', '41210824') to None
[2024-09-04T16:08:33.747+0800] {scheduler_job_runner.py:260} INFO - Exiting
gracefully upon receiving signal 15
[2024-09-04T16:08:34.750+0800] {process_utils.py:132} INFO - Sending
Signals.SIGTERM to group 2540. PIDs of all processes in the group: [20146,
20284, 2540]
[2024-09-04T16:08:34.750+0800] {process_utils.py:87} INFO - Sending the
signal Signals.SIGTERM to group 2540
[2024-09-04T16:08:37.565+0800] {process_utils.py:266} INFO - Waiting up to 5
seconds for processes to exit...
[2024-09-04T16:08:37.572+0800] {process_utils.py:80} INFO - Process
psutil.Process(pid=20146, status='terminated', started='16:08:33') (20146)
terminated with exit code None
[2024-09-04T16:08:37.586+0800] {process_utils.py:266} INFO - Waiting up to 5
seconds for processes to exit...
[2024-09-04T16:08:37.586+0800] {process_utils.py:80} INFO - Process
psutil.Process(pid=20284, status='terminated', started='16:08:34') (20284)
terminated with exit code None
[2024-09-04T16:08:37.613+0800] {process_utils.py:80} INFO - Process
psutil.Process(pid=2540, status='terminated', exitcode=0, started='16:04:42')
(2540) terminated with exit code 0
[2024-09-04T16:08:37.615+0800] {scheduler_job_runner.py:1001} ERROR -
Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 358, in sync
self._change_state(key, state, pod_name, namespace)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py",
line 97, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 440, in _change_state
self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 416, in delete_pod
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py",
line 58, in __init__
local_vars_configuration = Configuration()
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
line 126, in __init__
self.debug = False
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
line 271, in debug
logger.setLevel(logging.WARNING)
File "/usr/local/lib/python3.10/logging/__init__.py", line 1453, in
setLevel
self.manager._clear_cache()
File "/usr/local/lib/python3.10/logging/__init__.py", line 1412, in
_clear_cache
logger._cache.clear()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 263, in _exit_gracefully
sys.exit(os.EX_OK)
SystemExit: 0
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 984, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1127, in _run_scheduler_loop
executor.heartbeat()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py",
line 58, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py",
line 247, in heartbeat
self.sync()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 368, in sync
self.result_queue.task_done()
File "<string>", line 2, in task_done
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in
_callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206,
in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411,
in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368,
in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2024-09-04T16:08:37.619+0800] {kubernetes_executor.py:695} INFO - Shutting
down Kubernetes executor
[2024-09-04T16:08:37.620+0800] {scheduler_job_runner.py:1008} ERROR -
Exception when executing Executor.end on KubernetesExecutor(parallelism=32)
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 358, in sync
self._change_state(key, state, pod_name, namespace)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py",
line 97, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 440, in _change_state
self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 416, in delete_pod
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py",
line 58, in __init__
local_vars_configuration = Configuration()
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
line 126, in __init__
self.debug = False
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
line 271, in debug
logger.setLevel(logging.WARNING)
File "/usr/local/lib/python3.10/logging/__init__.py", line 1453, in
setLevel
self.manager._clear_cache()
File "/usr/local/lib/python3.10/logging/__init__.py", line 1412, in
_clear_cache
logger._cache.clear()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 263, in _exit_gracefully
sys.exit(os.EX_OK)
SystemExit: 0
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 984, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1127, in _run_scheduler_loop
executor.heartbeat()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py",
line 58, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py",
line 247, in heartbeat
self.sync()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 368, in sync
self.result_queue.task_done()
File "<string>", line 2, in task_done
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in
_callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206,
in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411,
in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368,
in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1006, in _execute
executor.end()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 698, in end
self._flush_task_queue()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 654, in _flush_task_queue
self.log.debug("Executor shutting down, task_queue approximate size=%d",
self.task_queue.qsize())
File "<string>", line 2, in qsize
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in
_callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206,
in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411,
in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368,
in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2024-09-04T16:08:37.624+0800] {process_utils.py:132} INFO - Sending
Signals.SIGTERM to group 2540. PIDs of all processes in the group: []
[2024-09-04T16:08:37.625+0800] {process_utils.py:87} INFO - Sending the
signal Signals.SIGTERM to group 2540
[2024-09-04T16:08:37.625+0800] {process_utils.py:101} INFO - Sending the
signal Signals.SIGTERM to process 2540 as process group is missing.
[2024-09-04T16:08:37.626+0800] {scheduler_job_runner.py:1014} INFO - Exited
execute loop
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 358, in sync
self._change_state(key, state, pod_name, namespace)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py",
line 97, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 440, in _change_state
self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py",
line 416, in delete_pod
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/models/v1_delete_options.py",
line 58, in __init__
local_vars_configuration = Configuration()
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
line 126, in __init__
self.debug = False
File
"/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/configuration.py",
line 271, in debug
logger.setLevel(logging.WARNING)
File "/usr/local/lib/python3.10/logging/__init__.py", line 1453, in
setLevel
self.manager._clear_cache()
File "/usr/local/lib/python3.10/logging/__init__.py", line 1412, in
_clear_cache
logger._cache.clear()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 263, in _exit_gracefully
sys.exit(os.EX_OK)
SystemExit: 0
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line
62, in main
args.func(args)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_config.py",
line 49, in command
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line
115, in wrapper
return f(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/providers_configuration_loader.py",
line 55, in wrapped_function
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
line 59, in scheduler
run_command_with_daemon_option(
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/daemon_utils.py",
line 86, in run_command_with_daemon_option
callback()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
line 62, in <lambda>
callback=lambda: _run_scheduler_job(args),
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
line 48, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py",
line 97, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/job.py", line
421, in run_job
return execute_job(job, execute_callable=execute_callable)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/job.py", line
450, in execute_job
ret = execute_callable()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 984, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1127, in _run_scheduler_loop
executor.heartbeat()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/traces/tracer.py",
line 58, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py",
line 247, in heartbeat
self.sync()
File
"/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py",
line 368, in sync
self.result_queue.task_done()
File "<string>", line 2, in task_done
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in
_callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206,
in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411,
in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368,
in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
```
### What you think should happen instead?
_No response_
### How to reproduce
Run a DAG that includes a PythonOperator using Airflow's KubernetesExecutor; a
minimal sketch of such a DAG is shown below.
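For reference, this is a minimal sketch of the kind of DAG described above, assuming the scheduler is running with `executor = KubernetesExecutor` in `airflow.cfg`. The `dag_id`, schedule, and task callable are illustrative placeholders and are not taken from the failing deployment.

```python
# Hypothetical minimal DAG to illustrate the reproduction setup.
# Names and schedule are placeholders, not from the original report.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def _count_tables():
    # Placeholder task body; any PythonOperator callable exercises the
    # same KubernetesExecutor code path shown in the traceback above.
    print("counting tables")


with DAG(
    dag_id="kubernetes_executor_repro",
    start_date=datetime(2024, 9, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    PythonOperator(
        task_id="table_counts",
        python_callable=_count_tables,
    )
```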
### Operating System
CentOS 7.9
### Versions of Apache Airflow Providers
apache_airflow_providers_cncf_kubernetes-7.8.0
### Deployment
Other 3rd-party Helm chart
### Deployment details
https://github.com/open-metadata/openmetadata-helm-charts
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)