This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dee05b2ebc Prevent KubernetesJobWatcher getting stuck on resource too old (#23521)
dee05b2ebc is described below

commit dee05b2ebca6ab66f1b447837e11fe204f98b2df
Author: Ruben Laguna <[email protected]>
AuthorDate: Wed May 11 08:25:49 2022 +0200

    Prevent KubernetesJobWatcher getting stuck on resource too old (#23521)
    
    * Prevent KubernetesJobWatcher getting stuck on resource too old
    
    If the watch fails with "resource too old", the
    KubernetesJobWatcher should not retry with the same resource version,
    as that would end up in a loop where no progress is made.
    
    * Reset ResourceVersion().resource_version to 0
---
 airflow/executors/kubernetes_executor.py    |  3 +++
 tests/executors/test_kubernetes_executor.py | 34 +++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 9b9de71681..c76cf58f41 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -109,6 +109,8 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
                 time.sleep(1)
             except Exception:
                 self.log.exception('Unknown error in KubernetesJobWatcher. 
Failing')
+                self.resource_version = "0"
+                ResourceVersion().resource_version = "0"
                 raise
             else:
                 self.log.warning(
@@ -288,6 +290,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
             self.log.error(
                 'Error while health checking kube watcher process. Process 
died for unknown reasons'
             )
+            ResourceVersion().resource_version = "0"
             self.kube_watcher = self._make_kube_watcher()
 
     def run_next(self, next_job: KubernetesJobType) -> None:
diff --git a/tests/executors/test_kubernetes_executor.py 
b/tests/executors/test_kubernetes_executor.py
index a677fe598b..a332a2fd6a 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -39,6 +39,7 @@ try:
         AirflowKubernetesScheduler,
         KubernetesExecutor,
         KubernetesJobWatcher,
+        ResourceVersion,
         create_pod_id,
         get_base_pod_from_template,
     )
@@ -957,3 +958,36 @@ class TestKubernetesJobWatcher(unittest.TestCase):
             f"Kubernetes failure for {raw_object['reason']} "
             f"with code {raw_object['code']} and message: 
{raw_object['message']}"
         )
+
+    def test_recover_from_resource_too_old(self):
+        """A failing _run must reset both resource versions to "0" so the next run restarts from "0"."""
+        # _run returns '500' once, then raises a sentinel standing in for "resource too old"
+        mock_underscore_run = mock.MagicMock()
+
+        def effect():
+            yield '500'
+            while True:
+                yield Exception('sentinel')
+
+        mock_underscore_run.side_effect = effect()
+
+        self.watcher._run = mock_underscore_run
+        # Seed the shared singleton so its reset is observable; reset it afterwards for other tests
+        ResourceVersion().resource_version = '500'
+        self.addCleanup(setattr, ResourceVersion(), 'resource_version', '0')
+        with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'):
+            # the first _run call succeeds, the second raises; run() must propagate the error
+            with self.assertRaisesRegex(Exception, '^sentinel$'):
+                self.watcher.run()
+            assert mock_underscore_run.call_count == 2
+
+            # both resource_versions should be reset to "0" after _run raises an exception
+            assert self.watcher.resource_version == '0'
+            assert ResourceVersion().resource_version == '0'
+
+            # check that in the next run, _run is invoked with resource_version = "0"
+            mock_underscore_run.reset_mock()
+            with self.assertRaisesRegex(Exception, '^sentinel$'):
+                self.watcher.run()
+
+            mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)

Reply via email to