timgriffiths opened a new issue #16316:
URL: https://github.com/apache/airflow/issues/16316


   **Description**
   
   Sometimes we have more jobs than our k8s cluster can run at once. It would be super nice if Airflow had a feature that could catch the error below and simply re-queue the task, rather than marking it as failed. Yes, we can tune this with queue sizes, but if we lose nodes from the namespace (or something similar) there is, as far as I can tell, no feedback mechanism to tell Airflow to slow down the number of jobs being scheduled.
   
   Below is an example of the error that gets flagged up.
   
   ```
   [2021-06-02 11:09:31,774] {taskinstance.py:1481} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1336, in _execute_task
       result = task_copy.execute(context=context)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute
       final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 520, in create_new_pod_for_operator
       launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 329, in wrapped_f
       return self.call(f, *args, **kw)
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 409, in call
       do = self.iter(retry_state=retry_state)
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 356, in iter
       return fut.result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
       return self.__get_result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
       raise self._exception
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 412, in call
       result = fn(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 122, in start_pod
       resp = self.run_pod_async(pod)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 94, in run_pod_async
       raise e
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/utils/pod_launcher.py", line 88, in run_pod_async
       resp = self._client.create_namespaced_pod(
     File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6174, in create_namespaced_pod
       (data) = self.create_namespaced_pod_with_http_info(namespace, body, **kwargs)  # noqa: E501
     File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/core_v1_api.py", line 6251, in create_namespaced_pod_with_http_info
       return self.api_client.call_api(
     File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
       return self.__call_api(resource_path, method,
     File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
       response_data = self.request(
     File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
       return self.rest_client.POST(url,
     File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
       return self.request("POST", url,
     File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
       raise ApiException(http_resp=r)
   kubernetes.client.rest.ApiException: (403)
   Reason: Forbidden
   HTTP response headers: HTTPHeaderDict({'Audit-Id': 'fd69c385-845e-494b-8392-6a444e923378', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 02 Jun 2021 11:09:31 GMT', 'Content-Length': '383'})
   HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \"dean-fun-1295-backtest-2.64852cabc5194f149de539095fdede1d\" is forbidden: exceeded quota: mem-cpu, requested: limits.cpu=2, used: limits.cpu=500, limited: limits.cpu=500","reason":"Forbidden","details":{"name":"dean-fun-1295-backtest-2.64852cabc5194f149de539095fdede1d","kind":"pods"},"code":403}
   
   [2021-06-02 11:09:31,778] {taskinstance.py:1524} INFO - Marking task as FAILED. dag_id=abc, task_id=backtest-2, execution_date=20210602T110820, start_date=20210602T110931, end_date=20210602T110931
   ```
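   The key detail in the log is the 403's JSON body: a Kubernetes `Status` object whose `message` contains `exceeded quota`. A minimal sketch of how such a response could be classified as "retryable" rather than "failed" (the function name and heuristic are my own assumptions, not anything Airflow ships today):
   
   ```python
   import json
   
   def is_quota_exceeded(status_code: int, body: str) -> bool:
       """Heuristic: treat a 403 whose Status message mentions an
       exceeded quota as a transient, retryable condition.
       (Hypothetical helper, not part of the Airflow or kubernetes APIs.)"""
       if status_code != 403:
           return False
       try:
           status = json.loads(body)
       except ValueError:
           return False  # body was not the expected JSON Status object
       return "exceeded quota" in status.get("message", "")
   ```
   
   Any other 403 (e.g. a genuine RBAC denial) would still fail the task immediately, since only the quota message is treated as transient.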
   
   **Use case / motivation**
   
   Ideally, if this exception were raised, the task would simply re-queue itself with some sort of time delay before trying again.
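   The requested behaviour can be sketched in plain Python, independent of Airflow's internals (`QuotaExceeded` and `run_with_requeue` are hypothetical names of my own, standing in for the 403 `ApiException` and whatever the scheduler would actually do):
   
   ```python
   import time
   
   class QuotaExceeded(Exception):
       """Stand-in for the 403 'exceeded quota' ApiException."""
   
   def run_with_requeue(task, max_attempts=5, base_delay=2.0):
       """Re-run `task` with exponential back-off instead of failing outright.
   
       `task` is any callable that raises QuotaExceeded while the
       namespace is out of CPU quota.
       """
       for attempt in range(max_attempts):
           try:
               return task()
           except QuotaExceeded:
               # Back off before re-queueing: 2s, 4s, 8s, ...
               time.sleep(base_delay * (2 ** attempt))
       raise QuotaExceeded(f"still over quota after {max_attempts} attempts")
   ```
   
   In the meantime, a task-level work-around may be to raise `retries` (with a generous `retry_delay`) on the affected operators so that a quota failure is absorbed by Airflow's ordinary retry machinery rather than terminating the run.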
   
   **Are you willing to submit a PR?**
   
   Sorry, I don't have the technical skill to help here.
   
   **Related Issues**
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

