This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c47a7c443056382401c05363d0e57b8301f1bf31 Author: Daniel Imberman <[email protected]> AuthorDate: Tue Aug 11 07:01:27 2020 -0700 Fix KubernetesPodOperator reattachment (#10230) (cherry picked from commit 8cd2be9e161635480581a0dc723b69ed24166f8d) --- .../contrib/operators/kubernetes_pod_operator.py | 46 ++++++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 41f0df3..98464b7 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -270,23 +270,16 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- pod_list = client.list_namespaced_pod(self.namespace, label_selector=label_selector) - if len(pod_list.items) > 1: + if len(pod_list.items) > 1 and self.reattach_on_restart: raise AirflowException( 'More than one pod running with labels: ' '{label_selector}'.format(label_selector=label_selector)) launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push) - if len(pod_list.items) == 1 and \ - self._try_numbers_do_not_match(context, pod_list.items[0]) and \ - self.reattach_on_restart: - self.log.info("found a running pod with labels %s but a different try_number" - "Will attach to this pod and monitor instead of starting new one", labels) - final_state, _, result = self.create_new_pod_for_operator(labels, launcher) - elif len(pod_list.items) == 1: - self.log.info("found a running pod with labels %s." 
- "Will monitor this pod instead of starting new one", labels) - final_state, result = self.monitor_launched_pod(launcher, pod_list[0]) + if len(pod_list.items) == 1: + try_numbers_match = self._try_numbers_match(context, pod_list.items[0]) + final_state, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list) else: final_state, _, result = self.create_new_pod_for_operator(labels, launcher) if final_state != State.SUCCESS: @@ -296,14 +289,41 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- except AirflowException as ex: raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) + def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod_list): + """ + In cases where the Scheduler restarts while a KubernetesPodOperator task is running, + this function will either continue to monitor the existing pod or launch a new pod + based on the `reattach_on_restart` parameter. + :param labels: labels used to determine if a pod is repeated + :type labels: dict + :param try_numbers_match: do the try numbers match? 
Only needed for logging purposes + :type try_numbers_match: bool + :param launcher: PodLauncher + :param pod_list: list of pods found + """ + if try_numbers_match: + log_line = "found a running pod with labels {} and the same try_number.".format(labels) + else: + log_line = "found a running pod with labels {} but a different try_number.".format(labels) + + if self.reattach_on_restart: + log_line = log_line + " Will attach to this pod and monitor instead of starting new one" + self.log.info(log_line) + final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0]) + else: + log_line = log_line + "creating pod with labels {} and launcher {}".format(labels, launcher) + self.log.info(log_line) + final_state, _, result = self.create_new_pod_for_operator(labels, launcher) + return final_state, result + @staticmethod def _get_pod_identifying_label_string(labels): filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'} return ','.join([label_id + '=' + label for label_id, label in sorted(filtered_labels.items())]) @staticmethod - def _try_numbers_do_not_match(context, pod): - return pod.metadata.labels['try_number'] != context['ti'].try_number + def _try_numbers_match(context, pod): + return pod.metadata.labels['try_number'] == context['ti'].try_number @staticmethod def _set_resources(resources):
