timgriffiths opened a new issue #16316:
URL: https://github.com/apache/airflow/issues/16316
**Description**
Sometimes we have more jobs than our k8s cluster can run. It would be great if Airflow could catch the error below and re-queue the task instead of marking it as failed. We can tune this with queue sizes, but if we lose nodes from the namespace (or something similar), there is, as far as I can tell, no feedback mechanism that tells Airflow to slow down the number of jobs being scheduled.
Below is an example of the error that gets raised.
```
[2021-06-02 11:09:31,774] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1336, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute
    final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 520, in create_new_pod_for_operator
    launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 329, in wrapped_f
    return self.call(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 409, in call
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 356, in iter
    return fut.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 412, in call
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 122, in start_pod
    resp = self.run_pod_async(pod)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 94, in run_pod_async
    raise e
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 88, in run_pod_async
    resp = self._client.create_namespaced_pod(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6174, in create_namespaced_pod
    (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs) # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6251, in create_namespaced_pod_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'fd69c385-845e-494b-8392-6a444e923378', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 02 Jun 2021 11:09:31 GMT', 'Content-Length': '383'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"dean-fun-1295-backtest-2.64852cabc5194f149de539095fdede1d\" is forbidden: exceeded quota: mem-cpu, requested: limits.cpu=2, used: limits.cpu=500, limited: limits.cpu=500","reason":"Forbidden","details":{"name":"dean-fun-1295-backtest-2.64852cabc5194f149de539095fdede1d","kind":"pods"},"code":403}
[2021-06-02 11:09:31,778] {taskinstance.py:1524} INFO - Marking task as FAILED. dag_id=abc, task_id=backtest-2, execution_date=20210602T110820, start_date=20210602T110931, end_date=20210602T110931
```
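The rejection can be recognised from the response itself: HTTP status 403, reason `Forbidden`, and a message containing `exceeded quota: <name>`. Below is a minimal sketch of a hypothetical helper, `exceeded_quota_name`, that distinguishes a quota rejection from other 403s such as RBAC denials. It assumes the API server keeps this message wording, which is free text rather than a stable API. With the `kubernetes` Python client, the inputs would be the `status` and `body` attributes of the caught `ApiException`.

```python
import json
import re
from typing import Optional, Union

# Assumption: the API server reports ResourceQuota rejections in free text such as
# "exceeded quota: mem-cpu, requested: limits.cpu=2, ..."; capture the quota name.
_EXCEEDED_QUOTA_RE = re.compile(r"exceeded quota: ([^,]+)")


def exceeded_quota_name(status: int, body: Union[str, bytes, None]) -> Optional[str]:
    """Return the ResourceQuota name if a failed pod create was a quota rejection.

    Hypothetical helper. `status` and `body` are the HTTP status code and response
    body of the failed request. Returns None for any other error, including other
    403s (e.g. RBAC denials).
    """
    if status != 403 or not body:
        return None
    if isinstance(body, bytes):
        body = body.decode("utf-8", errors="replace")
    try:
        payload = json.loads(body)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return None
    if not isinstance(payload, dict) or payload.get("reason") != "Forbidden":
        return None
    match = _EXCEEDED_QUOTA_RE.search(str(payload.get("message", "")))
    return match.group(1).strip() if match else None


# Response body copied from the log above.
SAMPLE_BODY = (
    r'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure",'
    r'"message":"pods \"dean-fun-1295-backtest-2.64852cabc5194f149de539095fdede1d\" '
    r'is forbidden: exceeded quota: mem-cpu, requested: limits.cpu=2, '
    r'used: limits.cpu=500, limited: limits.cpu=500","reason":"Forbidden",'
    r'"details":{"name":"dean-fun-1295-backtest-2.64852cabc5194f149de539095fdede1d",'
    r'"kind":"pods"},"code":403}'
)

print(exceeded_quota_name(403, SAMPLE_BODY))  # prints mem-cpu
```

Matching on the message text is fragile. The structured `reason` field only says `Forbidden`, though, so on its own it cannot separate a full quota, which is worth retrying, from a permissions problem, which is not.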
**Use case / motivation**
Ideally, when this exception is raised, the task would re-queue itself and try again after some delay.
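Below is a minimal sketch of the requested behaviour: retry with a growing delay, applied only to errors a predicate marks as retryable. The helper `call_with_backoff` is hypothetical, and its default delays are illustrative assumptions. Airflow's existing `retries`, `retry_delay` and `retry_exponential_backoff` operator arguments also delay retries, but they apply to every failure rather than only to a full quota.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 30.0,
    max_delay: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying only retryable errors with exponentially growing delays.

    Hypothetical helper; the default delays are illustrative, not Airflow settings.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            # Re-raise non-retryable errors immediately, and give up after the last attempt.
            if not is_retryable(exc) or attempt == max_attempts - 1:
                raise
            # The delay doubles each attempt (30s, 60s, 120s, ...) and is capped at max_delay.
            sleep(min(base_delay * 2 ** attempt, max_delay))
    raise AssertionError("unreachable")
```

Because this waits in-process, the worker slot stays occupied during the delay. Truly re-queuing the task would need support from the operator or the scheduler. The `is_retryable` predicate could, for example, accept only a 403 whose response body mentions `exceeded quota`.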
**Are you willing to submit a PR?**
Sorry, I don't have the technical skills to help here.
**Related Issues**