dimberman commented on a change in pull request #15165:
URL: https://github.com/apache/airflow/pull/15165#discussion_r606730124



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -237,12 +238,29 @@ def __init__(
         self.namespace = self.kube_config.kube_namespace
         self.log.debug("Kubernetes using namespace %s", self.namespace)
         self.kube_client = kube_client
-        self.launcher = PodLauncher(kube_client=self.kube_client)
         self._manager = multiprocessing.Manager()
         self.watcher_queue = self._manager.Queue()
         self.scheduler_job_id = scheduler_job_id
         self.kube_watcher = self._make_kube_watcher()
 
+    def run_pod_async(self, pod: V1Pod, **kwargs):
+        """Runs POD asynchronously"""
+        pod_mutation_hook(pod)
+
+        sanitized_pod = 
self.kube_client.api_client.sanitize_for_serialization(pod)
+        json_pod = json.dumps(sanitized_pod, indent=2)
+
+        self.log.debug('Pod Creation Request: \n%s', json_pod)
+        try:
+            resp = self.kube_client.create_namespaced_pod(
+                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
+            )
+            self.log.debug('Pod Creation Response: %s', resp)
+        except Exception as e:
+            self.log.exception('Exception when attempting to create Namespaced 
Pod: %s', json_pod)
+            raise e
+        return resp

Review comment:
       @ashb It makes sense. The KubernetesExecutor is fire and forget. We 
don't monitor the task via the pod launcher for the KubernetesExecutor, just 
monitor the task state via the job watcher.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to