This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b89f8dc36ae44727e6bca12b22a19f4631d00f55 Author: Daniel Imberman <[email protected]> AuthorDate: Fri Oct 9 16:56:56 2020 -0700 Add "already checked" to failed pods in K8sPodOperator (#11368) --- .../contrib/operators/kubernetes_pod_operator.py | 24 +++++++++++--- airflow/kubernetes/pod_generator.py | 8 +++++ kubernetes_tests/test_kubernetes_pod_operator.py | 38 ++++++++++++++++++++++ 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 7754fd7..dcd6c3e 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -299,7 +299,9 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- if len(pod_list.items) == 1: try_numbers_match = self._try_numbers_match(context, pod_list.items[0]) - final_state, result = self.handle_pod_overlap(labels, try_numbers_match, launcher, pod_list) + final_state, result = self.handle_pod_overlap( + labels, try_numbers_match, launcher, pod_list.items[0] + ) else: final_state, _, result = self.create_new_pod_for_operator(labels, launcher) if final_state != State.SUCCESS: @@ -309,7 +311,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- except AirflowException as ex: raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) - def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod_list): + def handle_pod_overlap(self, labels, try_numbers_match, launcher, pod): """ In cases where the Scheduler restarts while a KubernetsPodOperator task is running, this function will either continue to monitor the existing pod or launch a new pod @@ -319,17 +321,20 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- :param try_numbers_match: do the try numbers match? 
Only needed for logging purposes :type try_numbers_match: bool :param launcher: PodLauncher - :param pod_list: list of pods found + :param pod: Pod found """ if try_numbers_match: log_line = "found a running pod with labels {} and the same try_number.".format(labels) else: log_line = "found a running pod with labels {} but a different try_number.".format(labels) - if self.reattach_on_restart: + # In case of failed pods, should reattach the first time, but only once + # as the task will have already failed. + if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"): log_line = log_line + " Will attach to this pod and monitor instead of starting new one" self.log.info(log_line) - final_state, result = self.monitor_launched_pod(launcher, pod_list.items[0]) + self.pod = pod + final_state, result = self.monitor_launched_pod(launcher, pod) else: log_line = log_line + "creating pod with labels {} and launcher {}".format(labels, launcher) self.log.info(log_line) @@ -452,6 +457,14 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- launcher.delete_pod(self.pod) return final_state, self.pod, result + def patch_already_checked(self, pod): + """ + Add an "already_checked" label to the pod to ensure we only retry once + """ + pod.metadata.labels["already_checked"] = "True" + body = PodGenerator.serialize_pod(pod) + self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body) + def monitor_launched_pod(self, launcher, pod): """ Monitors a pod to completion that was created by a previous KubernetesPodOperator @@ -469,6 +482,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- if self.log_events_on_failure: for event in launcher.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) + self.patch_already_checked(self.pod) raise AirflowException( 'Pod returned a failure: {state}'.format(state=final_state) ) diff --git 
a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 5a57230..2d30ca9 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -564,6 +564,14 @@ class PodGenerator(object): return reduce(PodGenerator.reconcile_pods, pod_list) @staticmethod + def serialize_pod(pod): + """ + Converts a k8s.V1Pod into a jsonified object + """ + api_client = ApiClient() + return api_client.sanitize_for_serialization(pod) + + @staticmethod def deserialize_model_file(path): """ :param path: Path to the file diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index dce9e86..2208c2d 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -1087,4 +1087,42 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): with self.assertRaises(ApiException): pod = client.read_namespaced_pod(name=name, namespace=namespace) + def test_reattach_failing_pod_once(self): + from airflow.utils.state import State + client = kube_client.get_kube_client(in_cluster=False) + name = "test" + namespace = "default" + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["exit 1"], + labels={"foo": "bar"}, + name="test", + task_id=name, + in_cluster=False, + do_xcom_push=False, + is_delete_operator_pod=False, + termination_grace_period=0, + ) + + context = create_context(k) + + with mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") as monitor_mock: + monitor_mock.return_value = (State.SUCCESS, None) + k.execute(context) + name = k.pod.metadata.name + pod = client.read_namespaced_pod(name=name, namespace=namespace) + while pod.status.phase != "Failed": + pod = client.read_namespaced_pod(name=name, namespace=namespace) + with self.assertRaises(AirflowException): + k.execute(context) + pod = client.read_namespaced_pod(name=name, namespace=namespace) + 
self.assertEqual(pod.metadata.labels["already_checked"], "True") + with mock.patch("airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator" + ".create_new_pod_for_operator") as create_mock: + create_mock.return_value = ("success", {}, {}) + k.execute(context) + create_mock.assert_called_once() + # pylint: enable=unused-argument
