jedcunningham commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r758523268



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one 
exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?

Review comment:
       That's already done in `_get_pod_identifying_label_string`.
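
   For reference, the selector that helper builds already carries the exclusion; roughly like this (label names and values illustrative, not from a real pod):

```python
# sketch of what _get_pod_identifying_label_string produces: the trailing
# clause makes list_namespaced_pod skip pods we've already checked
labels = {'dag_id': 'my_dag', 'task_id': 'my_task', 'try_number': '1'}
label_strings = [f'{k}={v}' for k, v in sorted(labels.items()) if k != 'try_number']
label_selector = ','.join(label_strings) + ',already_checked!=True'
# -> "dag_id=my_dag,task_id=my_task,already_checked!=True"
```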

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one 
exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")
+            self._compare_try_numbers(context, pod)
+
+            # In case of failed pods, should reattach the first time, but only once
+            # as the task will have already failed.
+            if not pod.metadata.labels.get(self.POD_CHECKED_KEY):
+                return pod

Review comment:
       ```suggestion
               return pod
   ```
   
   We already exclude the `already_checked` pods, so I don't think this 
conditional does anything any longer?
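   
   (The selector already includes `already_checked!=True`, so in practice any pod that comes back can't have that label set; sketch of the invariant:)

```python
# any pod returned via the already_checked!=True selector should satisfy:
assert pod.metadata.labels.get('already_checked') != 'True'
```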

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one 
exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")

Review comment:
       ```suggestion
               self.log.info("Found matching pod %s", pod.metadata.name)
   ```
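
   (The %-style args also keep the formatting lazy: the message is only built if the record actually passes the level check. A quick illustration:)

```python
import logging

log = logging.getLogger(__name__)
log.setLevel(logging.WARNING)

# lazy: interpolation is skipped entirely because INFO is filtered out
log.info("Found matching pod %s", "pod-abc123")

# eager: the f-string is evaluated even though the record is dropped
pod_name = "pod-abc123"
log.info(f"Found matching pod {pod_name}")
```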

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -51,10 +55,23 @@ def should_retry_start_pod(exception: Exception) -> bool:
 class PodStatus:
     """Status of the PODs"""
 
-    PENDING = 'pending'
-    RUNNING = 'running'
-    FAILED = 'failed'
-    SUCCEEDED = 'succeeded'
+    PENDING = 'Pending'
+    RUNNING = 'Running'
+    FAILED = 'Failed'
+    SUCCEEDED = 'Succeeded'
+
+    terminal_states = {FAILED, SUCCEEDED}
+
+
+def container_is_running(event: V1Pod, container_name: str):

Review comment:
       `event` being a `V1Pod` seems odd to me? Should we rename this to `pod`?
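
   Something like this, say (sketch; return annotation added while we're at it, and it folds in the `else` nit from the other comment):

```python
from kubernetes.client.models.v1_pod import V1Pod


def container_is_running(pod: V1Pod, container_name: str) -> bool:
    """True if the named container in `pod` has a `running` state."""
    container_statuses = pod.status.container_statuses if pod and pod.status else None
    if not container_statuses:
        return False
    container_status = next((x for x in container_statuses if x.name == container_name), None)
    return container_status is not None and container_status.state.running is not None
```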

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -51,10 +55,23 @@ def should_retry_start_pod(exception: Exception) -> bool:
 class PodStatus:
     """Status of the PODs"""
 
-    PENDING = 'pending'
-    RUNNING = 'running'
-    FAILED = 'failed'
-    SUCCEEDED = 'succeeded'
+    PENDING = 'Pending'
+    RUNNING = 'Running'
+    FAILED = 'Failed'
+    SUCCEEDED = 'Succeeded'
+
+    terminal_states = {FAILED, SUCCEEDED}
+
+
+def container_is_running(event: V1Pod, container_name: str):
+    container_statuses = event.status.container_statuses if event and event.status else None
+    if not container_statuses:
+        return False
+    container_status = next(iter([x for x in container_statuses if x.name == container_name]), None)
+    if not container_status:
+        return False
+    else:
+        return container_status.state.running is not None

Review comment:
       ```suggestion
       return container_status.state.running is not None
   ```
   
   nit: the `if not container_status` branch above already returns, so the `else` is redundant.

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +572,64 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
         if self.do_xcom_push:
             self.log.debug("Adding xcom sidecar to task %s", self.task_id)
             pod = xcom_sidecar.add_xcom_sidecar(pod)
-        return pod
-
-    def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
-        Creates a new pod and monitors for duration of task
 
-        :param labels: labels used to track pod
-        :param launcher: pod launcher that will manage launching and monitoring pods
-        :return:
-        """
-        self.log.debug(
-            "Adding KubernetesPodOperator labels to pod before launch for task 
%s", self.task_id
-        )
+        labels = self._create_labels_for_pod(context)
+        self.log.info(f"creating pod with labels {labels} and launcher 
{self.launcher}")

Review comment:
       ```suggestion
           self.log.info("creating pod with labels %s and launcher %s", labels, 
launcher)
   ```

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one 
exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")
+            self._compare_try_numbers(context, pod)
+
+            # In case of failed pods, should reattach the first time, but only once
+            # as the task will have already failed.
+            if not pod.metadata.labels.get(self.POD_CHECKED_KEY):
+                return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                if not self.is_delete_operator_pod:
+                    self.patch_already_checked(pod)

Review comment:
       Doesn't this mean we can only reattach once? I think we should be 
patching this in cleanup, not when we reattach.
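
   Rough sketch of what I mean, reusing names from this PR (untested):

```python
def cleanup(self, pod, remote_pod):
    pod_phase = remote_pod.status.phase if remote_pod else None
    if pod_phase != PodStatus.SUCCEEDED and not self.is_delete_operator_pod:
        # patch here, after the pod has actually failed, so a later retry
        # skips this pod instead of burning the one reattach on it
        self.patch_already_checked(pod)
```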

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +131,102 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> None:
+        """
+        Launches the pod asynchronously.
+
+        :param pod:
+        :return:
+        """
+        pod = self.run_pod_async(pod)
+        return pod
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
         """
-        Launches the pod synchronously and waits for completion.
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to 
start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]:
+        while True:
+            event = self.read_pod(pod)
+            if event.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. 
"
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):
+        """
+        Follows the logs of container and streams to airflow logging.
+        Returns when container exits.
+        """
+        container_stopped = False
+        read_logs_since_sec = None
+        last_log_time = None
+        while True:
+            try:
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=read_logs_since_sec,
+                )
+                for line in logs:  # type: bytes
+                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                    self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
+            except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:
+                break
+
+            if last_log_time:
+                delta = pendulum.now() - last_log_time
+                read_logs_since_sec = math.ceil(delta.total_seconds())
+
+            time.sleep(1)
+
+            if not self.container_is_running(pod, container_name=container_name):
+                container_stopped = True  # fetch logs once more and exit
+            else:
+                self.log.info('Container %s has state %s', pod.metadata.name, State.RUNNING)

Review comment:
       I realize this was how it was before, but `State.RUNNING` (as in, Airflow's task running state) seems like the wrong thing to use here, no? Maybe this should be `pod.status.phase` instead, like `await_pod_completion`?
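
   i.e. roughly (sketch; re-reads the pod so the phase is fresh):

```python
remote_pod = self.read_pod(pod)  # refresh status from the API
self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase)
```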

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +131,102 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> None:
+        """
+        Launches the pod asynchronously.
+
+        :param pod:
+        :return:
+        """
+        pod = self.run_pod_async(pod)
+        return pod

Review comment:
       ```suggestion
           return self.run_pod_async(pod)
   ```
   
   nit

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one 
exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")
+            self._compare_try_numbers(context, pod)
+
+            # In case of failed pods, should reattach the first time, but only once
+            # as the task will have already failed.
+            if not pod.metadata.labels.get(self.POD_CHECKED_KEY):
+                return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                if not self.is_delete_operator_pod:
+                    self.patch_already_checked(pod)
+                return pod
+        self.log.debug("Starting pod:\n%s", 
yaml.safe_dump(pod_request_obj.to_dict()))
+        self.launcher.create_pod(pod=pod_request_obj)
+        return pod_request_obj
+
+    def await_pod_start(self, pod):
         try:
-            if self.in_cluster is not None:
-                client = kube_client.get_kube_client(
-                    in_cluster=self.in_cluster,
-                    cluster_context=self.cluster_context,
-                    config_file=self.config_file,
-                )
-            else:
-                client = kube_client.get_kube_client(
-                    cluster_context=self.cluster_context, config_file=self.config_file
-                )
-
-            self.client = client
-
-            self.pod = self.create_pod_request_obj()
-            self.namespace = self.pod.metadata.namespace
-
-            # Add combination of labels to uniquely identify a running pod
-            labels = self.create_labels_for_pod(context)
-
-            label_selector = self._get_pod_identifying_label_string(labels)
+            self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
+        except PodLaunchFailedException:
+            if self.log_events_on_failure:
+                for event in self.launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, 
event.message)
+            raise
 
-            pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+    def extract_xcom(self, pod):
+        """Retrieves xcom value and kills xcom sidecar container"""
+        result = self.launcher.extract_xcom(pod)
+        self.log.info(result)
+        # todo: add tests re handling of json vs non-json-serializable values
+        return json.loads(result)
 
-            if len(pod_list.items) > 1 and self.reattach_on_restart:
-                raise AirflowException(
-                    f'More than one pod running with labels: {label_selector}'
+    def execute(self, context):
+        base_container_stopped = False
+        remote_pod = None
+        try:
+            self.pod_request_obj = self.build_pod_request_obj(context)
+            self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
+                pod_request_obj=self.pod_request_obj,
+                context=context,
+            )
+            self.await_pod_start(pod=self.pod)
+            if self.get_logs:
+                base_container_stopped = self.launcher.follow_container_logs(
+                    pod=self.pod,
+                    container_name=self.BASE_CONTAINER_NAME,
                 )
 
-            launcher = self.create_pod_launcher()
+            # if not getting logs, still need to wait for base container before getting xcom
+            if not base_container_stopped:
+                self.launcher.await_container(pod=self.pod, container_name=self.BASE_CONTAINER_NAME)
 
-            if len(pod_list.items) == 1:
-                try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
-                final_state, remote_pod, result = self.handle_pod_overlap(
-                    labels, try_numbers_match, launcher, pod_list.items[0]
-                )
-            else:
-                self.log.info("creating pod with labels %s and launcher %s", 
labels, launcher)
-                final_state, remote_pod, result = 
self.create_new_pod_for_operator(labels, launcher)
-            if final_state != State.SUCCESS:
-                raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
-            context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
-            context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
+            if self.do_xcom_push:
+                result = self.extract_xcom(pod=self.pod)
+            remote_pod = self.launcher.await_pod_completion(self.pod)
+        finally:
+            self.cleanup(
+                pod=self.pod or self.pod_request_obj,
+                remote_pod=remote_pod,
+            )
+        if self.do_xcom_push:
+            ti = context['ti']
+            if remote_pod:
+                ti.xcom_push(key='pod_name', value=remote_pod.metadata.name)
+                ti.xcom_push(key='pod_namespace', value=remote_pod.metadata.namespace)
             return result
-        except AirflowException as ex:
-            raise AirflowException(f'Pod Launching failed: {ex}')
 
-    def handle_pod_overlap(
-        self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
-    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
+    def cleanup(self, pod, remote_pod):
+        with _suppress_with_logging(self, Exception):
+            self.process_pod_deletion(pod)
 
-        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
-        this function will either continue to monitor the existing pod or launch a new pod
-        based on the `reattach_on_restart` parameter.
-
-        :param labels: labels used to determine if a pod is repeated
-        :type labels: dict
-        :param try_numbers_match: do the try numbers match? Only needed for logging purposes
-        :type try_numbers_match: bool
-        :param launcher: PodLauncher
-        :param pod: Pod found with matching labels
-        """
-        if try_numbers_match:
-            log_line = f"found a running pod with labels {labels} and the same 
try_number."
-        else:
-            log_line = f"found a running pod with labels {labels} but a 
different try_number."
-
-        # In case of failed pods, should reattach the first time, but only once
-        # as the task will have already failed.
-        if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
-            log_line += " Will attach to this pod and monitor instead of starting new one"
-            self.log.info(log_line)
-            self.pod = pod
-            final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod)
+        pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
+        if pod_phase != PodStatus.SUCCEEDED:
+            if self.log_events_on_failure:
+                with _suppress_with_logging(self, Exception):
+                    for event in self.launcher.read_pod_events(pod).items:
+                        self.log.error("Pod Event: %s - %s", event.reason, 
event.message)
+            if not self.is_delete_operator_pod:
+                with _suppress_with_logging(self, Exception):
+                    self.patch_already_checked(pod)
+            raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}')
+
+    def process_pod_deletion(self, pod):
+        if self.is_delete_operator_pod:
+            self.log.info("deleting pod")
+            self.launcher.delete_pod(pod)
         else:
-            log_line += f"creating pod with labels {labels} and launcher 
{launcher}"
-            self.log.info(log_line)
-            final_state, remote_pod, result = 
self.create_new_pod_for_operator(labels, launcher)
-        return final_state, remote_pod, result
+            self.log.info("skipping pod delete")
 
-    @staticmethod
-    def _get_pod_identifying_label_string(labels) -> str:
+    def _get_pod_identifying_label_string(self, labels) -> str:
         label_strings = [
             f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
         ]
-        return ','.join(label_strings) + ',already_checked!=True'
-
-    @staticmethod
-    def _try_numbers_match(context, pod) -> bool:
-        return pod.metadata.labels['try_number'] == context['ti'].try_number
+        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+
+    def _compare_try_numbers(self, context, pod):
+        tries_match = pod.metadata.labels['try_number'] == context['ti'].try_number
+        self.log.info(
+            ' '.join(
+                [
+                    f"found a running pod with labels {pod.metadata.labels}",
+                    "and the same try_number." if tries_match else "but a 
different try_number.",
+                ]
+            )

Review comment:
       ```suggestion
               (
                   "found a running pod with labels %s"
                   " and the same try_number." if tries_match else "but a 
different try_number."
               ), pod.metadata.labels
   ```
   Might need a little more formatting, but don't use an f-string, and you don't need to do the join/list thing either.
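
   Something in this direction would keep the lazy formatting without the operator-precedence trap in my suggestion above (sketch):

```python
self.log.info(
    "found a running pod with labels %s %s",
    pod.metadata.labels,
    "and the same try_number." if tries_match else "but a different try_number.",
)
```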




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

