potiuk commented on a change in pull request #21351:
URL: https://github.com/apache/airflow/pull/21351#discussion_r810471560



##########
File path: tests/cli/test_cli_parser.py
##########
@@ -206,8 +206,9 @@ def test_dag_parser_celery_command_require_celery_executor(self):
             stderr = stderr.getvalue()
         assert (
             "airflow command error: argument GROUP_OR_COMMAND: celery subcommand "
-            "works only with CeleryExecutor, CeleryKubernetesExecutor and executors derived from them, "
-            "your current executor: SequentialExecutor, subclassed from: BaseExecutor, see help above."
+            "works only with executors that has 'supports_celery' set to True, "

Review comment:
       ```suggestion
               "works only with executors that have 'supports_celery' set to True, "
   ```

##########
File path: airflow/cli/cli_parser.py
##########
@@ -59,31 +59,23 @@ class DefaultHelpParser(argparse.ArgumentParser):
     def _check_value(self, action, value):
         """Override _check_value and check conditionally added command"""
         if action.dest == 'subcommand' and value == 'celery':
+            try:
+                from airflow.executors.celery_executor import CeleryExecutor  # noqa
+            except ImportError:
+                message = (
+                    "The celery subcommand requires that you pip install the celery module. "
+                    "To do it, run: pip install 'apache-airflow[celery]'"
+                )
+                raise ArgumentError(action, message)
+
             executor = conf.get('core', 'EXECUTOR')
             if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
                 executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
-                classes = ()
-                try:
-                    from airflow.executors.celery_executor import CeleryExecutor
-
-                    classes += (CeleryExecutor,)
-                except ImportError:
-                    message = (
-                        "The celery subcommand requires that you pip install the celery module. "
-                        "To do it, run: pip install 'apache-airflow[celery]'"
-                    )
-                    raise ArgumentError(action, message)
-                try:
-                    from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
-
-                    classes += (CeleryKubernetesExecutor,)
-                except ImportError:
-                    pass
-                if not issubclass(executor_cls, classes):
+                if not getattr(executor_cls, "supports_celery", False):
                     message = (
-                        f'celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and '
-                        f'executors derived from them, your current executor: {executor}, subclassed from: '
-                        f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
+                        f"celery subcommand works only with executors that has 'supports_celery' "

Review comment:
       ```suggestion
                           f"celery subcommand works only with executors that have 'supports_celery' "
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to