This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d74ee8dcd Remove pid arg from celery option to fix duplicate pid issue, Move celery command to provider package (#36794)
8d74ee8dcd is described below

commit 8d74ee8dcd1b3ad0291ef666835edcffb24265ae
Author: Pavan Sharma <[email protected]>
AuthorDate: Sun Feb 25 12:59:24 2024 +0530

    Remove pid arg from celery option to fix duplicate pid issue, Move celery command to provider package (#36794)
---
 airflow/cli/commands/celery_command.py             | 17 +++++---
 airflow/providers/celery/cli/__init__.py           | 16 +++++++
 .../celery/cli}/celery_command.py                  | 51 ++++++++++++++++------
 .../providers/celery/executors/celery_executor.py  |  6 +--
 tests/cli/commands/test_celery_command.py          |  6 +--
 tests/providers/celery/cli/__init__.py             | 16 +++++++
 .../celery/cli}/test_celery_command.py             | 30 ++++++-------
 7 files changed, 99 insertions(+), 43 deletions(-)

diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py
index 5e3e01042a..f29641309d 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -15,11 +15,16 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+
+# DO NOT MODIFY THIS FILE unless it is a serious bugfix - all the new celery commands should be added in celery provider.
+# This file is kept for backward compatibility only.
 """Celery command."""
 from __future__ import annotations
 
 import logging
 import sys
+import warnings
 from contextlib import contextmanager
 from multiprocessing import Process
 
@@ -40,6 +45,10 @@ from airflow.utils.serve_logs import serve_logs
 
 WORKER_PROCESS_NAME = "worker"
 
+warnings.warn(
+    "Use celery command from providers package, Use celery provider > 3.5.2", DeprecationWarning, stacklevel=2
+)
+
 
 @cli_utils.action_cli
 @providers_configuration_loaded
@@ -153,9 +162,6 @@ def worker(args):
     if not celery_log_level:
         celery_log_level = conf.get("logging", "LOGGING_LEVEL")
 
-    # Setup pid file location
-    worker_pid_file_path, _, _, _ = setup_locations(process=WORKER_PROCESS_NAME, pid=args.pid)
-
     # Setup Celery worker
     options = [
         "worker",
@@ -169,8 +175,6 @@ def worker(args):
         args.celery_hostname,
         "--loglevel",
         celery_log_level,
-        "--pidfile",
-        worker_pid_file_path,
     ]
     if autoscale:
         options.extend(["--autoscale", autoscale])
@@ -189,11 +193,12 @@ def worker(args):
         # executed.
         maybe_patch_concurrency(["-P", pool])
 
-    _, stdout, stderr, log_file = setup_locations(
+    worker_pid_file_path, stdout, stderr, log_file = setup_locations(
         process=WORKER_PROCESS_NAME,
         stdout=args.stdout,
         stderr=args.stderr,
         log=args.log_file,
+        pid=args.pid,
     )
 
     def run_celery_worker():
diff --git a/airflow/providers/celery/cli/__init__.py b/airflow/providers/celery/cli/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/celery/cli/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/cli/commands/celery_command.py b/airflow/providers/celery/cli/celery_command.py
similarity index 84%
copy from airflow/cli/commands/celery_command.py
copy to airflow/providers/celery/cli/celery_command.py
index 5e3e01042a..217433f811 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/providers/celery/cli/celery_command.py
@@ -31,18 +31,45 @@ from celery.signals import after_setup_logger
 from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile
 
 from airflow import settings
-from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations
-from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 from airflow.utils.serve_logs import serve_logs
 
 WORKER_PROCESS_NAME = "worker"
 
 
+def _run_command_with_daemon_option(*args, **kwargs):
+    try:
+        from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
+
+        run_command_with_daemon_option(*args, **kwargs)
+    except ImportError:
+        from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+        raise AirflowOptionalProviderFeatureException(
+            "Failed to import run_command_with_daemon_option. This feature is only available in Airflow versions >= 2.8.0"
+        )
+
+
+def _providers_configuration_loaded(func):
+    def wrapper(*args, **kwargs):
+        try:
+            from airflow.utils.providers_configuration_loader import providers_configuration_loaded
+
+            providers_configuration_loaded(func)(*args, **kwargs)
+        except ImportError:
+            from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+            raise AirflowOptionalProviderFeatureException(
+                "Failed to import providers_configuration_loaded. This feature is only available in Airflow versions >= 2.8.0"
+            )
+
+    return wrapper
+
+
 @cli_utils.action_cli
-@providers_configuration_loaded
+@_providers_configuration_loaded
 def flower(args):
     """Start Flower, Celery monitoring tool."""
     # This needs to be imported locally to not trigger Providers Manager initialization
@@ -67,7 +94,7 @@ def flower(args):
     if args.flower_conf:
         options.append(f"--conf={args.flower_conf}")
 
-    run_command_with_daemon_option(
+    _run_command_with_daemon_option(
         args=args, process_name="flower", callback=lambda: celery_app.start(options)
     )
 
@@ -85,7 +112,7 @@ def _serve_logs(skip_serve_logs: bool = False):
 
 
 @after_setup_logger.connect()
-@providers_configuration_loaded
+@_providers_configuration_loaded
 def logger_setup_handler(logger, **kwargs):
     """
     Reconfigure the logger.
@@ -115,7 +142,7 @@ def logger_setup_handler(logger, **kwargs):
 
 
 @cli_utils.action_cli
-@providers_configuration_loaded
+@_providers_configuration_loaded
 def worker(args):
     """Start Airflow Celery worker."""
     # This needs to be imported locally to not trigger Providers Manager initialization
@@ -153,9 +180,6 @@ def worker(args):
     if not celery_log_level:
         celery_log_level = conf.get("logging", "LOGGING_LEVEL")
 
-    # Setup pid file location
-    worker_pid_file_path, _, _, _ = setup_locations(process=WORKER_PROCESS_NAME, pid=args.pid)
-
     # Setup Celery worker
     options = [
         "worker",
@@ -169,8 +193,6 @@ def worker(args):
         args.celery_hostname,
         "--loglevel",
         celery_log_level,
-        "--pidfile",
-        worker_pid_file_path,
     ]
     if autoscale:
         options.extend(["--autoscale", autoscale])
@@ -189,11 +211,12 @@ def worker(args):
         # executed.
         maybe_patch_concurrency(["-P", pool])
 
-    _, stdout, stderr, log_file = setup_locations(
+    worker_pid_file_path, stdout, stderr, log_file = setup_locations(
         process=WORKER_PROCESS_NAME,
         stdout=args.stdout,
         stderr=args.stderr,
         log=args.log_file,
+        pid=args.pid,
     )
 
     def run_celery_worker():
@@ -205,7 +228,7 @@ def worker(args):
     else:
         umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK)
 
-    run_command_with_daemon_option(
+    _run_command_with_daemon_option(
         args=args,
         process_name=WORKER_PROCESS_NAME,
         callback=run_celery_worker,
@@ -216,7 +239,7 @@ def worker(args):
 
 
 @cli_utils.action_cli
-@providers_configuration_loaded
+@_providers_configuration_loaded
 def stop_worker(args):
     """Send SIGTERM to Celery worker."""
     # Read PID from file
diff --git a/airflow/providers/celery/executors/celery_executor.py 
b/airflow/providers/celery/executors/celery_executor.py
index f4aa2a9b75..a8a4e320ec 100644
--- a/airflow/providers/celery/executors/celery_executor.py
+++ b/airflow/providers/celery/executors/celery_executor.py
@@ -181,7 +181,7 @@ CELERY_COMMANDS = (
     ActionCommand(
         name="worker",
         help="Start a Celery worker node",
-        func=lazy_load_command("airflow.cli.commands.celery_command.worker"),
+        func=lazy_load_command("airflow.providers.celery.cli.celery_command.worker"),
         args=(
             ARG_QUEUES,
             ARG_CONCURRENCY,
@@ -202,7 +202,7 @@ CELERY_COMMANDS = (
     ActionCommand(
         name="flower",
         help="Start a Celery Flower",
-        func=lazy_load_command("airflow.cli.commands.celery_command.flower"),
+        func=lazy_load_command("airflow.providers.celery.cli.celery_command.flower"),
         args=(
             ARG_FLOWER_HOSTNAME,
             ARG_FLOWER_PORT,
@@ -221,7 +221,7 @@ CELERY_COMMANDS = (
     ActionCommand(
         name="stop",
         help="Stop the Celery worker gracefully",
-        func=lazy_load_command("airflow.cli.commands.celery_command.stop_worker"),
+        func=lazy_load_command("airflow.providers.celery.cli.celery_command.stop_worker"),
         args=(ARG_PID, ARG_VERBOSE),
     ),
 )
diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py
index cbd87fea58..13cfff5275 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -106,7 +106,7 @@ class TestCeleryStopCommand:
         assert mock_celery_app.worker_main.call_args
         args, _ = mock_celery_app.worker_main.call_args
         args_str = " ".join(map(str, args[0]))
-        assert f"--pidfile {pid_file}" in args_str
+        assert f"--pidfile {pid_file}" not in args_str
 
         # Call stop
         stop_args = self.parser.parse_args(["celery", "stop"])
@@ -134,7 +134,7 @@ class TestCeleryStopCommand:
         assert mock_celery_app.worker_main.call_args
         args, _ = mock_celery_app.worker_main.call_args
         args_str = " ".join(map(str, args[0]))
-        assert f"--pidfile {pid_file}" in args_str
+        assert f"--pidfile {pid_file}" not in args_str
 
         stop_args = self.parser.parse_args(["celery", "stop", "--pid", pid_file])
         celery_command.stop_worker(stop_args)
@@ -194,8 +194,6 @@ class TestWorkerStart:
                 celery_hostname,
                 "--loglevel",
                 conf.get("logging", "CELERY_LOGGING_LEVEL"),
-                "--pidfile",
-                pid_file,
                 "--autoscale",
                 autoscale,
                 "--without-mingle",
diff --git a/tests/providers/celery/cli/__init__.py b/tests/providers/celery/cli/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/celery/cli/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/cli/commands/test_celery_command.py b/tests/providers/celery/cli/test_celery_command.py
similarity index 92%
copy from tests/cli/commands/test_celery_command.py
copy to tests/providers/celery/cli/test_celery_command.py
index cbd87fea58..c896f5d87d 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/providers/celery/cli/test_celery_command.py
@@ -27,8 +27,8 @@ import sqlalchemy
 
 import airflow
 from airflow.cli import cli_parser
-from airflow.cli.commands import celery_command
 from airflow.configuration import conf
+from airflow.providers.celery.cli import celery_command
 from tests.test_utils.config import conf_vars
 
 pytestmark = pytest.mark.db_test
@@ -72,8 +72,8 @@ class TestCeleryStopCommand:
             importlib.reload(cli_parser)
             cls.parser = cli_parser.get_parser()
 
-    @mock.patch("airflow.cli.commands.celery_command.setup_locations")
-    @mock.patch("airflow.cli.commands.celery_command.psutil.Process")
+    @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
+    @mock.patch("airflow.providers.celery.cli.celery_command.psutil.Process")
     def test_if_right_pid_is_read(self, mock_process, mock_setup_locations, tmp_path):
         args = self.parser.parse_args(["celery", "stop"])
         pid = "123"
@@ -90,9 +90,9 @@ class TestCeleryStopCommand:
         mock_process.assert_called_once_with(int(pid))
         mock_process.return_value.terminate.assert_called_once_with()
 
-    @mock.patch("airflow.cli.commands.celery_command.read_pid_from_pidfile")
+    @mock.patch("airflow.providers.celery.cli.celery_command.read_pid_from_pidfile")
     @mock.patch("airflow.providers.celery.executors.celery_executor.app")
-    @mock.patch("airflow.cli.commands.celery_command.setup_locations")
+    @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
     def test_same_pid_file_is_used_in_start_and_stop(
         self, mock_setup_locations, mock_celery_app, mock_read_pid_from_pidfile
     ):
@@ -106,18 +106,18 @@ class TestCeleryStopCommand:
         assert mock_celery_app.worker_main.call_args
         args, _ = mock_celery_app.worker_main.call_args
         args_str = " ".join(map(str, args[0]))
-        assert f"--pidfile {pid_file}" in args_str
+        assert f"--pidfile {pid_file}" not in args_str
 
         # Call stop
         stop_args = self.parser.parse_args(["celery", "stop"])
         celery_command.stop_worker(stop_args)
         mock_read_pid_from_pidfile.assert_called_once_with(pid_file)
 
-    @mock.patch("airflow.cli.commands.celery_command.remove_existing_pidfile")
-    @mock.patch("airflow.cli.commands.celery_command.read_pid_from_pidfile")
+    @mock.patch("airflow.providers.celery.cli.celery_command.remove_existing_pidfile")
+    @mock.patch("airflow.providers.celery.cli.celery_command.read_pid_from_pidfile")
     @mock.patch("airflow.providers.celery.executors.celery_executor.app")
-    @mock.patch("airflow.cli.commands.celery_command.psutil.Process")
-    @mock.patch("airflow.cli.commands.celery_command.setup_locations")
+    @mock.patch("airflow.providers.celery.cli.celery_command.psutil.Process")
+    @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
     def test_custom_pid_file_is_used_in_start_and_stop(
         self,
         mock_setup_locations,
@@ -134,7 +134,7 @@ class TestCeleryStopCommand:
         assert mock_celery_app.worker_main.call_args
         args, _ = mock_celery_app.worker_main.call_args
         args_str = " ".join(map(str, args[0]))
-        assert f"--pidfile {pid_file}" in args_str
+        assert f"--pidfile {pid_file}" not in args_str
 
         stop_args = self.parser.parse_args(["celery", "stop", "--pid", pid_file])
         celery_command.stop_worker(stop_args)
@@ -152,8 +152,8 @@ class TestWorkerStart:
             importlib.reload(cli_parser)
             cls.parser = cli_parser.get_parser()
 
-    @mock.patch("airflow.cli.commands.celery_command.setup_locations")
-    @mock.patch("airflow.cli.commands.celery_command.Process")
+    @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
+    @mock.patch("airflow.providers.celery.cli.celery_command.Process")
     @mock.patch("airflow.providers.celery.executors.celery_executor.app")
     def test_worker_started_with_required_arguments(self, mock_celery_app, mock_popen, mock_locations):
         pid_file = "pid_file"
@@ -194,8 +194,6 @@ class TestWorkerStart:
                 celery_hostname,
                 "--loglevel",
                 conf.get("logging", "CELERY_LOGGING_LEVEL"),
-                "--pidfile",
-                pid_file,
                 "--autoscale",
                 autoscale,
                 "--without-mingle",
@@ -214,7 +212,7 @@ class TestWorkerFailure:
             importlib.reload(cli_parser)
             cls.parser = cli_parser.get_parser()
 
-    @mock.patch("airflow.cli.commands.celery_command.Process")
+    @mock.patch("airflow.providers.celery.cli.celery_command.Process")
     @mock.patch("airflow.providers.celery.executors.celery_executor.app")
     def test_worker_failure_gracefull_shutdown(self, mock_celery_app, mock_popen):
         args = self.parser.parse_args(["celery", "worker"])

Reply via email to