hterik opened a new issue #19657:
URL: https://github.com/apache/airflow/issues/19657
### Apache Airflow version
2.2.1
### Operating System
Debian GNU/Linux 10 (buster)
### Versions of Apache Airflow Providers
apache-airflow-providers-amazon==2.3.0
apache-airflow-providers-celery==2.1.0
apache-airflow-providers-cncf-kubernetes==2.0.3
apache-airflow-providers-docker==2.2.0
apache-airflow-providers-elasticsearch==2.0.3
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-google==6.0.0
apache-airflow-providers-grpc==2.0.1
apache-airflow-providers-hashicorp==2.1.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-microsoft-azure==3.2.0
apache-airflow-providers-mysql==2.1.1
apache-airflow-providers-odbc==2.0.1
apache-airflow-providers-postgres==2.3.0
apache-airflow-providers-redis==2.0.1
apache-airflow-providers-sendgrid==2.0.1
apache-airflow-providers-sftp==2.1.1
apache-airflow-providers-slack==4.1.0
apache-airflow-providers-sqlite==2.0.1
apache-airflow-providers-ssh==2.2.0
### Deployment
Other Docker-based deployment
### Deployment details
_No response_
### What happened
1. Create a taskflow DAG with `@task(executor_config={"pod_override": POD_OVERRIDE})`, where POD_OVERRIDE contains a misconfigured pod spec.
2. Start the DAG.
3. Wait a few seconds.
4. Abort the dagrun by marking it as failed.
5. Fix the DAG and wait for Airflow to reload DAGs.
6. Inspect the Airflow logs: the following is printed once per second, forever:
```
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py", line 260, in run_pod_async
    body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 6174, in create_namespaced_pod
    (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 6265, in create_namespaced_pod_with_http_info
    collection_formats=collection_formats)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 345, in call_api
    _preload_content, _request_timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 176, in __call_api
    _request_timeout=_request_timeout)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 388, in request
    body=body)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 278, in POST
    body=body)
  File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (422)
Reason: Unprocessable Entity
[2021-11-17 16:21:36,370] {kubernetes_executor.py:608} WARNING - ApiException when attempting to run task, re-queueing. Message: Pod "mypod.a6861d053585481492e36034957e638d" is invalid: [spec.nodeSelector: Invalid value: REDACTED
```
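For reference, a minimal sketch of the DAG from step 1. The DAG name, task name, and the invalid `nodeSelector` value are illustrative placeholders (the real value was redacted above); any pod override that the API server rejects with 422 Unprocessable Entity should reproduce the loop:

```python
# Hypothetical repro sketch, not the actual DAG from this report.
import pendulum
from airflow.decorators import dag, task
from kubernetes.client import models as k8s

# A pod spec the Kubernetes API server will reject: node selector
# label values may not contain spaces or exclamation marks.
POD_OVERRIDE = k8s.V1Pod(
    spec=k8s.V1PodSpec(
        containers=[k8s.V1Container(name="base")],
        node_selector={"kubernetes.io/hostname": "not a valid value!"},
    )
)

@dag(schedule_interval=None, start_date=pendulum.datetime(2021, 11, 1))
def requeue_repro():
    @task(executor_config={"pod_override": POD_OVERRIDE})
    def doomed():
        pass

    doomed()

requeue_repro_dag = requeue_repro()
```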
### What you expected to happen
A. After a dagrun/task has been aborted, the executor should not retry it any more.
B. If launching a task has failed some number of times, it should not be retried any more.
### How to reproduce
_No response_
### Anything else
A quick review of the `airflow.executors.kubernetes_executor` `run()` function where the exception is thrown suggests that the task is unconditionally put back on the queue, and on the next iteration the task state is not inspected before the Kubernetes API call is issued.
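One possible shape of a fix for expectation B, as a standalone sketch (the class and names are illustrative, not Airflow's actual internals): track an attempt count per task key and fail the task once a limit is reached, instead of re-queueing unconditionally.

```python
from collections import defaultdict

class BoundedRequeuer:
    """Sketch: bounded re-queueing for failed pod launches.

    Illustrative only; Airflow's KubernetesExecutor does not
    currently expose such a limit.
    """

    def __init__(self, max_attempts=5):
        self.max_attempts = max_attempts
        self.attempts = defaultdict(int)
        self.queue = []   # tasks to retry
        self.failed = []  # tasks given up on

    def requeue_on_api_error(self, task_key):
        """Re-queue the task unless its attempts are exhausted."""
        self.attempts[task_key] += 1
        if self.attempts[task_key] >= self.max_attempts:
            # Give up: mark failed rather than looping forever.
            self.failed.append(task_key)
        else:
            self.queue.append(task_key)
```

A state check before each `run_pod_async` call (expectation A) would complement this, so that tasks belonging to an aborted dagrun are dropped rather than retried.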
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)