potiuk commented on a change in pull request #21351:
URL: https://github.com/apache/airflow/pull/21351#discussion_r810471560
##########
File path: tests/cli/test_cli_parser.py
##########
@@ -206,8 +206,9 @@ def test_dag_parser_celery_command_require_celery_executor(self):
stderr = stderr.getvalue()
assert (
"airflow command error: argument GROUP_OR_COMMAND: celery
subcommand "
- "works only with CeleryExecutor, CeleryKubernetesExecutor and
executors derived from them, "
- "your current executor: SequentialExecutor, subclassed from:
BaseExecutor, see help above."
+ "works only with executors that has 'supports_celery' set to True,
"
Review comment:
```suggestion
"works only with executors that have 'supports_celery' set to True, "
```
##########
File path: airflow/cli/cli_parser.py
##########
@@ -59,31 +59,23 @@ class DefaultHelpParser(argparse.ArgumentParser):
def _check_value(self, action, value):
"""Override _check_value and check conditionally added command"""
if action.dest == 'subcommand' and value == 'celery':
+ try:
+ from airflow.executors.celery_executor import CeleryExecutor
# noqa
+ except ImportError:
+ message = (
+ "The celery subcommand requires that you pip install the
celery module. "
+ "To do it, run: pip install 'apache-airflow[celery]'"
+ )
+ raise ArgumentError(action, message)
+
executor = conf.get('core', 'EXECUTOR')
if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
- classes = ()
- try:
- from airflow.executors.celery_executor import
CeleryExecutor
-
- classes += (CeleryExecutor,)
- except ImportError:
- message = (
- "The celery subcommand requires that you pip install
the celery module. "
- "To do it, run: pip install 'apache-airflow[celery]'"
- )
- raise ArgumentError(action, message)
- try:
- from airflow.executors.celery_kubernetes_executor import
CeleryKubernetesExecutor
-
- classes += (CeleryKubernetesExecutor,)
- except ImportError:
- pass
- if not issubclass(executor_cls, classes):
+ if not getattr(executor_cls, "supports_celery", False):
message = (
- f'celery subcommand works only with CeleryExecutor,
CeleryKubernetesExecutor and '
- f'executors derived from them, your current executor:
{executor}, subclassed from: '
- f'{", ".join([base_cls.__qualname__ for base_cls in
executor_cls.__bases__])}'
+ f"celery subcommand works only with executors that has
'supports_celery' "
Review comment:
```suggestion
f"celery subcommand works only with executors that have 'supports_celery' "
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org