This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3e30b3a025 Use celery worker CLI from Airflow package for Airflow <
2.8.0 (#38879)
3e30b3a025 is described below
commit 3e30b3a02584e13fa130255b25756eaf7dfe35d3
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Apr 9 21:44:59 2024 +0200
Use celery worker CLI from Airflow package for Airflow < 2.8.0 (#38879)
The Celery provider has an embedded Airflow CLI command as of 3.6.1. When
#36794 was merged, we mistakenly thought that it would only be used
in Airflow 2.9.0+, so we used a feature introduced in Airflow 2.8.0 in
#34945 - but in fact the CLI command is configured by the Celery
Executor, which is also part of the Celery provider, so it was also
used for Airflow < 2.8.0 and failed due to a missing import.
This PR checks whether the Airflow version is < 2.8.0 and, if so, falls
back to the built-in Airflow CLI command.
---
airflow/providers/celery/executors/celery_executor.py | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/celery/executors/celery_executor.py
b/airflow/providers/celery/executors/celery_executor.py
index 2a75be91da..0b4293cde7 100644
--- a/airflow/providers/celery/executors/celery_executor.py
+++ b/airflow/providers/celery/executors/celery_executor.py
@@ -30,10 +30,12 @@ import operator
import time
from collections import Counter
from concurrent.futures import ProcessPoolExecutor
+from importlib.metadata import version as importlib_version
from multiprocessing import cpu_count
from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple
from celery import states as celery_states
+from packaging.version import Version
try:
from airflow.cli.cli_config import (
@@ -178,11 +180,19 @@ ARG_WITHOUT_GOSSIP = Arg(
action="store_true",
)
+AIRFLOW_VERSION = Version(importlib_version("apache-airflow"))
+
+CELERY_CLI_COMMAND_PATH = (
+ "airflow.providers.celery.cli.celery_command"
+ if AIRFLOW_VERSION >= Version("2.8.0")
+ else "airflow.cli.commands.celery_command"
+)
+
CELERY_COMMANDS = (
ActionCommand(
name="worker",
help="Start a Celery worker node",
-
func=lazy_load_command("airflow.providers.celery.cli.celery_command.worker"),
+ func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.worker"),
args=(
ARG_QUEUES,
ARG_CONCURRENCY,
@@ -203,7 +213,7 @@ CELERY_COMMANDS = (
ActionCommand(
name="flower",
help="Start a Celery Flower",
-
func=lazy_load_command("airflow.providers.celery.cli.celery_command.flower"),
+ func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.flower"),
args=(
ARG_FLOWER_HOSTNAME,
ARG_FLOWER_PORT,
@@ -222,7 +232,7 @@ CELERY_COMMANDS = (
ActionCommand(
name="stop",
help="Stop the Celery worker gracefully",
-
func=lazy_load_command("airflow.providers.celery.cli.celery_command.stop_worker"),
+ func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.stop_worker"),
args=(ARG_PID, ARG_VERBOSE),
),
)