tanvn commented on code in PR #39406:
URL: https://github.com/apache/airflow/pull/39406#discussion_r1627630412
##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -577,6 +577,17 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
for pod in pod_list:
self.adopt_launched_task(kube_client, pod,
tis_to_flush_by_key)
self._adopt_completed_pods(kube_client)
+ # as this method can be retried within a short time frame
+ # (wrapped in a run_with_db_retries of scheduler_job_runner,
+ # and get retried due to an OperationalError, for example),
+ # there is a chance that in second attempt, adopt_launched_task will not be called even once
+ # as all pods are already adopted in the first attempt.
+ # and tis_to_flush_by_key will contain TIs that are already adopted.
+ # therefore, we need to check if the TIs are already adopted by the first attempt and remove them.
+ for ti in list(tis_to_flush_by_key.keys()):
+ if ti in self.running:
+ del tis_to_flush_by_key[ti]
+ self.log.info("%s is already adopted, no need to flush.",
ti)
tis_to_flush.extend(tis_to_flush_by_key.values())
Review Comment:
Thanks and fixed!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]