antoniocorralsierra opened a new issue, #39239:
URL: https://github.com/apache/airflow/issues/39239

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes 8.0.1
   apache-airflow-providers-celery          3.6.1
   
   ### Apache Airflow version
   
   2.8.3
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   I have an Airflow instance deployed on a GKE cluster with Helm (KEDA is enabled on the workers to scale from 1 to 5, with cooldownPeriod set to 240). I use the CeleryKubernetesExecutor and run tasks with KubernetesPodOperator on the Celery workers. To avoid occupying a slot on a Celery worker while a KubernetesPodOperator pod is running, I enabled deferrable mode on those operators and set poll_interval to 10 seconds. After that, many errors with a 404 status code appeared.
   
   Situation:
   
   ```python
   example_task = KubernetesPodOperator(
       task_id="example_task",
       name="example_task",
       get_logs=True,
       on_finish_action="delete_pod",
       log_events_on_failure=True,
       deferrable=True,
       poll_interval=10,
       logging_interval=None,
   )
   ```
   
   We can assume the rest of the input parameters are correct.
   
   The steps are:
   
   1. The Celery worker picks up the task.
   2. The KubernetesPodOperator launches a new pod on the cluster that executes the task.
   3. The task status changes to deferred.
   4. Checking the triggerer log, I see the trigger is complete: `[2024-04-24T12:48:57.740+0000] {triggerer_job_runner.py:623} INFO - trigger example_task (ID 668324) completed`
   5. Checking the pod status: it completed successfully, finishing at 2024-04-24 12:48:51.122.
   6. The task changes to queued status, waiting for a free slot on a Celery worker.
   7. When a slot becomes available on a Celery worker, the task begins to run.
   8. After resuming, the task fails and changes to up_for_retry status.
   
   The task log is:
   ```
   [2024-04-24, 12:51:21 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1604, in resume_execution
       return execute_callable(context)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 738, in trigger_reentry
       self.write_logs(self.pod)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 771, in write_logs
       logs = self.pod_manager.read_pod_logs(
     File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 324, in wrapped_f
       return self(f, *args, **kw)
     File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 404, in __call__
       do = self.iter(retry_state=retry_state)
     File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 360, in iter
       raise retry_exc.reraise()
     File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 193, in reraise
       raise self.last_attempt.result()
     File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
       return self.__get_result()
     File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
       raise self._exception
     File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 407, in __call__
       result = fn(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 668, in read_pod_logs
       logs = self._client.read_namespaced_pod_log(
     File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 23957, in read_namespaced_pod_log
       return self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs)  # noqa: E501
     File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api/core_v1_api.py", line 24076, in read_namespaced_pod_log_with_http_info
       return self.api_client.call_api(
     File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 348, in call_api
       return self.__call_api(resource_path, method,
     File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
       response_data = self.request(
     File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/api_client.py", line 373, in request
       return self.rest_client.GET(url,
     File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 244, in GET
       return self.request("GET", url,
     File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/client/rest.py", line 238, in request
       raise ApiException(http_resp=r)
   kubernetes.client.exceptions.ApiException: (404)
   Reason: Not Found
   ```
   
   The error is raised in the trigger_reentry method. I think this is related to the POST_TERMINATION_TIMEOUT value, which is set to 120 seconds on KubernetesPodOperator: the worker tries to read the pod log roughly two and a half minutes after the pod finished (the pod finished at 12:48:51 and the worker read at 12:51:21). By then the pod no longer exists, so the log cannot be read and a 404 error comes back.
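   
   For reference, the 404 is easy to demonstrate directly with the Kubernetes Python client once the pod has been deleted (a minimal sketch; the pod name and namespace below are placeholders):
   
   ```python
   # Sketch: reading logs for a pod that no longer exists raises ApiException
   # with status 404, which is exactly what trigger_reentry hits.
   from kubernetes import client, config
   from kubernetes.client.exceptions import ApiException
   
   config.load_kube_config()  # use load_incluster_config() inside the cluster
   v1 = client.CoreV1Api()
   
   try:
       v1.read_namespaced_pod_log(name="example-task-abcde", namespace="airflow")
   except ApiException as e:
       print(e.status, e.reason)  # 404 Not Found once the pod is gone
   ```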
   
   ### What you think should happen instead
   
   The task should finish successfully, with a warning indicating that the logs could not be read (or some similar behavior), rather than being marked as failed and set to up_for_retry. A rough sketch of the tolerant behavior I have in mind is below.
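   
   This is only a sketch of the expected behavior, not the provider's actual code; `write_logs_tolerant` and its arguments are hypothetical stand-ins for what `trigger_reentry`/`write_logs` do:
   
   ```python
   import logging
   
   from kubernetes.client.exceptions import ApiException
   
   log = logging.getLogger(__name__)
   
   
   def write_logs_tolerant(pod_manager, pod):
       """Hypothetical variant of write_logs: a 404 becomes a warning, not a failure."""
       try:
           for raw_line in pod_manager.read_pod_logs(pod=pod, container_name="base", follow=False):
               log.info(raw_line.decode("utf-8", errors="replace").rstrip())
       except ApiException as e:
           if e.status != 404:
               raise
           # The pod was already deleted (on_finish_action="delete_pod" plus the
           # delay before a celery slot frees up), so warn instead of failing.
           log.warning("Pod %s no longer exists; skipping log collection.", pod.metadata.name)
   ```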
   
   ### How to reproduce
   
   The steps are described in the "What happened" section. Just activate deferrable mode on the KubernetesPodOperator, use the CeleryKubernetesExecutor, and route the task to the Celery workers. A minimal DAG sketch follows.
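   
   A minimal DAG along these lines should reproduce it (a sketch only; the image, command, and DAG id are arbitrary assumptions, any pod that runs long enough to defer will do):
   
   ```python
   # Minimal reproduction sketch. The key combination is deferrable=True with
   # on_finish_action="delete_pod", while the task runs on the celery workers
   # of a CeleryKubernetesExecutor deployment.
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
   
   with DAG(dag_id="kpo_deferrable_404", start_date=datetime(2024, 1, 1), schedule=None) as dag:
       example_task = KubernetesPodOperator(
           task_id="example_task",
           name="example_task",
           image="busybox",
           cmds=["sh", "-c", "sleep 60"],
           get_logs=True,
           on_finish_action="delete_pod",
           log_events_on_failure=True,
           deferrable=True,
           poll_interval=10,
           logging_interval=None,
           # No queue set: with CeleryKubernetesExecutor the task goes to the
           # default (celery) queue, i.e. the celery workers.
       )
   ```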
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

