This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ed106c4fa0 Add shutdown-all-workers command to Edge CLI (#55626)
2ed106c4fa0 is described below

commit 2ed106c4fa0a1f923391de30b2474aaea91415c7
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Sat Sep 13 14:43:34 2025 -0500

    Add shutdown-all-workers command to Edge CLI (#55626)
    
    * Add shutdown-all-workers command to Edge CLI
    
      Implement new Edge CLI command to gracefully shutdown all registered edge 
workers with confirmation prompt. Command follows similar
      pattern to celery shutdown-all-workers, requiring user confirmation with 
y/n prompt unless bypassed with --yes flag.
    
    * Doc update
---
 providers/edge3/docs/deployment.rst                |  1 +
 .../airflow/providers/edge3/cli/edge_command.py    | 43 ++++++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/providers/edge3/docs/deployment.rst 
b/providers/edge3/docs/deployment.rst
index 3b90db06810..ee303e32a9f 100644
--- a/providers/edge3/docs/deployment.rst
+++ b/providers/edge3/docs/deployment.rst
@@ -189,6 +189,7 @@ instance. The commands are:
 - ``airflow edge remote-edge-worker-update-maintenance-comment``: Updates the 
maintenance comment for a remote edge worker
 - ``airflow edge remote-edge-worker-exit-maintenance``: Request a remote edge 
worker to exit maintenance mode
 - ``airflow edge shutdown-remote-edge-worker``: Shuts down a remote edge 
worker gracefully
+- ``airflow edge shutdown-all-workers``: Request graceful shutdown of all 
registered edge workers
 - ``airflow edge remove-remote-edge-worker``: Remove a worker instance from 
the cluster
 - ``airflow edge add-worker-queues``: Add queues to an edge worker
 - ``airflow edge remove-worker-queues``: Remove queues from an edge worker
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py 
b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
index 39bdb0c6575..63296917683 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
@@ -351,6 +351,37 @@ def remote_worker_request_shutdown(args) -> None:
     logger.info("Requested shutdown of Edge Worker host %s by %s.", 
args.edge_hostname, getuser())
 
 
+@cli_utils.action_cli(check_db=False)
+@providers_configuration_loaded
+def shutdown_all_workers(args) -> None:
+    """Request graceful shutdown of all edge workers."""
+    _check_valid_db_connection()
+    if not (
+        args.yes
+        or input("This will shutdown all active edge workers, this cannot be 
undone! Proceed? (y/n)").upper()
+        == "Y"
+    ):
+        raise SystemExit("Cancelled")
+
+    from airflow.providers.edge3.models.edge_worker import 
get_registered_edge_hosts, request_shutdown
+
+    all_hosts = list(get_registered_edge_hosts())
+    if not all_hosts:
+        logger.info("No edge workers found to shutdown.")
+        return
+
+    shutdown_count = 0
+    for host in all_hosts:
+        try:
+            request_shutdown(host.worker_name)
+            logger.info("Requested shutdown of Edge Worker host %s", 
host.worker_name)
+            shutdown_count += 1
+        except Exception as e:
+            logger.error("Failed to shutdown Edge Worker host %s: %s", 
host.worker_name, e)
+
+    logger.info("Requested shutdown of %d edge workers by %s.", 
shutdown_count, getuser())
+
+
 @cli_utils.action_cli(check_db=False)
 @providers_configuration_loaded
 def add_worker_queues(args) -> None:
@@ -468,6 +499,12 @@ ARG_UMASK = Arg(
 ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file if run in 
daemon mode")
 ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file if run in 
daemon mode")
 ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file if run 
in daemon mode")
+ARG_YES = Arg(
+    ("-y", "--yes"),
+    help="Skip confirmation prompt and proceed with shutdown",
+    action="store_true",
+    default=False,
+)
 
 EDGE_COMMANDS: list[ActionCommand] = [
     ActionCommand(
@@ -581,4 +618,10 @@ EDGE_COMMANDS: list[ActionCommand] = [
             ARG_QUEUES_MANAGE,
         ),
     ),
+    ActionCommand(
+        name="shutdown-all-workers",
+        help=shutdown_all_workers.__doc__,
+        func=shutdown_all_workers,
+        args=(ARG_YES,),
+    ),
 ]

Reply via email to