This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7eef587f50f Trigger remote shutdown of edge worker (#50278)
7eef587f50f is described below

commit 7eef587f50fa49024fb1c6d8955332075f9392cc
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Tue May 6 15:10:52 2025 -0500

    Trigger remote shutdown of edge worker (#50278)
---
 .../airflow/providers/edge3/cli/edge_command.py    | 21 +++++++++++++++++++++
 .../airflow/providers/edge3/models/edge_worker.py  | 22 +++++++++++++++++-----
 .../providers/edge3/worker_api/routes/worker.py    | 10 +++++++++-
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
index 8ff5e953dd0..7e3fa6459e1 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
@@ -473,6 +473,9 @@ class _EdgeWorkerCli:
                 _EdgeWorkerCli.maintenance_comments = worker_info.maintenance_comments
             else:
                 _EdgeWorkerCli.maintenance_comments = None
+            if worker_info.state == EdgeWorkerState.SHUTDOWN_REQUEST:
+                logger.info("Shutdown requested!")
+                _EdgeWorkerCli.drain = True
 
             worker_state_changed = worker_info.state != state
         except EdgeWorkerVersionException:
@@ -712,6 +715,18 @@ def remove_remote_worker(args) -> None:
         raise SystemExit
 
 
+@cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
+def remote_worker_request_shutdown(args) -> None:
+    """Initiate the shutdown of the remote edge worker."""
+    _check_valid_db_connection()
+    _check_if_registered_edge_host(hostname=args.edge_hostname)
+    from airflow.providers.edge3.models.edge_worker import request_shutdown
+
+    request_shutdown(args.edge_hostname)
+    logger.info("Requested shutdown of Edge Worker host %s by %s.", args.edge_hostname, getuser())
+
+
 ARG_CONCURRENCY = Arg(
     ("-c", "--concurrency"),
     type=int,
@@ -855,4 +870,10 @@ EDGE_COMMANDS: list[ActionCommand] = [
         func=remove_remote_worker,
         args=(ARG_REQUIRED_EDGE_HOSTNAME,),
     ),
+    ActionCommand(
+        name="shutdown-remote-edge-worker",
+        help=remote_worker_request_shutdown.__doc__,
+        func=remote_worker_request_shutdown,
+        args=(ARG_REQUIRED_EDGE_HOSTNAME,),
+    ),
 ]
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py 
b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
index 75a1eac3cb8..bc614ecf612 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
@@ -55,6 +55,8 @@ class EdgeWorkerState(str, Enum):
     """Edge Worker is actively running a task."""
     IDLE = "idle"
     """Edge Worker is active and waiting for a task."""
+    SHUTDOWN_REQUEST = "shutdown request"
+    """Request to shutdown Edge Worker."""
     TERMINATING = "terminating"
     """Edge Worker is completing work and stopping."""
     OFFLINE = "offline"
@@ -243,7 +245,7 @@ def remove_worker(worker_name: str, session: Session = NEW_SESSION) -> None:
     """Remove a worker that is offline or just gone from DB."""
     query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
     worker: EdgeWorkerModel = session.scalar(query)
-    if worker.state == EdgeWorkerState.OFFLINE or worker.state == EdgeWorkerState.OFFLINE_MAINTENANCE:
+    if worker.state in (EdgeWorkerState.OFFLINE, EdgeWorkerState.OFFLINE_MAINTENANCE):
         session.execute(delete(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name))
     else:
         error_message = f"Cannot remove edge worker {worker_name} as it is in {worker.state} state!"
@@ -258,12 +260,22 @@ def change_maintenance_comment(
     """Write maintenance comment in the db."""
     query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
     worker: EdgeWorkerModel = session.scalar(query)
-    if (
-        worker.state == EdgeWorkerState.MAINTENANCE_MODE
-        or worker.state == EdgeWorkerState.OFFLINE_MAINTENANCE
-    ):
+    if worker.state in (EdgeWorkerState.MAINTENANCE_MODE, EdgeWorkerState.OFFLINE_MAINTENANCE):
         worker.maintenance_comment = maintenance_comment
     else:
         error_message = f"Cannot change maintenance comment as {worker_name} is not in maintenance!"
         logger.error(error_message)
         raise TypeError(error_message)
+
+
+@provide_session
+def request_shutdown(worker_name: str, session: Session = NEW_SESSION) -> None:
+    """Request to shutdown the edge worker."""
+    query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
+    worker: EdgeWorkerModel = session.scalar(query)
+    if worker.state not in (
+        EdgeWorkerState.OFFLINE,
+        EdgeWorkerState.OFFLINE_MAINTENANCE,
+        EdgeWorkerState.UNKNOWN,
+    ):
+        worker.state = EdgeWorkerState.SHUTDOWN_REQUEST
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
index 32b3729ec2f..f251d32acf9 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
@@ -114,7 +114,7 @@ _worker_queue_doc = Body(
 
 
 def redefine_state(worker_state: EdgeWorkerState, body_state: EdgeWorkerState) -> EdgeWorkerState:
-    """Redefine the state of the worker based on maintenance request."""
+    """Redefine the state of the worker based on maintenance or shutdown request."""
     if (
         worker_state == EdgeWorkerState.MAINTENANCE_REQUEST
         and body_state
@@ -133,6 +133,14 @@ def redefine_state(worker_state: EdgeWorkerState, body_state: EdgeWorkerState) -
         if body_state == EdgeWorkerState.MAINTENANCE_MODE:
             return EdgeWorkerState.IDLE
 
+    if worker_state == EdgeWorkerState.SHUTDOWN_REQUEST:
+        if body_state not in (
+            EdgeWorkerState.OFFLINE_MAINTENANCE,
+            EdgeWorkerState.OFFLINE,
+            EdgeWorkerState.UNKNOWN,
+        ):
+            return EdgeWorkerState.SHUTDOWN_REQUEST
+
     return body_state
 
 

Reply via email to