This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2ed106c4fa0 Add shutdown-all-workers command to Edge CLI (#55626)
2ed106c4fa0 is described below
commit 2ed106c4fa0a1f923391de30b2474aaea91415c7
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Sat Sep 13 14:43:34 2025 -0500
Add shutdown-all-workers command to Edge CLI (#55626)
* Add shutdown-all-workers command to Edge CLI
Implement new Edge CLI command to gracefully shutdown all registered edge
workers with confirmation prompt. Command follows similar
pattern to celery shutdown-all-workers, requiring user confirmation with
y/n prompt unless bypassed with --yes flag.
* Doc update
---
providers/edge3/docs/deployment.rst | 1 +
.../airflow/providers/edge3/cli/edge_command.py | 43 ++++++++++++++++++++++
2 files changed, 44 insertions(+)
diff --git a/providers/edge3/docs/deployment.rst b/providers/edge3/docs/deployment.rst
index 3b90db06810..ee303e32a9f 100644
--- a/providers/edge3/docs/deployment.rst
+++ b/providers/edge3/docs/deployment.rst
@@ -189,6 +189,7 @@ instance. The commands are:
- ``airflow edge remote-edge-worker-update-maintenance-comment``: Updates the maintenance comment for a remote edge worker
- ``airflow edge remote-edge-worker-exit-maintenance``: Request a remote edge worker to exit maintenance mode
- ``airflow edge shutdown-remote-edge-worker``: Shuts down a remote edge worker gracefully
+- ``airflow edge shutdown-all-workers``: Request graceful shutdown of all registered edge workers
- ``airflow edge remove-remote-edge-worker``: Remove a worker instance from the cluster
- ``airflow edge add-worker-queues``: Add queues to an edge worker
- ``airflow edge remove-worker-queues``: Remove queues from an edge worker
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
index 39bdb0c6575..63296917683 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
@@ -351,6 +351,37 @@ def remote_worker_request_shutdown(args) -> None:
logger.info("Requested shutdown of Edge Worker host %s by %s.", args.edge_hostname, getuser())
+@cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
+def shutdown_all_workers(args) -> None:
+ """Request graceful shutdown of all edge workers."""
+ _check_valid_db_connection()
+ if not (
+ args.yes
+ or input("This will shutdown all active edge workers, this cannot be undone! Proceed? (y/n)").upper()
+ == "Y"
+ ):
+ raise SystemExit("Cancelled")
+
+ from airflow.providers.edge3.models.edge_worker import get_registered_edge_hosts, request_shutdown
+
+ all_hosts = list(get_registered_edge_hosts())
+ if not all_hosts:
+ logger.info("No edge workers found to shutdown.")
+ return
+
+ shutdown_count = 0
+ for host in all_hosts:
+ try:
+ request_shutdown(host.worker_name)
+ logger.info("Requested shutdown of Edge Worker host %s", host.worker_name)
+ shutdown_count += 1
+ except Exception as e:
+ logger.error("Failed to shutdown Edge Worker host %s: %s", host.worker_name, e)
+
+ logger.info("Requested shutdown of %d edge workers by %s.", shutdown_count, getuser())
+
+
@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def add_worker_queues(args) -> None:
@@ -468,6 +499,12 @@ ARG_UMASK = Arg(
ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file if run in daemon mode")
ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file if run in daemon mode")
ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file if run in daemon mode")
+ARG_YES = Arg(
+ ("-y", "--yes"),
+ help="Skip confirmation prompt and proceed with shutdown",
+ action="store_true",
+ default=False,
+)
EDGE_COMMANDS: list[ActionCommand] = [
ActionCommand(
@@ -581,4 +618,10 @@ EDGE_COMMANDS: list[ActionCommand] = [
ARG_QUEUES_MANAGE,
),
),
+ ActionCommand(
+ name="shutdown-all-workers",
+ help=shutdown_all_workers.__doc__,
+ func=shutdown_all_workers,
+ args=(ARG_YES,),
+ ),
]