This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7eef587f50f Trigger remote shutdown of edge worker (#50278)
7eef587f50f is described below
commit 7eef587f50fa49024fb1c6d8955332075f9392cc
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Tue May 6 15:10:52 2025 -0500
Trigger remote shutdown of edge worker (#50278)
---
.../airflow/providers/edge3/cli/edge_command.py | 21 +++++++++++++++++++++
.../airflow/providers/edge3/models/edge_worker.py | 22 +++++++++++++++++-----
.../providers/edge3/worker_api/routes/worker.py | 10 +++++++++-
3 files changed, 47 insertions(+), 6 deletions(-)
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
index 8ff5e953dd0..7e3fa6459e1 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
@@ -473,6 +473,9 @@ class _EdgeWorkerCli:
_EdgeWorkerCli.maintenance_comments = worker_info.maintenance_comments
else:
_EdgeWorkerCli.maintenance_comments = None
+ if worker_info.state == EdgeWorkerState.SHUTDOWN_REQUEST:
+ logger.info("Shutdown requested!")
+ _EdgeWorkerCli.drain = True
worker_state_changed = worker_info.state != state
except EdgeWorkerVersionException:
@@ -712,6 +715,18 @@ def remove_remote_worker(args) -> None:
raise SystemExit
+@cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
+def remote_worker_request_shutdown(args) -> None:
+ """Initiate the shutdown of the remote edge worker."""
+ _check_valid_db_connection()
+ _check_if_registered_edge_host(hostname=args.edge_hostname)
+ from airflow.providers.edge3.models.edge_worker import request_shutdown
+
+ request_shutdown(args.edge_hostname)
+ logger.info("Requested shutdown of Edge Worker host %s by %s.", args.edge_hostname, getuser())
+
+
ARG_CONCURRENCY = Arg(
("-c", "--concurrency"),
type=int,
@@ -855,4 +870,10 @@ EDGE_COMMANDS: list[ActionCommand] = [
func=remove_remote_worker,
args=(ARG_REQUIRED_EDGE_HOSTNAME,),
),
+ ActionCommand(
+ name="shutdown-remote-edge-worker",
+ help=remote_worker_request_shutdown.__doc__,
+ func=remote_worker_request_shutdown,
+ args=(ARG_REQUIRED_EDGE_HOSTNAME,),
+ ),
]
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
index 75a1eac3cb8..bc614ecf612 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
@@ -55,6 +55,8 @@ class EdgeWorkerState(str, Enum):
"""Edge Worker is actively running a task."""
IDLE = "idle"
"""Edge Worker is active and waiting for a task."""
+ SHUTDOWN_REQUEST = "shutdown request"
+ """Request to shutdown Edge Worker."""
TERMINATING = "terminating"
"""Edge Worker is completing work and stopping."""
OFFLINE = "offline"
@@ -243,7 +245,7 @@ def remove_worker(worker_name: str, session: Session = NEW_SESSION) -> None:
"""Remove a worker that is offline or just gone from DB."""
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel = session.scalar(query)
- if worker.state == EdgeWorkerState.OFFLINE or worker.state == EdgeWorkerState.OFFLINE_MAINTENANCE:
+ if worker.state in (EdgeWorkerState.OFFLINE, EdgeWorkerState.OFFLINE_MAINTENANCE):
session.execute(delete(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name))
else:
error_message = f"Cannot remove edge worker {worker_name} as it is in {worker.state} state!"
@@ -258,12 +260,22 @@ def change_maintenance_comment(
"""Write maintenance comment in the db."""
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
worker: EdgeWorkerModel = session.scalar(query)
- if (
- worker.state == EdgeWorkerState.MAINTENANCE_MODE
- or worker.state == EdgeWorkerState.OFFLINE_MAINTENANCE
- ):
+ if worker.state in (EdgeWorkerState.MAINTENANCE_MODE, EdgeWorkerState.OFFLINE_MAINTENANCE):
worker.maintenance_comment = maintenance_comment
else:
error_message = f"Cannot change maintenance comment as {worker_name} is not in maintenance!"
logger.error(error_message)
raise TypeError(error_message)
+
+
+@provide_session
+def request_shutdown(worker_name: str, session: Session = NEW_SESSION) -> None:
+ """Request to shutdown the edge worker."""
+ query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
+ worker: EdgeWorkerModel = session.scalar(query)
+ if worker.state not in (
+ EdgeWorkerState.OFFLINE,
+ EdgeWorkerState.OFFLINE_MAINTENANCE,
+ EdgeWorkerState.UNKNOWN,
+ ):
+ worker.state = EdgeWorkerState.SHUTDOWN_REQUEST
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
index 32b3729ec2f..f251d32acf9 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
@@ -114,7 +114,7 @@ _worker_queue_doc = Body(
def redefine_state(worker_state: EdgeWorkerState, body_state: EdgeWorkerState) -> EdgeWorkerState:
- """Redefine the state of the worker based on maintenance request."""
+ """Redefine the state of the worker based on maintenance or shutdown request."""
if (
worker_state == EdgeWorkerState.MAINTENANCE_REQUEST
and body_state
@@ -133,6 +133,14 @@ def redefine_state(worker_state: EdgeWorkerState, body_state: EdgeWorkerState) -
if body_state == EdgeWorkerState.MAINTENANCE_MODE:
return EdgeWorkerState.IDLE
+ if worker_state == EdgeWorkerState.SHUTDOWN_REQUEST:
+ if body_state not in (
+ EdgeWorkerState.OFFLINE_MAINTENANCE,
+ EdgeWorkerState.OFFLINE,
+ EdgeWorkerState.UNKNOWN,
+ ):
+ return EdgeWorkerState.SHUTDOWN_REQUEST
+
return body_state