This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1b122c1503 Move the try outside the loop when this is possible in
kubernetes provider (#33977)
1b122c1503 is described below
commit 1b122c15030e99cef9d4ff26d3781a7a9d6949bc
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Sep 1 08:49:05 2023 +0200
Move the try outside the loop when this is possible in kubernetes provider
(#33977)
---
.../kubernetes/executors/kubernetes_executor.py | 25 +++-----
.../executors/kubernetes_executor_utils.py | 13 ++---
airflow/providers/cncf/kubernetes/triggers/pod.py | 66 +++++++++++-----------
3 files changed, 46 insertions(+), 58 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 90e4927c34..a0825d1d62 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -23,6 +23,7 @@ KubernetesExecutor.
"""
from __future__ import annotations
+import contextlib
import json
import logging
import multiprocessing
@@ -354,8 +355,8 @@ class KubernetesExecutor(BaseExecutor):
self.kube_scheduler.sync()
last_resource_version: dict[str, str] = defaultdict(lambda: "0")
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
results = self.result_queue.get_nowait()
try:
key, state, pod_name, namespace, resource_version = results
@@ -373,8 +374,6 @@ class KubernetesExecutor(BaseExecutor):
self.result_queue.put(results)
finally:
self.result_queue.task_done()
- except Empty:
- break
from
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import
ResourceVersion
@@ -386,8 +385,8 @@ class KubernetesExecutor(BaseExecutor):
from kubernetes.client.rest import ApiException
- for _ in range(self.kube_config.worker_pods_creation_batch_size):
- try:
+ with contextlib.suppress(Empty):
+ for _ in range(self.kube_config.worker_pods_creation_batch_size):
task = self.task_queue.get_nowait()
try:
@@ -423,8 +422,6 @@ class KubernetesExecutor(BaseExecutor):
self.fail(key, e)
finally:
self.task_queue.task_done()
- except Empty:
- break
# Run any pending timed events
next_event = self.event_scheduler.run(blocking=False)
@@ -666,22 +663,20 @@ class KubernetesExecutor(BaseExecutor):
assert self.task_queue
self.log.debug("Executor shutting down, task_queue approximate
size=%d", self.task_queue.qsize())
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
task = self.task_queue.get_nowait()
# This is a new task to run thus ok to ignore.
self.log.warning("Executor shutting down, will NOT run
task=%s", task)
self.task_queue.task_done()
- except Empty:
- break
def _flush_result_queue(self) -> None:
if TYPE_CHECKING:
assert self.result_queue
self.log.debug("Executor shutting down, result_queue approximate
size=%d", self.result_queue.qsize())
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
results = self.result_queue.get_nowait()
self.log.warning("Executor shutting down, flushing
results=%s", results)
try:
@@ -700,8 +695,6 @@ class KubernetesExecutor(BaseExecutor):
)
finally:
self.result_queue.task_done()
- except Empty:
- break
def end(self) -> None:
"""Called when the executor shuts down."""
diff --git
a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index f8868dff44..3277d3e60f 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import contextlib
import json
import multiprocessing
import time
@@ -440,16 +441,14 @@ class AirflowKubernetesScheduler(LoggingMixin):
"""
self.log.debug("Syncing KubernetesExecutor")
self._health_check_kube_watchers()
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
task = self.watcher_queue.get_nowait()
try:
self.log.debug("Processing task %s", task)
self.process_watcher_task(task)
finally:
self.watcher_queue.task_done()
- except Empty:
- break
def process_watcher_task(self, task: KubernetesWatchType) -> None:
"""Process the task by watcher."""
@@ -467,14 +466,12 @@ class AirflowKubernetesScheduler(LoggingMixin):
def _flush_watcher_queue(self) -> None:
self.log.debug("Executor shutting down, watcher_queue approx.
size=%d", self.watcher_queue.qsize())
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
task = self.watcher_queue.get_nowait()
# Ignoring it since it can only have either FAILED or
SUCCEEDED pods
self.log.warning("Executor shutting down, IGNORING watcher
task=%s", task)
self.watcher_queue.task_done()
- except Empty:
- break
def terminate(self) -> None:
"""Terminates the watcher."""
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index f330de1341..b7e7d7b818 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -140,8 +140,8 @@ class KubernetesPodTrigger(BaseTrigger):
"""Gets current pod status and yields a TriggerEvent."""
hook = self._get_async_hook()
self.log.info("Checking pod %r in namespace %r.", self.pod_name,
self.pod_namespace)
- while True:
- try:
+ try:
+ while True:
pod = await hook.get_pod(
name=self.pod_name,
namespace=self.pod_namespace,
@@ -195,40 +195,38 @@ class KubernetesPodTrigger(BaseTrigger):
}
)
return
- except CancelledError:
- # That means that task was marked as failed
- if self.get_logs:
- self.log.info("Outputting container logs...")
- await self._get_async_hook().read_logs(
- name=self.pod_name,
- namespace=self.pod_namespace,
- )
- if self.on_finish_action == OnFinishAction.DELETE_POD:
- self.log.info("Deleting pod...")
- await self._get_async_hook().delete_pod(
- name=self.pod_name,
- namespace=self.pod_namespace,
- )
- yield TriggerEvent(
- {
- "name": self.pod_name,
- "namespace": self.pod_namespace,
- "status": "cancelled",
- "message": "Pod execution was cancelled",
- }
+ except CancelledError:
+ # That means that task was marked as failed
+ if self.get_logs:
+ self.log.info("Outputting container logs...")
+ await self._get_async_hook().read_logs(
+ name=self.pod_name,
+ namespace=self.pod_namespace,
)
- return
- except Exception as e:
- self.log.exception("Exception occurred while checking pod
phase:")
- yield TriggerEvent(
- {
- "name": self.pod_name,
- "namespace": self.pod_namespace,
- "status": "error",
- "message": str(e),
- }
+ if self.on_finish_action == OnFinishAction.DELETE_POD:
+ self.log.info("Deleting pod...")
+ await self._get_async_hook().delete_pod(
+ name=self.pod_name,
+ namespace=self.pod_namespace,
)
- return
+ yield TriggerEvent(
+ {
+ "name": self.pod_name,
+ "namespace": self.pod_namespace,
+ "status": "cancelled",
+ "message": "Pod execution was cancelled",
+ }
+ )
+ except Exception as e:
+ self.log.exception("Exception occurred while checking pod phase:")
+ yield TriggerEvent(
+ {
+ "name": self.pod_name,
+ "namespace": self.pod_namespace,
+ "status": "error",
+ "message": str(e),
+ }
+ )
def _get_async_hook(self) -> AsyncKubernetesHook:
if self._hook is None: