This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8918b435be Move the try outside the loop when this is possible in
Airflow core (#33975)
8918b435be is described below
commit 8918b435be8c683bbd6bb2ffa871dbd31d476f48
Author: Hussein Awala <[email protected]>
AuthorDate: Sun Sep 3 23:46:14 2023 +0200
Move the try outside the loop when this is possible in Airflow core (#33975)
* Move the try outside the loop when this is possible in Airflow core
* Use supress instead of except pass
---
airflow/api/__init__.py | 10 +++++-----
airflow/cli/commands/standalone_command.py | 8 ++++----
airflow/executors/local_executor.py | 7 +++----
airflow/jobs/triggerer_job_runner.py | 10 +++++-----
airflow/triggers/external_task.py | 9 ++++-----
airflow/www/extensions/init_security.py | 10 +++++-----
6 files changed, 26 insertions(+), 28 deletions(-)
diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index 656009b0dd..a58caea9fd 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -36,12 +36,12 @@ def load_auth():
pass
backends = []
- for backend in auth_backends.split(","):
- try:
+ try:
+ for backend in auth_backends.split(","):
auth = import_module(backend.strip())
log.info("Loaded API auth backend: %s", backend)
backends.append(auth)
- except ImportError as err:
- log.critical("Cannot import %s for API authentication due to: %s",
backend, err)
- raise AirflowException(err)
+ except ImportError as err:
+ log.critical("Cannot import %s for API authentication due to: %s",
backend, err)
+ raise AirflowException(err)
return backends
diff --git a/airflow/cli/commands/standalone_command.py
b/airflow/cli/commands/standalone_command.py
index 3265b6b747..eb5c2af0f5 100644
--- a/airflow/cli/commands/standalone_command.py
+++ b/airflow/cli/commands/standalone_command.py
@@ -94,8 +94,8 @@ class StandaloneCommand:
command.start()
# Run output loop
shown_ready = False
- while True:
- try:
+ try:
+ while True:
# Print all the current lines onto the screen
self.update_output()
# Print info banner when all components are ready and the
@@ -111,8 +111,8 @@ class StandaloneCommand:
shown_ready = True
# Ensure we idle-sleep rather than fast-looping
time.sleep(0.1)
- except KeyboardInterrupt:
- break
+ except KeyboardInterrupt:
+ pass
# Stop subcommand threads
self.print_output("standalone", "Shutting down components")
for command in self.subcommands.values():
diff --git a/airflow/executors/local_executor.py
b/airflow/executors/local_executor.py
index fd3cc6a629..16aa649ee7 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -24,6 +24,7 @@ LocalExecutor.
"""
from __future__ import annotations
+import contextlib
import logging
import os
import subprocess
@@ -331,15 +332,13 @@ class LocalExecutor(BaseExecutor):
def sync(self):
"""Sync will get called periodically by the heartbeat method."""
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
results = self.executor.result_queue.get_nowait()
try:
self.executor.change_state(*results)
finally:
self.executor.result_queue.task_done()
- except Empty:
- break
def end(self):
"""
diff --git a/airflow/jobs/triggerer_job_runner.py
b/airflow/jobs/triggerer_job_runner.py
index bedacad4da..9262a53f30 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -468,8 +468,8 @@ class TriggerRunner(threading.Thread, LoggingMixin):
"""
watchdog = asyncio.create_task(self.block_watchdog())
last_status = time.time()
- while not self.stop:
- try:
+ try:
+ while not self.stop:
# Run core logic
await self.create_triggers()
await self.cancel_triggers()
@@ -481,9 +481,9 @@ class TriggerRunner(threading.Thread, LoggingMixin):
count = len(self.triggers)
self.log.info("%i triggers currently running", count)
last_status = time.time()
- except Exception:
- self.stop = True
- raise
+ except Exception:
+ self.stop = True
+ raise
# Wait for watchdog to complete
await watchdog
diff --git a/airflow/triggers/external_task.py
b/airflow/triggers/external_task.py
index 00c7c52284..c353c01159 100644
--- a/airflow/triggers/external_task.py
+++ b/airflow/triggers/external_task.py
@@ -95,8 +95,8 @@ class TaskStateTrigger(BaseTrigger):
If dag with specified name was not in the running state after
_timeout_sec seconds
after starting execution process of the trigger, terminate with status
'timeout'.
"""
- while True:
- try:
+ try:
+ while True:
delta = utcnow() - self.trigger_start_time
if delta.total_seconds() < self._timeout_sec:
# mypy confuses typing here
@@ -112,9 +112,8 @@ class TaskStateTrigger(BaseTrigger):
return
self.log.info("Task is still running, sleeping for %s
seconds...", self.poll_interval)
await asyncio.sleep(self.poll_interval)
- except Exception:
- yield TriggerEvent({"status": "failed"})
- return
+ except Exception:
+ yield TriggerEvent({"status": "failed"})
@sync_to_async
@provide_session
diff --git a/airflow/www/extensions/init_security.py
b/airflow/www/extensions/init_security.py
index 41a8dc6afc..390c7bc2ba 100644
--- a/airflow/www/extensions/init_security.py
+++ b/airflow/www/extensions/init_security.py
@@ -56,14 +56,14 @@ def init_api_experimental_auth(app):
pass
app.api_auth = []
- for backend in auth_backends.split(","):
- try:
+ try:
+ for backend in auth_backends.split(","):
auth = import_module(backend.strip())
auth.init_app(app)
app.api_auth.append(auth)
- except ImportError as err:
- log.critical("Cannot import %s for API authentication due to: %s",
backend, err)
- raise AirflowException(err)
+ except ImportError as err:
+ log.critical("Cannot import %s for API authentication due to: %s",
backend, err)
+ raise AirflowException(err)
def init_check_user_active(app):