dimberman commented on a change in pull request #15165:
URL: https://github.com/apache/airflow/pull/15165#discussion_r606730124
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -237,12 +238,29 @@ def __init__(
self.namespace = self.kube_config.kube_namespace
self.log.debug("Kubernetes using namespace %s", self.namespace)
self.kube_client = kube_client
- self.launcher = PodLauncher(kube_client=self.kube_client)
self._manager = multiprocessing.Manager()
self.watcher_queue = self._manager.Queue()
self.scheduler_job_id = scheduler_job_id
self.kube_watcher = self._make_kube_watcher()
+ def run_pod_async(self, pod: V1Pod, **kwargs):
+ """Runs POD asynchronously"""
+ pod_mutation_hook(pod)
+
+ sanitized_pod =
self.kube_client.api_client.sanitize_for_serialization(pod)
+ json_pod = json.dumps(sanitized_pod, indent=2)
+
+ self.log.debug('Pod Creation Request: \n%s', json_pod)
+ try:
+ resp = self.kube_client.create_namespaced_pod(
+ body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
+ )
+ self.log.debug('Pod Creation Response: %s', resp)
+ except Exception as e:
+ self.log.exception('Exception when attempting to create Namespaced
Pod: %s', json_pod)
+ raise e
+ return resp
Review comment:
@ashb It makes sense. The KubernetesExecutor is fire and forget. We
don't monitor the task via the pod launcher for the KubernetesExecutor, just
monitor the task state via the job watcher.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]