This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8d74ee8dcd Remove pid arg from celery option to fix duplicate pid
issue, Move celery command to provider package (#36794)
8d74ee8dcd is described below
commit 8d74ee8dcd1b3ad0291ef666835edcffb24265ae
Author: Pavan Sharma <[email protected]>
AuthorDate: Sun Feb 25 12:59:24 2024 +0530
Remove pid arg from celery option to fix duplicate pid issue, Move celery
command to provider package (#36794)
---
airflow/cli/commands/celery_command.py | 17 +++++---
airflow/providers/celery/cli/__init__.py | 16 +++++++
.../celery/cli}/celery_command.py | 51 ++++++++++++++++------
.../providers/celery/executors/celery_executor.py | 6 +--
tests/cli/commands/test_celery_command.py | 6 +--
tests/providers/celery/cli/__init__.py | 16 +++++++
.../celery/cli}/test_celery_command.py | 30 ++++++-------
7 files changed, 99 insertions(+), 43 deletions(-)
diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py
index 5e3e01042a..f29641309d 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -15,11 +15,16 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+
+# DO NOT MODIFY THIS FILE unless it is a serious bugfix - all the new celery commands should be added in celery provider.
+# This file is kept for backward compatibility only.
"""Celery command."""
from __future__ import annotations
import logging
import sys
+import warnings
from contextlib import contextmanager
from multiprocessing import Process
@@ -40,6 +45,10 @@ from airflow.utils.serve_logs import serve_logs
WORKER_PROCESS_NAME = "worker"
+warnings.warn(
+ "Use celery command from providers package, Use celery provider > 3.5.2", DeprecationWarning, stacklevel=2
+)
+
@cli_utils.action_cli
@providers_configuration_loaded
@@ -153,9 +162,6 @@ def worker(args):
if not celery_log_level:
celery_log_level = conf.get("logging", "LOGGING_LEVEL")
- # Setup pid file location
- worker_pid_file_path, _, _, _ = setup_locations(process=WORKER_PROCESS_NAME, pid=args.pid)
-
# Setup Celery worker
options = [
"worker",
@@ -169,8 +175,6 @@ def worker(args):
args.celery_hostname,
"--loglevel",
celery_log_level,
- "--pidfile",
- worker_pid_file_path,
]
if autoscale:
options.extend(["--autoscale", autoscale])
@@ -189,11 +193,12 @@ def worker(args):
# executed.
maybe_patch_concurrency(["-P", pool])
- _, stdout, stderr, log_file = setup_locations(
+ worker_pid_file_path, stdout, stderr, log_file = setup_locations(
process=WORKER_PROCESS_NAME,
stdout=args.stdout,
stderr=args.stderr,
log=args.log_file,
+ pid=args.pid,
)
def run_celery_worker():
diff --git a/airflow/providers/celery/cli/__init__.py b/airflow/providers/celery/cli/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/celery/cli/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/cli/commands/celery_command.py b/airflow/providers/celery/cli/celery_command.py
similarity index 84%
copy from airflow/cli/commands/celery_command.py
copy to airflow/providers/celery/cli/celery_command.py
index 5e3e01042a..217433f811 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/providers/celery/cli/celery_command.py
@@ -31,18 +31,45 @@ from celery.signals import after_setup_logger
from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile
from airflow import settings
-from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
-from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.serve_logs import serve_logs
WORKER_PROCESS_NAME = "worker"
+def _run_command_with_daemon_option(*args, **kwargs):
+ try:
+ from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
+
+ run_command_with_daemon_option(*args, **kwargs)
+ except ImportError:
+ from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+ raise AirflowOptionalProviderFeatureException(
+ "Failed to import run_command_with_daemon_option. This feature is only available in Airflow versions >= 2.8.0"
+ )
+
+
+def _providers_configuration_loaded(func):
+ def wrapper(*args, **kwargs):
+ try:
+ from airflow.utils.providers_configuration_loader import providers_configuration_loaded
+
+ providers_configuration_loaded(func)(*args, **kwargs)
+ except ImportError:
+ from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+ raise AirflowOptionalProviderFeatureException(
+ "Failed to import providers_configuration_loaded. This feature is only available in Airflow versions >= 2.8.0"
+ )
+
+ return wrapper
+
+
@cli_utils.action_cli
-@providers_configuration_loaded
+@_providers_configuration_loaded
def flower(args):
"""Start Flower, Celery monitoring tool."""
# This needs to be imported locally to not trigger Providers Manager initialization
@@ -67,7 +94,7 @@ def flower(args):
if args.flower_conf:
options.append(f"--conf={args.flower_conf}")
- run_command_with_daemon_option(
+ _run_command_with_daemon_option(
args=args, process_name="flower", callback=lambda: celery_app.start(options)
)
@@ -85,7 +112,7 @@ def _serve_logs(skip_serve_logs: bool = False):
@after_setup_logger.connect()
-@providers_configuration_loaded
+@_providers_configuration_loaded
def logger_setup_handler(logger, **kwargs):
"""
Reconfigure the logger.
@@ -115,7 +142,7 @@ def logger_setup_handler(logger, **kwargs):
@cli_utils.action_cli
-@providers_configuration_loaded
+@_providers_configuration_loaded
def worker(args):
"""Start Airflow Celery worker."""
# This needs to be imported locally to not trigger Providers Manager initialization
@@ -153,9 +180,6 @@ def worker(args):
if not celery_log_level:
celery_log_level = conf.get("logging", "LOGGING_LEVEL")
- # Setup pid file location
- worker_pid_file_path, _, _, _ = setup_locations(process=WORKER_PROCESS_NAME, pid=args.pid)
-
# Setup Celery worker
options = [
"worker",
@@ -169,8 +193,6 @@ def worker(args):
args.celery_hostname,
"--loglevel",
celery_log_level,
- "--pidfile",
- worker_pid_file_path,
]
if autoscale:
options.extend(["--autoscale", autoscale])
@@ -189,11 +211,12 @@ def worker(args):
# executed.
maybe_patch_concurrency(["-P", pool])
- _, stdout, stderr, log_file = setup_locations(
+ worker_pid_file_path, stdout, stderr, log_file = setup_locations(
process=WORKER_PROCESS_NAME,
stdout=args.stdout,
stderr=args.stderr,
log=args.log_file,
+ pid=args.pid,
)
def run_celery_worker():
@@ -205,7 +228,7 @@ def worker(args):
else:
umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK)
- run_command_with_daemon_option(
+ _run_command_with_daemon_option(
args=args,
process_name=WORKER_PROCESS_NAME,
callback=run_celery_worker,
@@ -216,7 +239,7 @@ def worker(args):
@cli_utils.action_cli
-@providers_configuration_loaded
+@_providers_configuration_loaded
def stop_worker(args):
"""Send SIGTERM to Celery worker."""
# Read PID from file
diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py
index f4aa2a9b75..a8a4e320ec 100644
--- a/airflow/providers/celery/executors/celery_executor.py
+++ b/airflow/providers/celery/executors/celery_executor.py
@@ -181,7 +181,7 @@ CELERY_COMMANDS = (
ActionCommand(
name="worker",
help="Start a Celery worker node",
- func=lazy_load_command("airflow.cli.commands.celery_command.worker"),
+ func=lazy_load_command("airflow.providers.celery.cli.celery_command.worker"),
args=(
ARG_QUEUES,
ARG_CONCURRENCY,
@@ -202,7 +202,7 @@ CELERY_COMMANDS = (
ActionCommand(
name="flower",
help="Start a Celery Flower",
- func=lazy_load_command("airflow.cli.commands.celery_command.flower"),
+ func=lazy_load_command("airflow.providers.celery.cli.celery_command.flower"),
args=(
ARG_FLOWER_HOSTNAME,
ARG_FLOWER_PORT,
@@ -221,7 +221,7 @@ CELERY_COMMANDS = (
ActionCommand(
name="stop",
help="Stop the Celery worker gracefully",
- func=lazy_load_command("airflow.cli.commands.celery_command.stop_worker"),
+ func=lazy_load_command("airflow.providers.celery.cli.celery_command.stop_worker"),
args=(ARG_PID, ARG_VERBOSE),
),
)
diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py
index cbd87fea58..13cfff5275 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -106,7 +106,7 @@ class TestCeleryStopCommand:
assert mock_celery_app.worker_main.call_args
args, _ = mock_celery_app.worker_main.call_args
args_str = " ".join(map(str, args[0]))
- assert f"--pidfile {pid_file}" in args_str
+ assert f"--pidfile {pid_file}" not in args_str
# Call stop
stop_args = self.parser.parse_args(["celery", "stop"])
@@ -134,7 +134,7 @@ class TestCeleryStopCommand:
assert mock_celery_app.worker_main.call_args
args, _ = mock_celery_app.worker_main.call_args
args_str = " ".join(map(str, args[0]))
- assert f"--pidfile {pid_file}" in args_str
+ assert f"--pidfile {pid_file}" not in args_str
stop_args = self.parser.parse_args(["celery", "stop", "--pid", pid_file])
celery_command.stop_worker(stop_args)
@@ -194,8 +194,6 @@ class TestWorkerStart:
celery_hostname,
"--loglevel",
conf.get("logging", "CELERY_LOGGING_LEVEL"),
- "--pidfile",
- pid_file,
"--autoscale",
autoscale,
"--without-mingle",
diff --git a/tests/providers/celery/cli/__init__.py b/tests/providers/celery/cli/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/celery/cli/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/cli/commands/test_celery_command.py b/tests/providers/celery/cli/test_celery_command.py
similarity index 92%
copy from tests/cli/commands/test_celery_command.py
copy to tests/providers/celery/cli/test_celery_command.py
index cbd87fea58..c896f5d87d 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/providers/celery/cli/test_celery_command.py
@@ -27,8 +27,8 @@ import sqlalchemy
import airflow
from airflow.cli import cli_parser
-from airflow.cli.commands import celery_command
from airflow.configuration import conf
+from airflow.providers.celery.cli import celery_command
from tests.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
@@ -72,8 +72,8 @@ class TestCeleryStopCommand:
importlib.reload(cli_parser)
cls.parser = cli_parser.get_parser()
- @mock.patch("airflow.cli.commands.celery_command.setup_locations")
- @mock.patch("airflow.cli.commands.celery_command.psutil.Process")
+ @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
+ @mock.patch("airflow.providers.celery.cli.celery_command.psutil.Process")
def test_if_right_pid_is_read(self, mock_process, mock_setup_locations, tmp_path):
args = self.parser.parse_args(["celery", "stop"])
pid = "123"
@@ -90,9 +90,9 @@ class TestCeleryStopCommand:
mock_process.assert_called_once_with(int(pid))
mock_process.return_value.terminate.assert_called_once_with()
- @mock.patch("airflow.cli.commands.celery_command.read_pid_from_pidfile")
+ @mock.patch("airflow.providers.celery.cli.celery_command.read_pid_from_pidfile")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
- @mock.patch("airflow.cli.commands.celery_command.setup_locations")
+ @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
def test_same_pid_file_is_used_in_start_and_stop(
self, mock_setup_locations, mock_celery_app, mock_read_pid_from_pidfile
):
@@ -106,18 +106,18 @@ class TestCeleryStopCommand:
assert mock_celery_app.worker_main.call_args
args, _ = mock_celery_app.worker_main.call_args
args_str = " ".join(map(str, args[0]))
- assert f"--pidfile {pid_file}" in args_str
+ assert f"--pidfile {pid_file}" not in args_str
# Call stop
stop_args = self.parser.parse_args(["celery", "stop"])
celery_command.stop_worker(stop_args)
mock_read_pid_from_pidfile.assert_called_once_with(pid_file)
- @mock.patch("airflow.cli.commands.celery_command.remove_existing_pidfile")
- @mock.patch("airflow.cli.commands.celery_command.read_pid_from_pidfile")
+ @mock.patch("airflow.providers.celery.cli.celery_command.remove_existing_pidfile")
+ @mock.patch("airflow.providers.celery.cli.celery_command.read_pid_from_pidfile")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
- @mock.patch("airflow.cli.commands.celery_command.psutil.Process")
- @mock.patch("airflow.cli.commands.celery_command.setup_locations")
+ @mock.patch("airflow.providers.celery.cli.celery_command.psutil.Process")
+ @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
def test_custom_pid_file_is_used_in_start_and_stop(
self,
mock_setup_locations,
@@ -134,7 +134,7 @@ class TestCeleryStopCommand:
assert mock_celery_app.worker_main.call_args
args, _ = mock_celery_app.worker_main.call_args
args_str = " ".join(map(str, args[0]))
- assert f"--pidfile {pid_file}" in args_str
+ assert f"--pidfile {pid_file}" not in args_str
stop_args = self.parser.parse_args(["celery", "stop", "--pid", pid_file])
celery_command.stop_worker(stop_args)
@@ -152,8 +152,8 @@ class TestWorkerStart:
importlib.reload(cli_parser)
cls.parser = cli_parser.get_parser()
- @mock.patch("airflow.cli.commands.celery_command.setup_locations")
- @mock.patch("airflow.cli.commands.celery_command.Process")
+ @mock.patch("airflow.providers.celery.cli.celery_command.setup_locations")
+ @mock.patch("airflow.providers.celery.cli.celery_command.Process")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
def test_worker_started_with_required_arguments(self, mock_celery_app, mock_popen, mock_locations):
pid_file = "pid_file"
@@ -194,8 +194,6 @@ class TestWorkerStart:
celery_hostname,
"--loglevel",
conf.get("logging", "CELERY_LOGGING_LEVEL"),
- "--pidfile",
- pid_file,
"--autoscale",
autoscale,
"--without-mingle",
@@ -214,7 +212,7 @@ class TestWorkerFailure:
importlib.reload(cli_parser)
cls.parser = cli_parser.get_parser()
- @mock.patch("airflow.cli.commands.celery_command.Process")
+ @mock.patch("airflow.providers.celery.cli.celery_command.Process")
@mock.patch("airflow.providers.celery.executors.celery_executor.app")
def test_worker_failure_gracefull_shutdown(self, mock_celery_app, mock_popen):
args = self.parser.parse_args(["celery", "worker"])