kaxil commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r765292182
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,130 @@ def create_labels_for_pod(context) -> dict:
labels[label_id] = safe_label
return labels
- def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
- return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+ @cached_property
+ def launcher(self) -> pod_launcher.PodLauncher:
+ return pod_launcher.PodLauncher(kube_client=self.client)
- def execute(self, context) -> Optional[str]:
+ @cached_property
+ def client(self) -> CoreV1Api:
+ # todo: use airflow Connection / hook to authenticate to the cluster
+ kwargs: Dict[str, Any] = dict(
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ if self.in_cluster is not None:
+ kwargs.update(in_cluster=self.in_cluster)
+ return kube_client.get_kube_client(**kwargs)
+
+ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+ """Returns an already-running pod for this task instance if one exists."""
+ labels = self._create_labels_for_pod(context)
+ label_selector = self._get_pod_identifying_label_string(labels)
+ pod_list = self.client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=label_selector,
+ )
+ if len(pod_list.items) > 1:
+ raise AirflowException(f'More than one pod running with labels {label_selector}')
+ elif len(pod_list.items) == 1:
+ pod = pod_list.items[0]
+ self.log.info("Found matching pod %s", pod.metadata.name)
+ self._compare_try_numbers(context, pod)
+ return pod
Review comment:
```suggestion
pod_list_items = pod_list.items
num_pod_list_items = len(pod_list.items)
if num_pod_list_items > 1:
raise AirflowException(f'More than one pod running with labels {label_selector}')
elif num_pod_list_items == 1:
pod = pod_list_items[0]
self.log.info("Found matching pod %s", pod.metadata.name)
self._compare_try_numbers(context, pod)
return pod
```
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,130 @@ def create_labels_for_pod(context) -> dict:
labels[label_id] = safe_label
return labels
- def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
- return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+ @cached_property
+ def launcher(self) -> pod_launcher.PodLauncher:
+ return pod_launcher.PodLauncher(kube_client=self.client)
- def execute(self, context) -> Optional[str]:
+ @cached_property
+ def client(self) -> CoreV1Api:
+ # todo: use airflow Connection / hook to authenticate to the cluster
+ kwargs: Dict[str, Any] = dict(
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ if self.in_cluster is not None:
+ kwargs.update(in_cluster=self.in_cluster)
+ return kube_client.get_kube_client(**kwargs)
+
+ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+ """Returns an already-running pod for this task instance if one exists."""
+ labels = self._create_labels_for_pod(context)
+ label_selector = self._get_pod_identifying_label_string(labels)
+ pod_list = self.client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=label_selector,
+ )
+ if len(pod_list.items) > 1:
+ raise AirflowException(f'More than one pod running with labels {label_selector}')
+ elif len(pod_list.items) == 1:
+ pod = pod_list.items[0]
+ self.log.info("Found matching pod %s", pod.metadata.name)
+ self._compare_try_numbers(context, pod)
+ return pod
Review comment:
nit
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]