This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8daa53eaa5 Avoid resetting adopted task instances when retrying for kubernetes executor (#39406)
8daa53eaa5 is described below

commit 8daa53eaa5d64727abd7430c9f58eb8a14613db2
Author: Vu Tan <[email protected]>
AuthorDate: Fri Jun 7 17:00:24 2024 +0900

    Avoid resetting adopted task instances when retrying for kubernetes executor (#39406)
    
    * Avoid resetting adopted task instances when retrying
    
    * Stop using f-string when logging
    
    * Address comment
    
    * Remove return type of generator func
    
    * Add unit test
    
    * Add comment and fix linter error
---
 .../kubernetes/executors/kubernetes_executor.py    | 18 +++++++++++++--
 .../executors/test_kubernetes_executor.py          | 27 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index d2f9c61ce8..40a27d70fd 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -83,7 +83,6 @@ if TYPE_CHECKING:
         AirflowKubernetesScheduler,
     )
 
-
 # CLI Args
 ARG_NAMESPACE = Arg(
     ("--namespace",),
@@ -577,7 +576,22 @@ class KubernetesExecutor(BaseExecutor):
                 for pod in pod_list:
                     self.adopt_launched_task(kube_client, pod, 
tis_to_flush_by_key)
             self._adopt_completed_pods(kube_client)
-            tis_to_flush.extend(tis_to_flush_by_key.values())
+
+            # as this method can be retried within a short time frame
+            # (wrapped in a run_with_db_retries of scheduler_job_runner,
+            # and get retried due to an OperationalError, for example),
+            # there is a chance that in second attempt, adopt_launched_task 
will not be called even once
+            # as all pods are already adopted in the first attempt.
+            # and tis_to_flush_by_key will contain TIs that are already 
adopted.
+            # therefore, we need to check if the TIs are already adopted by 
the first attempt and remove them.
+            def _iter_tis_to_flush():
+                for key, ti in tis_to_flush_by_key.items():
+                    if key in self.running:
+                        self.log.info("%s is already adopted, no need to 
flush.", ti)
+                    else:
+                        yield ti
+
+            tis_to_flush.extend(_iter_tis_to_flush())
             return tis_to_flush
 
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index 9dffa3f778..f3831f3195 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -991,8 +991,35 @@ class TestKubernetesExecutor:
 
         tis_to_flush = executor.try_adopt_task_instances([mock_ti])
         assert tis_to_flush == [mock_ti]
+        assert executor.running == set()
+        mock_adopt_launched_task.assert_not_called()
+        mock_adopt_completed_pods.assert_called_once()
+
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    @mock.patch(
+        
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task"
+    )
+    @mock.patch(
+        
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods"
+    )
+    def test_try_adopt_already_adopted_task_instances(
+        self, mock_adopt_completed_pods, mock_adopt_launched_task, 
mock_kube_dynamic_client
+    ):
+        """For TIs that are already adopted, we should not flush them"""
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.get.return_value.items = []
+        mock_kube_client = mock.MagicMock()
+        executor = self.kubernetes_executor
+        executor.kube_client = mock_kube_client
+        ti_key = TaskInstanceKey("dag", "task", "run_id", 1)
+        mock_ti = mock.MagicMock(queued_by_job_id="1", 
external_executor_id="1", key=ti_key)
+        executor.running = {ti_key}
+
+        tis_to_flush = executor.try_adopt_task_instances([mock_ti])
         mock_adopt_launched_task.assert_not_called()
         mock_adopt_completed_pods.assert_called_once()
+        assert tis_to_flush == []
+        assert executor.running == {ti_key}
 
     
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_adopt_launched_task(self, mock_kube_client):

Reply via email to