This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 964997a7a6d Add CLI command to remove all queues from Celery worker 
(#56195)
964997a7a6d is described below

commit 964997a7a6da5041fb19e48bc31866ffc6fe7bc7
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Tue Sep 30 14:28:06 2025 -0500

    Add CLI command to remove all queues from Celery worker (#56195)
    
    Implement  command that unsubscribes
      a Celery worker from all its active queues. This complements the existing
       command by providing a bulk operation for queue management.
---
 .../airflow/providers/celery/cli/celery_command.py | 32 ++++++++++++++++++++++
 .../providers/celery/executors/celery_executor.py  |  6 ++++
 .../tests/unit/celery/cli/test_celery_command.py   | 23 ++++++++++++++++
 3 files changed, 61 insertions(+)

diff --git 
a/providers/celery/src/airflow/providers/celery/cli/celery_command.py 
b/providers/celery/src/airflow/providers/celery/cli/celery_command.py
index 3022e7b474b..5a79d260e86 100644
--- a/providers/celery/src/airflow/providers/celery/cli/celery_command.py
+++ b/providers/celery/src/airflow/providers/celery/cli/celery_command.py
@@ -396,3 +396,35 @@ def remove_queue(args):
     queues = args.queues.split(",")
     for queue in queues:
         celery_app.control.cancel_consumer(queue, 
destination=[args.celery_hostname])
+
+
+@cli_utils.action_cli(check_db=False)
+@_providers_configuration_loaded
+def remove_all_queues(args):
+    """Unsubscribe a Celery worker from all its active queues."""
+    _check_if_active_celery_worker(hostname=args.celery_hostname)
+    # This needs to be imported locally to not trigger Providers Manager 
initialization
+    from airflow.providers.celery.executors.celery_executor import app as 
celery_app
+
+    inspect = celery_app.control.inspect()
+    active_workers = inspect.active_queues()
+
+    if not active_workers or args.celery_hostname not in active_workers:
+        print(f"No active queues found for worker: {args.celery_hostname}")
+        return
+
+    worker_queues = active_workers[args.celery_hostname]
+    queue_names = [queue["name"] for queue in worker_queues if "name" in queue]
+
+    if not queue_names:
+        print(f"No queues to remove for worker: {args.celery_hostname}")
+        return
+
+    print(
+        f"Removing {len(queue_names)} queue(s) from worker 
{args.celery_hostname}: {', '.join(queue_names)}"
+    )
+
+    for queue_name in queue_names:
+        celery_app.control.cancel_consumer(queue_name, 
destination=[args.celery_hostname])
+
+    print(f"Successfully removed all queues from worker: 
{args.celery_hostname}")
diff --git 
a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py 
b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
index 6091dc5c124..13b5ce3237f 100644
--- a/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
+++ b/providers/celery/src/airflow/providers/celery/executors/celery_executor.py
@@ -268,6 +268,12 @@ CELERY_COMMANDS = (
             ARG_FULL_CELERY_HOSTNAME,
         ),
     ),
+    ActionCommand(
+        name="remove-all-queues",
+        help="Unsubscribe Celery worker from all its active queues",
+        func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.remove_all_queues"),
+        args=(ARG_FULL_CELERY_HOSTNAME,),
+    ),
 )
 
 
diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py 
b/providers/celery/tests/unit/celery/cli/test_celery_command.py
index 2bd0755c32c..e4aedfe45e4 100644
--- a/providers/celery/tests/unit/celery/cli/test_celery_command.py
+++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py
@@ -439,6 +439,29 @@ class TestRemoteCeleryControlCommands:
             celery_command.remove_queue(args)
             mock_cancel_consumer.assert_called_once_with("test1", 
destination=["celery@host_1"])
 
+    @pytest.mark.db_test
+    
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.cancel_consumer")
+    
@mock.patch("airflow.providers.celery.executors.celery_executor.app.control.inspect")
+    def test_remove_all_queues(self, mock_inspect, mock_cancel_consumer):
+        args = self.parser.parse_args(["celery", "remove-all-queues", "-H", 
"celery@host_1"])
+        mock_instance = MagicMock()
+        mock_instance.active_queues.return_value = {
+            "celery@host_1": [{"name": "queue1"}, {"name": "queue2"}],
+            "celery@host_2": [{"name": "queue3"}],
+        }
+        mock_inspect.return_value = mock_instance
+        with patch(
+            
"airflow.providers.celery.cli.celery_command._check_if_active_celery_worker", 
return_value=None
+        ):
+            celery_command.remove_all_queues(args)
+            # Verify cancel_consumer was called for each queue
+            expected_calls = [
+                mock.call("queue1", destination=["celery@host_1"]),
+                mock.call("queue2", destination=["celery@host_1"]),
+            ]
+            mock_cancel_consumer.assert_has_calls(expected_calls, 
any_order=True)
+            assert mock_cancel_consumer.call_count == 2
+
 
 @patch("airflow.providers.celery.cli.celery_command.Process")
 @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Doesn't apply to pre-3.0")

Reply via email to