dstandish commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r758731584



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, 
extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one 
exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels 
{label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")
+            self._compare_try_numbers(context, pod)
+
+            # In case of failed pods, should reattach the first time, but only 
once
+            # as the task will have already failed.
+            if not pod.metadata.labels.get(self.POD_CHECKED_KEY):
+                return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or 
pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                if not self.is_delete_operator_pod:
+                    self.patch_already_checked(pod)

Review comment:
       hmm yeah actually looks like this is not necessary and is not in the old 
behavior; i'm chopping it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to