uranusjr commented on code in PR #39406:
URL: https://github.com/apache/airflow/pull/39406#discussion_r1627157954


##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -577,6 +577,17 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
                 for pod in pod_list:
                     self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
             self._adopt_completed_pods(kube_client)
+            # as this method can be retried within a short time frame
+            # (wrapped in a run_with_db_retries of scheduler_job_runner,
+            # and get retried due to an OperationalError, for example),
+            # there is a chance that in second attempt, adopt_launched_task will not be called even once
+            # as all pods are already adopted in the first attempt.
+            # and tis_to_flush_by_key will contain TIs that are already adopted.
+            # therefore, we need to check if the TIs are already adopted by the first attempt and remove them.
+            for ti in list(tis_to_flush_by_key.keys()):

Review Comment:
   Is the `list()` part also due to the same reason? I don’t see it explained 
in the comment. Would this not be sufficient?
   
   ```suggestion
               for ti in tis_to_flush_by_key:
   ```
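
For context on the question above: whether the `list()` copy is needed depends on what the loop body does, which is not visible in this hunk. The diff comment says the already-adopted TIs are removed; if that removal mutates `tis_to_flush_by_key` inside the loop, iterating over the dict directly would fail in CPython. The sketch below is only an illustration under that assumption. The helper names and the `is_adopted` predicate are hypothetical and are not part of the Airflow code.

```python
def prune_adopted(tis_to_flush_by_key, is_adopted):
    """Remove adopted entries while iterating over a snapshot of the keys.

    Hypothetical helper: `is_adopted` stands in for whatever check the PR uses.
    """
    # list(...) copies the keys first, so deleting from the dict is safe.
    for key in list(tis_to_flush_by_key):
        if is_adopted(key):
            del tis_to_flush_by_key[key]
    return tis_to_flush_by_key


def prune_adopted_without_snapshot(tis_to_flush_by_key, is_adopted):
    """Same logic without the snapshot, shown only to illustrate the failure."""
    # Iterating the dict directly while deleting from it raises
    # "RuntimeError: dictionary changed size during iteration".
    for key in tis_to_flush_by_key:
        if is_adopted(key):
            del tis_to_flush_by_key[key]
    return tis_to_flush_by_key
```

If the loop body only reads from the dict, or builds a new dict instead of deleting in place, the plain `for ti in tis_to_flush_by_key:` form from the suggestion would be enough.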


