This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 96aae9718f Edge worker graceful shutdown on version mismatch (#43462)
96aae9718f is described below

commit 96aae9718f51af1414ba90ac082d74e8ceaaf34c
Author: majorosdonat <[email protected]>
AuthorDate: Tue Oct 29 13:44:05 2024 +0100

    Edge worker graceful shutdown on version mismatch (#43462)
    
    * React on EdgeWorkerVersionException
    
    * update version number
    
    * import EdgeWorkerVersionException
    
    * edge worker exception is thrown later
    
    * fix missing variable
    
    * Add pytests
    
    * changelog added
    
    * fix static checks
    
    ---------
    
    Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
---
 providers/src/airflow/providers/edge/CHANGELOG.rst       |  8 ++++++++
 providers/src/airflow/providers/edge/__init__.py         |  2 +-
 providers/src/airflow/providers/edge/cli/edge_command.py | 16 +++++++++++++---
 .../src/airflow/providers/edge/models/edge_worker.py     |  2 +-
 providers/src/airflow/providers/edge/provider.yaml       |  2 +-
 providers/tests/edge/cli/test_edge_command.py            |  8 +++++++-
 6 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst b/providers/src/airflow/providers/edge/CHANGELOG.rst
index afd17c6b29..7f09a7ae6a 100644
--- a/providers/src/airflow/providers/edge/CHANGELOG.rst
+++ b/providers/src/airflow/providers/edge/CHANGELOG.rst
@@ -27,6 +27,14 @@
 Changelog
 ---------
 
+0.5.0pre0
+.........
+
+Misc
+~~~~
+
+* ``Edge worker triggers graceful shutdown, if worker version and main instance do not match.``
+
 0.4.0pre0
 .........
 
diff --git a/providers/src/airflow/providers/edge/__init__.py b/providers/src/airflow/providers/edge/__init__.py
index 9b565f8e89..47db38e168 100644
--- a/providers/src/airflow/providers/edge/__init__.py
+++ b/providers/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
 
 __all__ = ["__version__"]
 
-__version__ = "0.4.0pre0"
+__version__ = "0.5.0pre0"
 
 if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
     "2.10.0"
diff --git a/providers/src/airflow/providers/edge/cli/edge_command.py b/providers/src/airflow/providers/edge/cli/edge_command.py
index 5e900a8a2c..487b3adde7 100644
--- a/providers/src/airflow/providers/edge/cli/edge_command.py
+++ b/providers/src/airflow/providers/edge/cli/edge_command.py
@@ -39,7 +39,7 @@ from airflow.exceptions import AirflowException
 from airflow.providers.edge import __version__ as edge_provider_version
 from airflow.providers.edge.models.edge_job import EdgeJob
 from airflow.providers.edge.models.edge_logs import EdgeLogs
-from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState
+from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState, EdgeWorkerVersionException
 from airflow.utils import cli as cli_utils
 from airflow.utils.platform import IS_WINDOWS
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
@@ -175,6 +175,9 @@ class _EdgeWorkerCli:
             self.last_hb = EdgeWorker.register_worker(
                 self.hostname, EdgeWorkerState.STARTING, self.queues, self._get_sysinfo()
             ).last_update
+        except EdgeWorkerVersionException as e:
+            logger.info("Version mismatch of Edge worker and Core. Shutting down worker.")
+            raise SystemExit(str(e))
         except AirflowException as e:
             if "404:NOT FOUND" in str(e):
                 raise SystemExit("Error: API endpoint is not ready, please set [edge] api_enabled=True.")
@@ -186,7 +189,10 @@ class _EdgeWorkerCli:
                 self.loop()
 
             logger.info("Quitting worker, signal being offline.")
-            EdgeWorker.set_state(self.hostname, EdgeWorkerState.OFFLINE, 0, self._get_sysinfo())
+            try:
+                EdgeWorker.set_state(self.hostname, EdgeWorkerState.OFFLINE, 0, self._get_sysinfo())
+            except EdgeWorkerVersionException:
+                logger.info("Version mismatch of Edge worker and Core. Quitting worker anyway.")
         finally:
             remove_existing_pidfile(self.pid_file_path)
 
@@ -259,7 +265,11 @@ class _EdgeWorkerCli:
             else EdgeWorkerState.IDLE
         )
         sysinfo = self._get_sysinfo()
-        self.queues = EdgeWorker.set_state(self.hostname, state, len(self.jobs), sysinfo)
+        try:
+            self.queues = EdgeWorker.set_state(self.hostname, state, len(self.jobs), sysinfo)
+        except EdgeWorkerVersionException:
+            logger.info("Version mismatch of Edge worker and Core. Shutting down worker.")
+            _EdgeWorkerCli.drain = True
 
     def interruptible_sleep(self):
         """Sleeps but stops sleeping if drain is made."""
diff --git a/providers/src/airflow/providers/edge/models/edge_worker.py b/providers/src/airflow/providers/edge/models/edge_worker.py
index 61bb627cf1..b65d935038 100644
--- a/providers/src/airflow/providers/edge/models/edge_worker.py
+++ b/providers/src/airflow/providers/edge/models/edge_worker.py
@@ -265,7 +265,6 @@ class EdgeWorker(BaseModel, LoggingMixin):
         session: Session = NEW_SESSION,
     ) -> list[str] | None:
         """Set state of worker and returns the current assigned queues."""
-        EdgeWorker.assert_version(sysinfo)
         query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
         worker: EdgeWorkerModel = session.scalar(query)
         worker.state = state
@@ -283,6 +282,7 @@ class EdgeWorker(BaseModel, LoggingMixin):
             concurrency=int(sysinfo["concurrency"]),
             queues=worker.queues,
         )
+        EdgeWorker.assert_version(sysinfo)  #  Exception only after worker state is in the DB
         return worker.queues
 
     @staticmethod
diff --git a/providers/src/airflow/providers/edge/provider.yaml b/providers/src/airflow/providers/edge/provider.yaml
index 28de7ca89b..78fcf5ce7d 100644
--- a/providers/src/airflow/providers/edge/provider.yaml
+++ b/providers/src/airflow/providers/edge/provider.yaml
@@ -27,7 +27,7 @@ source-date-epoch: 1729683247
 
 # note that those versions are maintained by release manager - do not update them manually
 versions:
-  - 0.4.0pre0
+  - 0.5.0pre0
 
 dependencies:
   - apache-airflow>=2.10.0
diff --git a/providers/tests/edge/cli/test_edge_command.py b/providers/tests/edge/cli/test_edge_command.py
index b7a5e3ae3a..f340813024 100644
--- a/providers/tests/edge/cli/test_edge_command.py
+++ b/providers/tests/edge/cli/test_edge_command.py
@@ -29,7 +29,7 @@ import time_machine
 from airflow.exceptions import AirflowException
 from airflow.providers.edge.cli.edge_command import _EdgeWorkerCli, _Job, _write_pid_to_pidfile
 from airflow.providers.edge.models.edge_job import EdgeJob
-from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState
+from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState, EdgeWorkerVersionException
 from airflow.utils.state import TaskInstanceState
 
 from tests_common.test_utils.config import conf_vars
@@ -266,6 +266,12 @@ class TestEdgeWorkerCli:
         assert "queue1" in (queue_list)
         assert "queue2" in (queue_list)
 
+    @patch("airflow.providers.edge.models.edge_worker.EdgeWorker.set_state")
+    def test_version_mismatch(self, mock_set_state, worker_with_job):
+        mock_set_state.side_effect = EdgeWorkerVersionException("")
+        worker_with_job.heartbeat()
+        assert worker_with_job.drain
+
     @patch("airflow.providers.edge.models.edge_worker.EdgeWorker.register_worker")
     def test_start_missing_apiserver(self, mock_register_worker, worker_with_job: _EdgeWorkerCli):
         mock_register_worker.side_effect = AirflowException(

Reply via email to