gkarg commented on issue #21087:
URL: https://github.com/apache/airflow/issues/21087#issuecomment-1087870738

   > I have the same issue.
   > 
   > Looks like it is happening because the kubernetes library now (I don't know from which version) handles the 410 error on its side: there is one retry, and then an exception is raised if the event is of type 'ERROR'.
   > 
   > I checked the kubernetes library and it was changed in this pull request: 
https://github.com/kubernetes-client/python-base/pull/133/files
   > 
   > In the Airflow Kubernetes Executor it is handled here:
   > 
   > 
https://github.com/apache/airflow/blob/d7265791187fb2117dfd090cdb7cce3f8c20866c/airflow/executors/kubernetes_executor.py#L148
   > 
   > by the `process_error` function, but it should probably now be wrapped in a try/except for `ApiException`, with a check for the 410 status code.
   
   I concur with @arkadiusz-bach's analysis.
   
   The error is simply happening because newer versions of the kubernetes library raise exceptions that Airflow fails to handle.
   ```
   Traceback (most recent call last):
     File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
       self.run()
     File "/opt/conda/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py", line 103, in run
       kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
     File "/opt/conda/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py", line 145, in _run
       for event in list_worker_pods():
     File "/opt/conda/lib/python3.7/site-packages/kubernetes/watch/watch.py", line 183, in stream
       status=obj['code'], reason=reason)
   kubernetes.client.exceptions.ApiException: (410)
   Reason: Expired: too old resource version: 891097969 (891307823)
   ```
   
   The culprit is the line `for event in list_worker_pods():`, where the exception escapes, so `_process_error` never gets a chance to run.
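To illustrate why: when a generator raises, the exception propagates from the `for` statement itself, so any per-event error handling in the loop body is bypassed. A generic sketch (not Airflow's actual code, just a stand-in for the watch stream):

```python
def stream():
    """Simulates kubernetes.watch.Watch.stream(): yields one event,
    then raises, as watch.py does when the server reports 410 Gone."""
    yield {"type": "ADDED"}
    raise RuntimeError("(410) Reason: Expired: too old resource version")

processed = []
try:
    for event in stream():
        # Per-event error handling (like _process_error) lives here,
        # but it only runs for events the generator actually yields.
        processed.append(event["type"])
except RuntimeError as exc:
    caught = str(exc)

# processed == ["ADDED"]: the 410 escaped from the `for` statement
# itself and never reached the loop body.
```

That is why the fix has to sit around the call to `_run`, not inside the event loop.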
   
   So, a simple fix would be to do something like this:
   
   
https://github.com/apache/airflow/blob/2cf1ae30538e109627417e8f0c1650addac3311b/airflow/executors/kubernetes_executor.py#L95
   
   ```diff
       def run(self) -> None:
           """Performs watching"""
           kube_client: client.CoreV1Api = get_kube_client()
           if not self.scheduler_job_id:
               raise AirflowException(NOT_STARTED_MESSAGE)
           while True:
               try:
                   self.resource_version = self._run(
                       kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
                   )
               except ReadTimeoutError:
                   self.log.warning(
                       "There was a timeout error accessing the Kube API. Retrying request.", exc_info=True
                   )
                   time.sleep(1)
   +            except ApiException as e:
   +                if e.status == 410:
   +                    self.resource_version = "0"
   +                else:
   +                    raise
               except Exception:
                   self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
                   raise
               else:
                   self.log.warning(
                       'Watch died gracefully, starting back up with: last resource_version: %s',
                       self.resource_version,
                   )
   ```
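The retry logic in the diff can be sketched in isolation. This is a minimal, self-contained sketch using a stand-in `ApiException` class (the real one lives in `kubernetes.client.exceptions`): on HTTP 410 (Gone) the resource version is reset to `"0"` so the next watch starts fresh; any other API error is re-raised.

```python
class ApiException(Exception):
    """Stand-in for kubernetes.client.exceptions.ApiException."""
    def __init__(self, status, reason=""):
        super().__init__(f"({status}) Reason: {reason}")
        self.status = status


def next_resource_version(run_watch, resource_version):
    """Run one watch cycle via run_watch(resource_version).

    On a 410 (resource version too old), reset to "0" so the next
    watch re-lists from scratch; re-raise any other ApiException.
    """
    try:
        return run_watch(resource_version)
    except ApiException as e:
        if e.status == 410:
            return "0"
        raise
```

Resetting to `"0"` trades a full re-list on the next watch for correctness; that seems acceptable here since the watcher already restarts its loop on other transient errors.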
   
   I'm not (yet) submitting a PR, as this is probably too much of a hack, but if it looks good I'll gladly do so.
   
   @cansjt I do not see any blockers on the kubernetes library side; this appears to be an Airflow-side issue.

