This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b122c1503 Move the try outside the loop when this is possible in kubernetes provider (#33977)
1b122c1503 is described below

commit 1b122c15030e99cef9d4ff26d3781a7a9d6949bc
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Sep 1 08:49:05 2023 +0200

    Move the try outside the loop when this is possible in kubernetes provider (#33977)
---
 .../kubernetes/executors/kubernetes_executor.py    | 25 +++-----
 .../executors/kubernetes_executor_utils.py         | 13 ++---
 airflow/providers/cncf/kubernetes/triggers/pod.py  | 66 +++++++++++-----------
 3 files changed, 46 insertions(+), 58 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 90e4927c34..a0825d1d62 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -23,6 +23,7 @@ KubernetesExecutor.
 """
 from __future__ import annotations
 
+import contextlib
 import json
 import logging
 import multiprocessing
@@ -354,8 +355,8 @@ class KubernetesExecutor(BaseExecutor):
         self.kube_scheduler.sync()
 
         last_resource_version: dict[str, str] = defaultdict(lambda: "0")
-        while True:
-            try:
+        with contextlib.suppress(Empty):
+            while True:
                 results = self.result_queue.get_nowait()
                 try:
                     key, state, pod_name, namespace, resource_version = results
@@ -373,8 +374,6 @@ class KubernetesExecutor(BaseExecutor):
                         self.result_queue.put(results)
                 finally:
                     self.result_queue.task_done()
-            except Empty:
-                break
 
         from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion
 
@@ -386,8 +385,8 @@ class KubernetesExecutor(BaseExecutor):
 
         from kubernetes.client.rest import ApiException
 
-        for _ in range(self.kube_config.worker_pods_creation_batch_size):
-            try:
+        with contextlib.suppress(Empty):
+            for _ in range(self.kube_config.worker_pods_creation_batch_size):
                 task = self.task_queue.get_nowait()
 
                 try:
@@ -423,8 +422,6 @@ class KubernetesExecutor(BaseExecutor):
                     self.fail(key, e)
                 finally:
                     self.task_queue.task_done()
-            except Empty:
-                break
 
         # Run any pending timed events
         next_event = self.event_scheduler.run(blocking=False)
@@ -666,22 +663,20 @@ class KubernetesExecutor(BaseExecutor):
             assert self.task_queue
 
         self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
-        while True:
-            try:
+        with contextlib.suppress(Empty):
+            while True:
                 task = self.task_queue.get_nowait()
                 # This is a new task to run thus ok to ignore.
                 self.log.warning("Executor shutting down, will NOT run task=%s", task)
                 self.task_queue.task_done()
-            except Empty:
-                break
 
     def _flush_result_queue(self) -> None:
         if TYPE_CHECKING:
             assert self.result_queue
 
         self.log.debug("Executor shutting down, result_queue approximate size=%d", self.result_queue.qsize())
-        while True:
-            try:
+        with contextlib.suppress(Empty):
+            while True:
                 results = self.result_queue.get_nowait()
                 self.log.warning("Executor shutting down, flushing results=%s", results)
                 try:
@@ -700,8 +695,6 @@ class KubernetesExecutor(BaseExecutor):
                         )
                 finally:
                     self.result_queue.task_done()
-            except Empty:
-                break
 
     def end(self) -> None:
         """Called when the executor shuts down."""
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index f8868dff44..3277d3e60f 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import contextlib
 import json
 import multiprocessing
 import time
@@ -440,16 +441,14 @@ class AirflowKubernetesScheduler(LoggingMixin):
         """
         self.log.debug("Syncing KubernetesExecutor")
         self._health_check_kube_watchers()
-        while True:
-            try:
+        with contextlib.suppress(Empty):
+            while True:
                 task = self.watcher_queue.get_nowait()
                 try:
                     self.log.debug("Processing task %s", task)
                     self.process_watcher_task(task)
                 finally:
                     self.watcher_queue.task_done()
-            except Empty:
-                break
 
     def process_watcher_task(self, task: KubernetesWatchType) -> None:
         """Process the task by watcher."""
@@ -467,14 +466,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
 
     def _flush_watcher_queue(self) -> None:
         self.log.debug("Executor shutting down, watcher_queue approx. size=%d", self.watcher_queue.qsize())
-        while True:
-            try:
+        with contextlib.suppress(Empty):
+            while True:
                 task = self.watcher_queue.get_nowait()
                 # Ignoring it since it can only have either FAILED or SUCCEEDED pods
                 self.log.warning("Executor shutting down, IGNORING watcher task=%s", task)
                 self.watcher_queue.task_done()
-            except Empty:
-                break
 
     def terminate(self) -> None:
         """Terminates the watcher."""
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index f330de1341..b7e7d7b818 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -140,8 +140,8 @@ class KubernetesPodTrigger(BaseTrigger):
         """Gets current pod status and yields a TriggerEvent."""
         hook = self._get_async_hook()
         self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
-        while True:
-            try:
+        try:
+            while True:
                 pod = await hook.get_pod(
                     name=self.pod_name,
                     namespace=self.pod_namespace,
@@ -195,40 +195,38 @@ class KubernetesPodTrigger(BaseTrigger):
                         }
                     )
                     return
-            except CancelledError:
-                # That means that task was marked as failed
-                if self.get_logs:
-                    self.log.info("Outputting container logs...")
-                    await self._get_async_hook().read_logs(
-                        name=self.pod_name,
-                        namespace=self.pod_namespace,
-                    )
-                if self.on_finish_action == OnFinishAction.DELETE_POD:
-                    self.log.info("Deleting pod...")
-                    await self._get_async_hook().delete_pod(
-                        name=self.pod_name,
-                        namespace=self.pod_namespace,
-                    )
-                yield TriggerEvent(
-                    {
-                        "name": self.pod_name,
-                        "namespace": self.pod_namespace,
-                        "status": "cancelled",
-                        "message": "Pod execution was cancelled",
-                    }
+        except CancelledError:
+            # That means that task was marked as failed
+            if self.get_logs:
+                self.log.info("Outputting container logs...")
+                await self._get_async_hook().read_logs(
+                    name=self.pod_name,
+                    namespace=self.pod_namespace,
                 )
-                return
-            except Exception as e:
-                self.log.exception("Exception occurred while checking pod phase:")
-                yield TriggerEvent(
-                    {
-                        "name": self.pod_name,
-                        "namespace": self.pod_namespace,
-                        "status": "error",
-                        "message": str(e),
-                    }
+            if self.on_finish_action == OnFinishAction.DELETE_POD:
+                self.log.info("Deleting pod...")
+                await self._get_async_hook().delete_pod(
+                    name=self.pod_name,
+                    namespace=self.pod_namespace,
                 )
-                return
+            yield TriggerEvent(
+                {
+                    "name": self.pod_name,
+                    "namespace": self.pod_namespace,
+                    "status": "cancelled",
+                    "message": "Pod execution was cancelled",
+                }
+            )
+        except Exception as e:
+            self.log.exception("Exception occurred while checking pod phase:")
+            yield TriggerEvent(
+                {
+                    "name": self.pod_name,
+                    "namespace": self.pod_namespace,
+                    "status": "error",
+                    "message": str(e),
+                }
+            )
 
     def _get_async_hook(self) -> AsyncKubernetesHook:
         if self._hook is None:

Reply via email to