kaxil commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r765293105
##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,130 @@ def create_labels_for_pod(context) -> dict:
labels[label_id] = safe_label
return labels
- def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
- return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+ @cached_property
+ def launcher(self) -> pod_launcher.PodLauncher:
+ return pod_launcher.PodLauncher(kube_client=self.client)
- def execute(self, context) -> Optional[str]:
+ @cached_property
+ def client(self) -> CoreV1Api:
+ # todo: use airflow Connection / hook to authenticate to the cluster
+ kwargs: Dict[str, Any] = dict(
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ if self.in_cluster is not None:
+ kwargs.update(in_cluster=self.in_cluster)
+ return kube_client.get_kube_client(**kwargs)
+
+ def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+ """Returns an already-running pod for this task instance if one exists."""
+ labels = self._create_labels_for_pod(context)
+ label_selector = self._get_pod_identifying_label_string(labels)
+ pod_list = self.client.list_namespaced_pod(
+ namespace=namespace,
+ label_selector=label_selector,
+ )
+ if len(pod_list.items) > 1:
+ raise AirflowException(f'More than one pod running with labels {label_selector}')
+ elif len(pod_list.items) == 1:
+ pod = pod_list.items[0]
+ self.log.info("Found matching pod %s", pod.metadata.name)
+ self._compare_try_numbers(context, pod)
+ return pod
+
+ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+ if self.reattach_on_restart:
+ pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+ if pod:
+ return pod
+ self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
+ self.launcher.create_pod(pod=pod_request_obj)
+ return pod_request_obj
+
+ def await_pod_start(self, pod):
try:
- if self.in_cluster is not None:
- client = kube_client.get_kube_client(
- in_cluster=self.in_cluster,
- cluster_context=self.cluster_context,
- config_file=self.config_file,
- )
- else:
- client = kube_client.get_kube_client(
- cluster_context=self.cluster_context,
config_file=self.config_file
- )
-
- self.client = client
-
- self.pod = self.create_pod_request_obj()
- self.namespace = self.pod.metadata.namespace
-
- # Add combination of labels to uniquely identify a running pod
- labels = self.create_labels_for_pod(context)
-
- label_selector = self._get_pod_identifying_label_string(labels)
-
- pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+ self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
+ except PodLaunchFailedException:
+ if self.log_events_on_failure:
+ for event in self.launcher.read_pod_events(pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason, event.message)
+ raise
- if len(pod_list.items) > 1 and self.reattach_on_restart:
- raise AirflowException(
- f'More than one pod running with labels: {label_selector}'
- )
+ def extract_xcom(self, pod):
+ """Retrieves xcom value and kills xcom sidecar container"""
+ result = self.launcher.extract_xcom(pod)
+ self.log.info(result)
+ # todo: add tests re handling of json vs non-json-serializable values
Review comment:
Is this already handled, or do we want to handle it separately?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]