This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 96aae9718f Edge worker graceful shutdown on version mismatch (#43462)
96aae9718f is described below
commit 96aae9718f51af1414ba90ac082d74e8ceaaf34c
Author: majorosdonat <[email protected]>
AuthorDate: Tue Oct 29 13:44:05 2024 +0100
Edge worker graceful shutdown on version mismatch (#43462)
* React on EdgeWorkerVersionException
* update version number
* import EdgeWorkerVersionException
* edge worker exception is thrown later
* fix missing variable
* Add pytests
* changelog added
* fix static checks
---------
Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
---
providers/src/airflow/providers/edge/CHANGELOG.rst | 8 ++++++++
providers/src/airflow/providers/edge/__init__.py | 2 +-
providers/src/airflow/providers/edge/cli/edge_command.py | 16 +++++++++++++---
.../src/airflow/providers/edge/models/edge_worker.py | 2 +-
providers/src/airflow/providers/edge/provider.yaml | 2 +-
providers/tests/edge/cli/test_edge_command.py | 8 +++++++-
6 files changed, 31 insertions(+), 7 deletions(-)
diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst b/providers/src/airflow/providers/edge/CHANGELOG.rst
index afd17c6b29..7f09a7ae6a 100644
--- a/providers/src/airflow/providers/edge/CHANGELOG.rst
+++ b/providers/src/airflow/providers/edge/CHANGELOG.rst
@@ -27,6 +27,14 @@
Changelog
---------
+0.5.0pre0
+.........
+
+Misc
+~~~~
+
+* ``Edge worker triggers graceful shutdown, if worker version and main instance do not match.``
+
0.4.0pre0
.........
diff --git a/providers/src/airflow/providers/edge/__init__.py b/providers/src/airflow/providers/edge/__init__.py
index 9b565f8e89..47db38e168 100644
--- a/providers/src/airflow/providers/edge/__init__.py
+++ b/providers/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "0.4.0pre0"
+__version__ = "0.5.0pre0"
if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
diff --git a/providers/src/airflow/providers/edge/cli/edge_command.py b/providers/src/airflow/providers/edge/cli/edge_command.py
index 5e900a8a2c..487b3adde7 100644
--- a/providers/src/airflow/providers/edge/cli/edge_command.py
+++ b/providers/src/airflow/providers/edge/cli/edge_command.py
@@ -39,7 +39,7 @@ from airflow.exceptions import AirflowException
from airflow.providers.edge import __version__ as edge_provider_version
from airflow.providers.edge.models.edge_job import EdgeJob
from airflow.providers.edge.models.edge_logs import EdgeLogs
-from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState
+from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState, EdgeWorkerVersionException
from airflow.utils import cli as cli_utils
from airflow.utils.platform import IS_WINDOWS
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
@@ -175,6 +175,9 @@ class _EdgeWorkerCli:
self.last_hb = EdgeWorker.register_worker(
self.hostname, EdgeWorkerState.STARTING, self.queues, self._get_sysinfo()
).last_update
+ except EdgeWorkerVersionException as e:
+ logger.info("Version mismatch of Edge worker and Core. Shutting down worker.")
+ raise SystemExit(str(e))
except AirflowException as e:
if "404:NOT FOUND" in str(e):
raise SystemExit("Error: API endpoint is not ready, please set [edge] api_enabled=True.")
@@ -186,7 +189,10 @@ class _EdgeWorkerCli:
self.loop()
logger.info("Quitting worker, signal being offline.")
- EdgeWorker.set_state(self.hostname, EdgeWorkerState.OFFLINE, 0, self._get_sysinfo())
+ try:
+ EdgeWorker.set_state(self.hostname, EdgeWorkerState.OFFLINE, 0, self._get_sysinfo())
+ except EdgeWorkerVersionException:
+ logger.info("Version mismatch of Edge worker and Core. Quitting worker anyway.")
finally:
remove_existing_pidfile(self.pid_file_path)
@@ -259,7 +265,11 @@ class _EdgeWorkerCli:
else EdgeWorkerState.IDLE
)
sysinfo = self._get_sysinfo()
- self.queues = EdgeWorker.set_state(self.hostname, state, len(self.jobs), sysinfo)
+ try:
+ self.queues = EdgeWorker.set_state(self.hostname, state, len(self.jobs), sysinfo)
+ except EdgeWorkerVersionException:
+ logger.info("Version mismatch of Edge worker and Core. Shutting down worker.")
+ _EdgeWorkerCli.drain = True
def interruptible_sleep(self):
"""Sleeps but stops sleeping if drain is made."""
diff --git a/providers/src/airflow/providers/edge/models/edge_worker.py b/providers/src/airflow/providers/edge/models/edge_worker.py
index 61bb627cf1..b65d935038 100644
--- a/providers/src/airflow/providers/edge/models/edge_worker.py
+++ b/providers/src/airflow/providers/edge/models/edge_worker.py
@@ -265,7 +265,6 @@ class EdgeWorker(BaseModel, LoggingMixin):
session: Session = NEW_SESSION,
) -> list[str] | None:
"""Set state of worker and returns the current assigned queues."""
- EdgeWorker.assert_version(sysinfo)
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel = session.scalar(query)
worker.state = state
@@ -283,6 +282,7 @@ class EdgeWorker(BaseModel, LoggingMixin):
concurrency=int(sysinfo["concurrency"]),
queues=worker.queues,
)
+ EdgeWorker.assert_version(sysinfo) # Exception only after worker state is in the DB
return worker.queues
@staticmethod
diff --git a/providers/src/airflow/providers/edge/provider.yaml b/providers/src/airflow/providers/edge/provider.yaml
index 28de7ca89b..78fcf5ce7d 100644
--- a/providers/src/airflow/providers/edge/provider.yaml
+++ b/providers/src/airflow/providers/edge/provider.yaml
@@ -27,7 +27,7 @@ source-date-epoch: 1729683247
# note that those versions are maintained by release manager - do not update them manually
versions:
- - 0.4.0pre0
+ - 0.5.0pre0
dependencies:
- apache-airflow>=2.10.0
diff --git a/providers/tests/edge/cli/test_edge_command.py b/providers/tests/edge/cli/test_edge_command.py
index b7a5e3ae3a..f340813024 100644
--- a/providers/tests/edge/cli/test_edge_command.py
+++ b/providers/tests/edge/cli/test_edge_command.py
@@ -29,7 +29,7 @@ import time_machine
from airflow.exceptions import AirflowException
from airflow.providers.edge.cli.edge_command import _EdgeWorkerCli, _Job, _write_pid_to_pidfile
from airflow.providers.edge.models.edge_job import EdgeJob
-from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState
+from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState, EdgeWorkerVersionException
from airflow.utils.state import TaskInstanceState
from tests_common.test_utils.config import conf_vars
@@ -266,6 +266,12 @@ class TestEdgeWorkerCli:
assert "queue1" in (queue_list)
assert "queue2" in (queue_list)
+ @patch("airflow.providers.edge.models.edge_worker.EdgeWorker.set_state")
+ def test_version_mismatch(self, mock_set_state, worker_with_job):
+ mock_set_state.side_effect = EdgeWorkerVersionException("")
+ worker_with_job.heartbeat()
+ assert worker_with_job.drain
+
@patch("airflow.providers.edge.models.edge_worker.EdgeWorker.register_worker")
def test_start_missing_apiserver(self, mock_register_worker, worker_with_job: _EdgeWorkerCli):
mock_register_worker.side_effect = AirflowException(