kaxil commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r765292706



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,130 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+        elif len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info("Found matching pod %s", pod.metadata.name)
+            self._compare_try_numbers(context, pod)
+            return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                return pod

Review comment:
       ```suggestion
               if self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context):
                   return pod
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to