This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b4c6bed94 Separate cli_parser.py into two modules (#29962)
1b4c6bed94 is described below

commit 1b4c6bed94580eab656cc476e99a4980813b11ca
Author: Niko Oliveira <[email protected]>
AuthorDate: Thu Mar 16 01:01:44 2023 -0700

    Separate cli_parser.py into two modules (#29962)
    
    * Separate cli_parser.py into two modules
    
    In order to avoid import cycles for future cli changes, and to reduce the
    high coupling of the cli_parser module (which was over 2k lines), create
    a new module to hold the parser methods and another module to contain the
    config/definitions of the Args, Commands, Actions, etc. (a minimal sketch
    of the resulting layout follows the diffstat below)
    
    * Update airflow/cli/cli_config.py
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    
    * Update airflow/cli/cli_parser.py
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/cli/{cli_parser.py => cli_config.py} |  113 +-
 airflow/cli/cli_parser.py                    | 2247 +-------------------------
 tests/cli/test_cli_parser.py                 |   14 +-
 3 files changed, 29 insertions(+), 2345 deletions(-)
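
At a glance, the split leaves the two modules looking roughly as follows.
This is a minimal illustrative sketch, not part of the patch below; the
usage lines assume an installed Airflow with the layout this commit
introduces:

    # airflow/cli/cli_config.py: declarative CLI definitions only, e.g.
    #   Arg, ActionCommand, GroupCommand, CLICommand, DefaultHelpParser,
    #   core_commands (renamed from airflow_commands), dag_cli_commands
    #
    # airflow/cli/cli_parser.py: builds the argparse parser from that config
    from airflow.cli import cli_parser

    parser = cli_parser.get_parser()  # cached via functools.lru_cache
    args = parser.parse_args(["dags", "list"])
    args.func(args)  # dispatches to the lazily loaded command function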

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_config.py
similarity index 95%
copy from airflow/cli/cli_parser.py
copy to airflow/cli/cli_config.py
index 66beadfde1..f687a478d6 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_config.py
@@ -16,28 +16,25 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Command-line interface."""
+"""Explicit configuration and definition of Airflow CLI commands."""
+
 from __future__ import annotations
 
 import argparse
 import json
 import os
 import textwrap
-from argparse import Action, ArgumentError
-from functools import lru_cache
+from argparse import ArgumentError
 from typing import Callable, Iterable, NamedTuple, Union
 
 import lazy_object_proxy
-from rich_argparse import RawTextRichHelpFormatter, RichHelpFormatter
 
 from airflow import settings
 from airflow.cli.commands.legacy_commands import check_legacy_command
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException
 from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.cli import ColorMode
-from airflow.utils.helpers import partition
 from airflow.utils.module_loading import import_string
 from airflow.utils.timezone import parse as parsedate
 
@@ -836,7 +833,7 @@ ARG_ENV_VARS = Arg(
 )
 
 # connections
-ARG_CONN_ID = Arg(("conn_id",), help="Connection ID, required to get/add/delete/test a connection", type=str)
+ARG_CONN_ID = Arg(("conn_id",), help="Connection id, required to get/add/delete/test a connection", type=str)
 ARG_CONN_ID_FILTER = Arg(
     ("--conn-id",), help="If passed, only items with the specified connection ID will be displayed", type=str
 )
@@ -1999,7 +1996,7 @@ JOBS_COMMANDS = (
     ),
 )
 
-airflow_commands: list[CLICommand] = [
+core_commands: list[CLICommand] = [
     GroupCommand(
         name="dags",
         help="Manage DAGs",
@@ -2233,7 +2230,6 @@ airflow_commands: list[CLICommand] = [
         args=tuple(),
     ),
 ]
-ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands}
 
 
 def _remove_dag_id_opt(command: ActionCommand):
@@ -2259,102 +2255,3 @@ dag_cli_commands: list[CLICommand] = [
     ),
 ]
 DAG_CLI_DICT: dict[str, CLICommand] = {sp.name: sp for sp in dag_cli_commands}
-
-
-class AirflowHelpFormatter(RichHelpFormatter):
-    """
-    Custom help formatter to display help message.
-
-    It displays simple commands and groups of commands in separate sections.
-    """
-
-    def _iter_indented_subactions(self, action: Action):
-        if isinstance(action, argparse._SubParsersAction):
-
-            self._indent()
-            subactions = action._get_subactions()
-            action_subcommands, group_subcommands = partition(
-                lambda d: isinstance(ALL_COMMANDS_DICT[d.dest], GroupCommand), subactions
-            )
-            yield Action([], "\n%*s%s:" % (self._current_indent, "", "Groups"), nargs=0)
-            self._indent()
-            yield from group_subcommands
-            self._dedent()
-
-            yield Action([], "\n%*s%s:" % (self._current_indent, "", "Commands"), nargs=0)
-            self._indent()
-            yield from action_subcommands
-            self._dedent()
-            self._dedent()
-        else:
-            yield from super()._iter_indented_subactions(action)
-
-
-class LazyRichHelpFormatter(RawTextRichHelpFormatter):
-    """
-    Custom help formatter to display help message.
-
-    It resolves lazy help string before printing it using rich.
-    """
-
-    def add_argument(self, action: Action) -> None:
-        if isinstance(action.help, lazy_object_proxy.Proxy):
-            action.help = str(action.help)
-        return super().add_argument(action)
-
-
-@lru_cache(maxsize=None)
-def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
-    """Creates and returns command line argument parser."""
-    parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
-    subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
-    subparsers.required = True
-
-    command_dict = DAG_CLI_DICT if dag_parser else ALL_COMMANDS_DICT
-    subparser_list = command_dict.keys()
-    sub_name: str
-    for sub_name in sorted(subparser_list):
-        sub: CLICommand = command_dict[sub_name]
-        _add_command(subparsers, sub)
-    return parser
-
-
-def _sort_args(args: Iterable[Arg]) -> Iterable[Arg]:
-    """Sort subcommand optional args, keep positional args."""
-
-    def get_long_option(arg: Arg):
-        """Get long option from Arg.flags."""
-        return arg.flags[0] if len(arg.flags) == 1 else arg.flags[1]
-
-    positional, optional = partition(lambda x: x.flags[0].startswith("-"), args)
-    yield from positional
-    yield from sorted(optional, key=lambda x: get_long_option(x).lower())
-
-
-def _add_command(subparsers: argparse._SubParsersAction, sub: CLICommand) -> None:
-    sub_proc = subparsers.add_parser(
-        sub.name, help=sub.help, description=sub.description or sub.help, epilog=sub.epilog
-    )
-    sub_proc.formatter_class = LazyRichHelpFormatter
-
-    if isinstance(sub, GroupCommand):
-        _add_group_command(sub, sub_proc)
-    elif isinstance(sub, ActionCommand):
-        _add_action_command(sub, sub_proc)
-    else:
-        raise AirflowException("Invalid command definition.")
-
-
-def _add_action_command(sub: ActionCommand, sub_proc: argparse.ArgumentParser) -> None:
-    for arg in _sort_args(sub.args):
-        arg.add_to_parser(sub_proc)
-    sub_proc.set_defaults(func=sub.func)
-
-
-def _add_group_command(sub: GroupCommand, sub_proc: argparse.ArgumentParser) -> None:
-    subcommands = sub.subcommands
-    sub_subparsers = sub_proc.add_subparsers(dest="subcommand", metavar="COMMAND")
-    sub_subparsers.required = True
-
-    for command in sorted(subcommands, key=lambda x: x.name):
-        _add_command(sub_subparsers, command)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 66beadfde1..9ccccd0f30 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -16,2251 +16,38 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Command-line interface."""
+"""Produce a CLI parser object from Airflow CLI command configuration.
+
+.. seealso:: :mod:`airflow.cli.cli_config`
+"""
+
 from __future__ import annotations
 
 import argparse
-import json
-import os
-import textwrap
-from argparse import Action, ArgumentError
+from argparse import Action
 from functools import lru_cache
-from typing import Callable, Iterable, NamedTuple, Union
+from typing import Iterable
 
 import lazy_object_proxy
 from rich_argparse import RawTextRichHelpFormatter, RichHelpFormatter
 
-from airflow import settings
-from airflow.cli.commands.legacy_commands import check_legacy_command
-from airflow.configuration import conf
+from airflow.cli.cli_config import (
+    DAG_CLI_DICT,
+    ActionCommand,
+    Arg,
+    CLICommand,
+    DefaultHelpParser,
+    GroupCommand,
+    core_commands,
+)
 from airflow.exceptions import AirflowException
-from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
-from airflow.executors.executor_loader import ExecutorLoader
-from airflow.utils.cli import ColorMode
 from airflow.utils.helpers import partition
-from airflow.utils.module_loading import import_string
-from airflow.utils.timezone import parse as parsedate
-
-BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
-
-
-def lazy_load_command(import_path: str) -> Callable:
-    """Create a lazy loader for command."""
-    _, _, name = import_path.rpartition(".")
-
-    def command(*args, **kwargs):
-        func = import_string(import_path)
-        return func(*args, **kwargs)
-
-    command.__name__ = name
-
-    return command
-
-
-class DefaultHelpParser(argparse.ArgumentParser):
-    """CustomParser to display help message."""
-
-    def _check_value(self, action, value):
-        """Override _check_value and check conditionally added command."""
-        if action.dest == "subcommand" and value == "celery":
-            executor = conf.get("core", "EXECUTOR")
-            if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
-                classes = ()
-                try:
-                    from airflow.executors.celery_executor import CeleryExecutor
-
-                    classes += (CeleryExecutor,)
-                except ImportError:
-                    message = (
-                        "The celery subcommand requires that you pip install 
the celery module. "
-                        "To do it, run: pip install 'apache-airflow[celery]'"
-                    )
-                    raise ArgumentError(action, message)
-                try:
-                    from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
-
-                    classes += (CeleryKubernetesExecutor,)
-                except ImportError:
-                    pass
-                if not issubclass(executor_cls, classes):
-                    message = (
-                        f"celery subcommand works only with CeleryExecutor, 
CeleryKubernetesExecutor and "
-                        f"executors derived from them, your current executor: 
{executor}, subclassed from: "
-                        f'{", ".join([base_cls.__qualname__ for base_cls in 
executor_cls.__bases__])}'
-                    )
-                    raise ArgumentError(action, message)
-        if action.dest == "subcommand" and value == "kubernetes":
-            try:
-                import kubernetes.client  # noqa: F401
-            except ImportError:
-                message = (
-                    "The kubernetes subcommand requires that you pip install 
the kubernetes python client. "
-                    "To do it, run: pip install 
'apache-airflow[cncf.kubernetes]'"
-                )
-                raise ArgumentError(action, message)
-
-        if action.choices is not None and value not in action.choices:
-            check_legacy_command(action, value)
-
-        super()._check_value(action, value)
-
-    def error(self, message):
-        """Override error and use print_instead of print_usage."""
-        self.print_help()
-        self.exit(2, f"\n{self.prog} command error: {message}, see help above.\n")
-
-
-# Used in Arg to enable `None' as a distinct value from "not passed"
-_UNSET = object()
-
-
-class Arg:
-    """Class to keep information about command line argument."""
-
-    def __init__(
-        self,
-        flags=_UNSET,
-        help=_UNSET,
-        action=_UNSET,
-        default=_UNSET,
-        nargs=_UNSET,
-        type=_UNSET,
-        choices=_UNSET,
-        required=_UNSET,
-        metavar=_UNSET,
-        dest=_UNSET,
-    ):
-        self.flags = flags
-        self.kwargs = {}
-        for k, v in locals().items():
-            if v is _UNSET:
-                continue
-            if k in ("self", "flags"):
-                continue
-
-            self.kwargs[k] = v
-
-    def add_to_parser(self, parser: argparse.ArgumentParser):
-        """Add this argument to an ArgumentParser."""
-        if "metavar" in self.kwargs and "type" not in self.kwargs:
-            if self.kwargs["metavar"] == "DIRPATH":
-                type = lambda x: self._is_valid_directory(parser, x)
-                self.kwargs["type"] = type
-        parser.add_argument(*self.flags, **self.kwargs)
-
-    def _is_valid_directory(self, parser, arg):
-        if not os.path.isdir(arg):
-            parser.error(f"The directory '{arg}' does not exist!")
-        return arg
-
-
-def positive_int(*, allow_zero):
-    """Define a positive int type for an argument."""
-
-    def _check(value):
-        try:
-            value = int(value)
-            if allow_zero and value == 0:
-                return value
-            if value > 0:
-                return value
-        except ValueError:
-            pass
-        raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")
-
-    return _check
-
-
-def string_list_type(val):
-    """Parses comma-separated list and returns list of string (strips 
whitespace)."""
-    return [x.strip() for x in val.split(",")]
-
-
-def string_lower_type(val):
-    """Lowers arg."""
-    if not val:
-        return
-    return val.strip().lower()
-
-
-# Shared
-ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
-ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
-ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate)
-ARG_EXECUTION_DATE_OPTIONAL = Arg(
-    ("execution_date",), nargs="?", help="The execution date of the DAG (optional)", type=parsedate
-)
-ARG_EXECUTION_DATE_OR_RUN_ID = Arg(
-    ("execution_date_or_run_id",), help="The execution_date of the DAG or run_id of the DAGRun"
-)
-ARG_EXECUTION_DATE_OR_RUN_ID_OPTIONAL = Arg(
-    ("execution_date_or_run_id",),
-    nargs="?",
-    help="The execution_date of the DAG or run_id of the DAGRun (optional)",
-)
-ARG_TASK_REGEX = Arg(
-    ("-t", "--task-regex"), help="The regex to filter specific task_ids to 
backfill (optional)"
-)
-ARG_SUBDIR = Arg(
-    ("-S", "--subdir"),
-    help=(
-        "File location or directory from which to look for the dag. "
-        "Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
-        "value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
-    ),
-    default="[AIRFLOW_HOME]/dags" if BUILD_DOCS else settings.DAGS_FOLDER,
-)
-ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date YYYY-MM-DD", type=parsedate)
-ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD", type=parsedate)
-ARG_OUTPUT_PATH = Arg(
-    (
-        "-o",
-        "--output-path",
-    ),
-    help="The output for generated yaml files",
-    type=str,
-    default="[CWD]" if BUILD_DOCS else os.getcwd(),
-)
-ARG_DRY_RUN = Arg(
-    ("-n", "--dry-run"),
-    help="Perform a dry run for each task. Only renders Template Fields for 
each task, nothing else",
-    action="store_true",
-)
-ARG_PID = Arg(("--pid",), help="PID file location", nargs="?")
-ARG_DAEMON = Arg(
-    ("-D", "--daemon"), help="Daemonize instead of running in the foreground", 
action="store_true"
-)
-ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file")
-ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file")
-ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file")
-ARG_YES = Arg(
-    ("-y", "--yes"),
-    help="Do not prompt to confirm. Use with care!",
-    action="store_true",
-    default=False,
-)
-ARG_OUTPUT = Arg(
-    (
-        "-o",
-        "--output",
-    ),
-    help="Output format. Allowed values: json, yaml, plain, table (default: 
table)",
-    metavar="(table, json, yaml, plain)",
-    choices=("table", "json", "yaml", "plain"),
-    default="table",
-)
-ARG_COLOR = Arg(
-    ("--color",),
-    help="Do emit colored output (default: auto)",
-    choices={ColorMode.ON, ColorMode.OFF, ColorMode.AUTO},
-    default=ColorMode.AUTO,
-)
-
-# DB args
-ARG_VERSION_RANGE = Arg(
-    ("-r", "--range"),
-    help="Version range(start:end) for offline sql generation. Example: 
'2.0.2:2.2.3'",
-    default=None,
-)
-ARG_REVISION_RANGE = Arg(
-    ("--revision-range",),
-    help=(
-        "Migration revision range(start:end) to use for offline sql 
generation. "
-        "Example: ``a13f7613ad25:7b2661a43ba3``"
-    ),
-    default=None,
-)
-
-# list_dag_runs
-ARG_DAG_ID_REQ_FLAG = Arg(
-    ("-d", "--dag-id"), required=True, help="The id of the dag"
-)  # TODO: convert this to a positional arg in Airflow 3
-ARG_NO_BACKFILL = Arg(
-    ("--no-backfill",), help="filter all the backfill dagruns given the dag 
id", action="store_true"
-)
-ARG_STATE = Arg(("--state",), help="Only list the dag runs corresponding to 
the state")
-
-# list_jobs
-ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
-ARG_LIMIT = Arg(("--limit",), help="Return a limited number of records")
-
-# next_execution
-ARG_NUM_EXECUTIONS = Arg(
-    ("-n", "--num-executions"),
-    default=1,
-    type=positive_int(allow_zero=False),
-    help="The number of next execution datetimes to show",
-)
-
-# backfill
-ARG_MARK_SUCCESS = Arg(
-    ("-m", "--mark-success"), help="Mark jobs as succeeded without running 
them", action="store_true"
-)
-ARG_VERBOSE = Arg(("-v", "--verbose"), help="Make logging output more 
verbose", action="store_true")
-ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the 
LocalExecutor", action="store_true")
-ARG_DONOT_PICKLE = Arg(
-    ("-x", "--donot-pickle"),
-    help=(
-        "Do not attempt to pickle the DAG object to send over "
-        "to the workers, just tell the workers to run their version "
-        "of the code"
-    ),
-    action="store_true",
-)
-ARG_BF_IGNORE_DEPENDENCIES = Arg(
-    ("-i", "--ignore-dependencies"),
-    help=(
-        "Skip upstream tasks, run only the tasks "
-        "matching the regexp. Only works in conjunction "
-        "with task_regex"
-    ),
-    action="store_true",
-)
-ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST = Arg(
-    ("-I", "--ignore-first-depends-on-past"),
-    help=(
-        "Ignores depends_on_past dependencies for the first "
-        "set of tasks only (subsequent executions in the backfill "
-        "DO respect depends_on_past)"
-    ),
-    action="store_true",
-)
-ARG_POOL = Arg(("--pool",), "Resource pool to use")
-ARG_DELAY_ON_LIMIT = Arg(
-    ("--delay-on-limit",),
-    help=(
-        "Amount of time in seconds to wait when the limit "
-        "on maximum active dag runs (max_active_runs) has "
-        "been reached before trying to execute a dag run "
-        "again"
-    ),
-    type=float,
-    default=1.0,
-)
-ARG_RESET_DAG_RUN = Arg(
-    ("--reset-dagruns",),
-    help=(
-        "if set, the backfill will delete existing "
-        "backfill-related DAG runs and start "
-        "anew with fresh, running DAG runs"
-    ),
-    action="store_true",
-)
-ARG_RERUN_FAILED_TASKS = Arg(
-    ("--rerun-failed-tasks",),
-    help=(
-        "if set, the backfill will auto-rerun "
-        "all the failed tasks for the backfill date range "
-        "instead of throwing exceptions"
-    ),
-    action="store_true",
-)
-ARG_CONTINUE_ON_FAILURES = Arg(
-    ("--continue-on-failures",),
-    help=("if set, the backfill will keep going even if some of the tasks 
failed"),
-    action="store_true",
-)
-ARG_DISABLE_RETRY = Arg(
-    ("--disable-retry",),
-    help=("if set, the backfill will set tasks as failed without retrying."),
-    action="store_true",
-)
-ARG_RUN_BACKWARDS = Arg(
-    (
-        "-B",
-        "--run-backwards",
-    ),
-    help=(
-        "if set, the backfill will run tasks from the most "
-        "recent day first.  if there are tasks that depend_on_past "
-        "this option will throw an exception"
-    ),
-    action="store_true",
-)
-ARG_TREAT_DAG_AS_REGEX = Arg(
-    ("--treat-dag-as-regex",),
-    help=("if set, dag_id will be treated as regex instead of an exact 
string"),
-    action="store_true",
-)
-# test_dag
-ARG_SHOW_DAGRUN = Arg(
-    ("--show-dagrun",),
-    help=(
-        "After completing the backfill, shows the diagram for current DAG 
Run.\n"
-        "\n"
-        "The diagram is in DOT language\n"
-    ),
-    action="store_true",
-)
-ARG_IMGCAT_DAGRUN = Arg(
-    ("--imgcat-dagrun",),
-    help=(
-        "After completing the dag run, prints a diagram on the screen for the "
-        "current DAG Run using the imgcat tool.\n"
-    ),
-    action="store_true",
-)
-ARG_SAVE_DAGRUN = Arg(
-    ("--save-dagrun",),
-    help="After completing the backfill, saves the diagram for current DAG Run 
to the indicated file.\n\n",
-)
-
-# list_tasks
-ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
-
-# tasks_run
-# This is a hidden option -- not meant for users to set or know about
-ARG_SHUT_DOWN_LOGGING = Arg(
-    ("--no-shut-down-logging",),
-    help=argparse.SUPPRESS,
-    dest="shut_down_logging",
-    action="store_false",
-    default=True,
-)
-
-# clear
-ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
-ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
-ARG_ONLY_RUNNING = Arg(("-r", "--only-running"), help="Only running jobs", action="store_true")
-ARG_DOWNSTREAM = Arg(("-d", "--downstream"), help="Include downstream tasks", action="store_true")
-ARG_EXCLUDE_SUBDAGS = Arg(("-x", "--exclude-subdags"), help="Exclude subdags", action="store_true")
-ARG_EXCLUDE_PARENTDAG = Arg(
-    ("-X", "--exclude-parentdag"),
-    help="Exclude ParentDAGS if the task cleared is a part of a SubDAG",
-    action="store_true",
-)
-ARG_DAG_REGEX = Arg(
-    ("-R", "--dag-regex"), help="Search dag_id as regex instead of exact 
string", action="store_true"
-)
-
-# show_dag
-ARG_SAVE = Arg(("-s", "--save"), help="Saves the result to the indicated file.")
-
-ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.", action="store_true")
-
-# trigger_dag
-ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
-ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the DagRun's conf attribute")
-ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
-ARG_REPLACE_MICRO = Arg(
-    ("--no-replace-microseconds",),
-    help="whether microseconds should be zeroed",
-    dest="replace_microseconds",
-    action="store_false",
-    default=True,
-)
-
-# db
-ARG_DB_TABLES = Arg(
-    ("-t", "--tables"),
-    help=lazy_object_proxy.Proxy(
-        lambda: f"Table names to perform maintenance on (use comma-separated 
list).\n"
-        f"Options: 
{import_string('airflow.cli.commands.db_command.all_tables')}"
-    ),
-    type=string_list_type,
-)
-ARG_DB_CLEANUP_TIMESTAMP = Arg(
-    ("--clean-before-timestamp",),
-    help="The date or timestamp before which data should be purged.\n"
-    "If no timezone info is supplied then dates are assumed to be in airflow 
default timezone.\n"
-    "Example: '2022-01-01 00:00:00+01:00'",
-    type=parsedate,
-    required=True,
-)
-ARG_DB_DRY_RUN = Arg(
-    ("--dry-run",),
-    help="Perform a dry run",
-    action="store_true",
-)
-ARG_DB_SKIP_ARCHIVE = Arg(
-    ("--skip-archive",),
-    help="Don't preserve purged records in an archive table.",
-    action="store_true",
-)
-ARG_DB_EXPORT_FORMAT = Arg(
-    ("--export-format",),
-    help="The file format to export the cleaned data",
-    choices=("csv",),
-    default="csv",
-)
-ARG_DB_OUTPUT_PATH = Arg(
-    ("--output-path",),
-    metavar="DIRPATH",
-    help="The path to the output directory to export the cleaned data. This 
directory must exist.",
-    required=True,
-)
-ARG_DB_DROP_ARCHIVES = Arg(
-    ("--drop-archives",),
-    help="Drop the archive tables after exporting. Use with caution.",
-    action="store_true",
-)
-
-# pool
-ARG_POOL_NAME = Arg(("pool",), metavar="NAME", help="Pool name")
-ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
-ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
-ARG_POOL_IMPORT = Arg(
-    ("file",),
-    metavar="FILEPATH",
-    help="Import pools from JSON file. Example format::\n"
-    + textwrap.indent(
-        textwrap.dedent(
-            """
-            {
-                "pool_1": {"slots": 5, "description": ""},
-                "pool_2": {"slots": 10, "description": "test"}
-            }"""
-        ),
-        " " * 4,
-    ),
-)
-
-ARG_POOL_EXPORT = Arg(("file",), metavar="FILEPATH", help="Export all pools to JSON file")
-
-# variables
-ARG_VAR = Arg(("key",), help="Variable key")
-ARG_VAR_VALUE = Arg(("value",), metavar="VALUE", help="Variable value")
-ARG_DEFAULT = Arg(
-    ("-d", "--default"), metavar="VAL", default=None, help="Default value 
returned if variable does not exist"
-)
-ARG_DESERIALIZE_JSON = Arg(("-j", "--json"), help="Deserialize JSON variable", 
action="store_true")
-ARG_SERIALIZE_JSON = Arg(("-j", "--json"), help="Serialize JSON variable", 
action="store_true")
-ARG_VAR_IMPORT = Arg(("file",), help="Import variables from JSON file")
-ARG_VAR_EXPORT = Arg(("file",), help="Export all variables to JSON file")
-
-# kerberos
-ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
-ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?", default=conf.get("kerberos", "keytab"))
-# run
-ARG_INTERACTIVE = Arg(
-    ("-N", "--interactive"),
-    help="Do not capture standard output and error streams (useful for 
interactive debugging)",
-    action="store_true",
-)
-# TODO(aoen): "force" is a poor choice of name here since it implies it 
overrides
-# all dependencies (not just past success), e.g. the ignore_depends_on_past
-# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' 
and
-# the "ignore_all_dependencies" command should be called the"force" command
-# instead.
-ARG_FORCE = Arg(
-    ("-f", "--force"),
-    help="Ignore previous task instance state, rerun regardless if task 
already succeeded/failed",
-    action="store_true",
-)
-ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
-ARG_IGNORE_ALL_DEPENDENCIES = Arg(
-    ("-A", "--ignore-all-dependencies"),
-    help="Ignores all non-critical dependencies, including ignore_ti_state and 
ignore_task_deps",
-    action="store_true",
-)
-# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
-# vague (e.g. a task being in the appropriate state to be run is also a dependency
-# but is not ignored by this flag), the name 'ignore_task_dependencies' is
-# slightly better (as it ignores all dependencies that are specific to the task),
-# so deprecate the old command name and use this instead.
-ARG_IGNORE_DEPENDENCIES = Arg(
-    ("-i", "--ignore-dependencies"),
-    help="Ignore task-specific dependencies, e.g. upstream, depends_on_past, 
and retry delay dependencies",
-    action="store_true",
-)
-ARG_IGNORE_DEPENDS_ON_PAST = Arg(
-    ("-I", "--ignore-depends-on-past"),
-    help="Deprecated -- use `--depends-on-past ignore` instead. "
-    "Ignore depends_on_past dependencies (but respect upstream dependencies)",
-    action="store_true",
-)
-ARG_DEPENDS_ON_PAST = Arg(
-    ("-d", "--depends-on-past"),
-    help="Determine how Airflow should deal with past dependencies. The 
default action is `check`, Airflow "
-    "will check if the the past dependencies are met for the tasks having 
`depends_on_past=True` before run "
-    "them, if `ignore` is provided, the past dependencies will be ignored, if 
`wait` is provided and "
-    "`depends_on_past=True`, Airflow will wait the past dependencies until 
they are met before running or "
-    "skipping the task",
-    choices={"check", "ignore", "wait"},
-    default="check",
-)
-ARG_SHIP_DAG = Arg(
-    ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the 
worker", action="store_true"
-)
-ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
-ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
-ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
-ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
-
-
-# database
-ARG_MIGRATION_TIMEOUT = Arg(
-    ("-t", "--migration-wait-timeout"),
-    help="timeout to wait for db to migrate ",
-    type=int,
-    default=60,
-)
-ARG_DB_RESERIALIZE_DAGS = Arg(
-    ("--no-reserialize-dags",),
-    # Not intended for user, so dont show in help
-    help=argparse.SUPPRESS,
-    action="store_false",
-    default=True,
-    dest="reserialize_dags",
-)
-ARG_DB_VERSION__UPGRADE = Arg(
-    ("-n", "--to-version"),
-    help=(
-        "(Optional) The airflow version to upgrade to. Note: must provide 
either "
-        "`--to-revision` or `--to-version`."
-    ),
-)
-ARG_DB_REVISION__UPGRADE = Arg(
-    ("-r", "--to-revision"),
-    help="(Optional) If provided, only run migrations up to and including this 
Alembic revision.",
-)
-ARG_DB_VERSION__DOWNGRADE = Arg(
-    ("-n", "--to-version"),
-    help="(Optional) If provided, only run migrations up to this version.",
-)
-ARG_DB_FROM_VERSION = Arg(
-    ("--from-version",),
-    help="(Optional) If generating sql, may supply a *from* version",
-)
-ARG_DB_REVISION__DOWNGRADE = Arg(
-    ("-r", "--to-revision"),
-    help="The Alembic revision to downgrade to. Note: must provide either 
`--to-revision` or `--to-version`.",
-)
-ARG_DB_FROM_REVISION = Arg(
-    ("--from-revision",),
-    help="(Optional) If generating sql, may supply a *from* Alembic revision",
-)
-ARG_DB_SQL_ONLY = Arg(
-    ("-s", "--show-sql-only"),
-    help="Don't actually run migrations; just print out sql scripts for 
offline migration. "
-    "Required if using either `--from-revision` or `--from-version`.",
-    action="store_true",
-    default=False,
-)
-ARG_DB_SKIP_INIT = Arg(
-    ("-s", "--skip-init"),
-    help="Only remove tables; do not perform db init.",
-    action="store_true",
-    default=False,
-)
-
-# webserver
-ARG_PORT = Arg(
-    ("-p", "--port"),
-    default=conf.get("webserver", "WEB_SERVER_PORT"),
-    type=int,
-    help="The port on which to run the server",
-)
-ARG_SSL_CERT = Arg(
-    ("--ssl-cert",),
-    default=conf.get("webserver", "WEB_SERVER_SSL_CERT"),
-    help="Path to the SSL certificate for the webserver",
-)
-ARG_SSL_KEY = Arg(
-    ("--ssl-key",),
-    default=conf.get("webserver", "WEB_SERVER_SSL_KEY"),
-    help="Path to the key to use with the SSL certificate",
-)
-ARG_WORKERS = Arg(
-    ("-w", "--workers"),
-    default=conf.get("webserver", "WORKERS"),
-    type=int,
-    help="Number of workers to run the webserver on",
-)
-ARG_WORKERCLASS = Arg(
-    ("-k", "--workerclass"),
-    default=conf.get("webserver", "WORKER_CLASS"),
-    choices=["sync", "eventlet", "gevent", "tornado"],
-    help="The worker class to use for Gunicorn",
-)
-ARG_WORKER_TIMEOUT = Arg(
-    ("-t", "--worker-timeout"),
-    default=conf.get("webserver", "WEB_SERVER_WORKER_TIMEOUT"),
-    type=int,
-    help="The timeout for waiting on webserver workers",
-)
-ARG_HOSTNAME = Arg(
-    ("-H", "--hostname"),
-    default=conf.get("webserver", "WEB_SERVER_HOST"),
-    help="Set the hostname on which to run the web server",
-)
-ARG_DEBUG = Arg(
-    ("-d", "--debug"), help="Use the server that ships with Flask in debug 
mode", action="store_true"
-)
-ARG_ACCESS_LOGFILE = Arg(
-    ("-A", "--access-logfile"),
-    default=conf.get("webserver", "ACCESS_LOGFILE"),
-    help="The logfile to store the webserver access log. Use '-' to print to 
stdout",
-)
-ARG_ERROR_LOGFILE = Arg(
-    ("-E", "--error-logfile"),
-    default=conf.get("webserver", "ERROR_LOGFILE"),
-    help="The logfile to store the webserver error log. Use '-' to print to 
stderr",
-)
-ARG_ACCESS_LOGFORMAT = Arg(
-    ("-L", "--access-logformat"),
-    default=conf.get("webserver", "ACCESS_LOGFORMAT"),
-    help="The access log format for gunicorn logs",
-)
-
-
-# internal-api
-ARG_INTERNAL_API_PORT = Arg(
-    ("-p", "--port"),
-    default=9080,
-    type=int,
-    help="The port on which to run the server",
-)
-ARG_INTERNAL_API_WORKERS = Arg(
-    ("-w", "--workers"),
-    default=4,
-    type=int,
-    help="Number of workers to run the Internal API-on",
-)
-ARG_INTERNAL_API_WORKERCLASS = Arg(
-    ("-k", "--workerclass"),
-    default="sync",
-    choices=["sync", "eventlet", "gevent", "tornado"],
-    help="The worker class to use for Gunicorn",
-)
-ARG_INTERNAL_API_WORKER_TIMEOUT = Arg(
-    ("-t", "--worker-timeout"),
-    default=120,
-    type=int,
-    help="The timeout for waiting on Internal API workers",
-)
-ARG_INTERNAL_API_HOSTNAME = Arg(
-    ("-H", "--hostname"),
-    default="0.0.0.0",
-    help="Set the hostname on which to run the web server",
-)
-ARG_INTERNAL_API_ACCESS_LOGFILE = Arg(
-    ("-A", "--access-logfile"),
-    help="The logfile to store the access log. Use '-' to print to stdout",
-)
-ARG_INTERNAL_API_ERROR_LOGFILE = Arg(
-    ("-E", "--error-logfile"),
-    help="The logfile to store the error log. Use '-' to print to stderr",
-)
-ARG_INTERNAL_API_ACCESS_LOGFORMAT = Arg(
-    ("-L", "--access-logformat"),
-    help="The access log format for gunicorn logs",
-)
 
-# scheduler
-ARG_NUM_RUNS = Arg(
-    ("-n", "--num-runs"),
-    default=conf.getint("scheduler", "num_runs"),
-    type=int,
-    help="Set the number of runs to execute before exiting",
-)
-ARG_DO_PICKLE = Arg(
-    ("-p", "--do-pickle"),
-    default=False,
-    help=(
-        "Attempt to pickle the DAG object to send over "
-        "to the workers, instead of letting workers run their version "
-        "of the code"
-    ),
-    action="store_true",
-)
+airflow_commands = core_commands
 
-# worker
-ARG_QUEUES = Arg(
-    ("-q", "--queues"),
-    help="Comma delimited list of queues to serve",
-    default=conf.get("operators", "DEFAULT_QUEUE"),
-)
-ARG_CONCURRENCY = Arg(
-    ("-c", "--concurrency"),
-    type=int,
-    help="The number of worker processes",
-    default=conf.get("celery", "worker_concurrency"),
-)
-ARG_CELERY_HOSTNAME = Arg(
-    ("-H", "--celery-hostname"),
-    help="Set the hostname of celery worker if you have multiple workers on a 
single machine",
-)
-ARG_UMASK = Arg(
-    ("-u", "--umask"),
-    help="Set the umask of celery worker in daemon mode",
-)
-ARG_WITHOUT_MINGLE = Arg(
-    ("--without-mingle",),
-    default=False,
-    help="Don't synchronize with other workers at start-up",
-    action="store_true",
-)
-ARG_WITHOUT_GOSSIP = Arg(
-    ("--without-gossip",),
-    default=False,
-    help="Don't subscribe to other workers events",
-    action="store_true",
-)
-
-# flower
-ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
-ARG_FLOWER_HOSTNAME = Arg(
-    ("-H", "--hostname"),
-    default=conf.get("celery", "FLOWER_HOST"),
-    help="Set the hostname on which to run the server",
-)
-ARG_FLOWER_PORT = Arg(
-    ("-p", "--port"),
-    default=conf.get("celery", "FLOWER_PORT"),
-    type=int,
-    help="The port on which to run the server",
-)
-ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
-ARG_FLOWER_URL_PREFIX = Arg(
-    ("-u", "--url-prefix"), default=conf.get("celery", "FLOWER_URL_PREFIX"), help="URL prefix for Flower"
-)
-ARG_FLOWER_BASIC_AUTH = Arg(
-    ("-A", "--basic-auth"),
-    default=conf.get("celery", "FLOWER_BASIC_AUTH"),
-    help=(
-        "Securing Flower with Basic Authentication. "
-        "Accepts user:password pairs separated by a comma. "
-        "Example: flower_basic_auth = user1:password1,user2:password2"
-    ),
-)
-ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict to the task")
-ARG_POST_MORTEM = Arg(
-    ("-m", "--post-mortem"), action="store_true", help="Open debugger on uncaught exception"
-)
-ARG_ENV_VARS = Arg(
-    ("--env-vars",),
-    help="Set env var in both parsing time and runtime for each of entry 
supplied in a JSON dict",
-    type=json.loads,
-)
-
-# connections
-ARG_CONN_ID = Arg(("conn_id",), help="Connection ID, required to get/add/delete/test a connection", type=str)
-ARG_CONN_ID_FILTER = Arg(
-    ("--conn-id",), help="If passed, only items with the specified connection ID will be displayed", type=str
-)
-ARG_CONN_URI = Arg(
-    ("--conn-uri",), help="Connection URI, required to add a connection 
without conn_type", type=str
-)
-ARG_CONN_JSON = Arg(
-    ("--conn-json",), help="Connection JSON, required to add a connection 
using JSON representation", type=str
-)
-ARG_CONN_TYPE = Arg(
-    ("--conn-type",), help="Connection type, required to add a connection 
without conn_uri", type=str
-)
-ARG_CONN_DESCRIPTION = Arg(
-    ("--conn-description",), help="Connection description, optional when 
adding a connection", type=str
-)
-ARG_CONN_HOST = Arg(("--conn-host",), help="Connection host, optional when 
adding a connection", type=str)
-ARG_CONN_LOGIN = Arg(("--conn-login",), help="Connection login, optional when 
adding a connection", type=str)
-ARG_CONN_PASSWORD = Arg(
-    ("--conn-password",), help="Connection password, optional when adding a 
connection", type=str
-)
-ARG_CONN_SCHEMA = Arg(
-    ("--conn-schema",), help="Connection schema, optional when adding a 
connection", type=str
-)
-ARG_CONN_PORT = Arg(("--conn-port",), help="Connection port, optional when 
adding a connection", type=str)
-ARG_CONN_EXTRA = Arg(
-    ("--conn-extra",), help="Connection `Extra` field, optional when adding a 
connection", type=str
-)
-ARG_CONN_EXPORT = Arg(
-    ("file",),
-    help="Output file path for exporting the connections",
-    type=argparse.FileType("w", encoding="UTF-8"),
-)
-ARG_CONN_EXPORT_FORMAT = Arg(
-    ("--format",),
-    help="Deprecated -- use `--file-format` instead. File format to use for 
the export.",
-    type=str,
-    choices=["json", "yaml", "env"],
-)
-ARG_CONN_EXPORT_FILE_FORMAT = Arg(
-    ("--file-format",), help="File format for the export", type=str, 
choices=["json", "yaml", "env"]
-)
-ARG_CONN_SERIALIZATION_FORMAT = Arg(
-    ("--serialization-format",),
-    help="When exporting as `.env` format, defines how connections should be 
serialized. Default is `uri`.",
-    type=string_lower_type,
-    choices=["json", "uri"],
-)
-ARG_CONN_IMPORT = Arg(("file",), help="Import connections from a file")
-ARG_CONN_OVERWRITE = Arg(
-    ("--overwrite",),
-    help="Overwrite existing entries if a conflict occurs",
-    required=False,
-    action="store_true",
-)
-
-# providers
-ARG_PROVIDER_NAME = Arg(
-    ("provider_name",), help="Provider name, required to get provider 
information", type=str
-)
-ARG_FULL = Arg(
-    ("-f", "--full"),
-    help="Full information about the provider, including documentation 
information.",
-    required=False,
-    action="store_true",
-)
-
-# users
-ARG_USERNAME = Arg(("-u", "--username"), help="Username of the user", required=True, type=str)
-ARG_USERNAME_OPTIONAL = Arg(("-u", "--username"), help="Username of the user", type=str)
-ARG_FIRSTNAME = Arg(("-f", "--firstname"), help="First name of the user", required=True, type=str)
-ARG_LASTNAME = Arg(("-l", "--lastname"), help="Last name of the user", required=True, type=str)
-ARG_ROLE = Arg(
-    ("-r", "--role"),
-    help="Role of the user. Existing roles include Admin, User, Op, Viewer, 
and Public",
-    required=True,
-    type=str,
-)
-ARG_EMAIL = Arg(("-e", "--email"), help="Email of the user", required=True, type=str)
-ARG_EMAIL_OPTIONAL = Arg(("-e", "--email"), help="Email of the user", type=str)
-ARG_PASSWORD = Arg(
-    ("-p", "--password"),
-    help="Password of the user, required to create a user without 
--use-random-password",
-    type=str,
-)
-ARG_USE_RANDOM_PASSWORD = Arg(
-    ("--use-random-password",),
-    help="Do not prompt for password. Use random string instead."
-    " Required to create a user without --password ",
-    default=False,
-    action="store_true",
-)
-ARG_USER_IMPORT = Arg(
-    ("import",),
-    metavar="FILEPATH",
-    help="Import users from JSON file. Example format::\n"
-    + textwrap.indent(
-        textwrap.dedent(
-            """
-            [
-                {
-                    "email": "[email protected]",
-                    "firstname": "Jon",
-                    "lastname": "Doe",
-                    "roles": ["Public"],
-                    "username": "jondoe"
-                }
-            ]"""
-        ),
-        " " * 4,
-    ),
-)
-ARG_USER_EXPORT = Arg(("export",), metavar="FILEPATH", help="Export all users to JSON file")
-
-# roles
-ARG_CREATE_ROLE = Arg(("-c", "--create"), help="Create a new role", action="store_true")
-ARG_LIST_ROLES = Arg(("-l", "--list"), help="List roles", action="store_true")
-ARG_ROLES = Arg(("role",), help="The name of a role", nargs="*")
-ARG_PERMISSIONS = Arg(("-p", "--permission"), help="Show role permissions", action="store_true")
-ARG_ROLE_RESOURCE = Arg(("-r", "--resource"), help="The name of permissions", nargs="*", required=True)
-ARG_ROLE_ACTION = Arg(("-a", "--action"), help="The action of permissions", nargs="*")
-ARG_ROLE_ACTION_REQUIRED = Arg(("-a", "--action"), help="The action of permissions", nargs="*", required=True)
-ARG_AUTOSCALE = Arg(("-a", "--autoscale"), help="Minimum and Maximum number of worker to autoscale")
-ARG_SKIP_SERVE_LOGS = Arg(
-    ("-s", "--skip-serve-logs"),
-    default=False,
-    help="Don't start the serve logs process along with the workers",
-    action="store_true",
-)
-ARG_ROLE_IMPORT = Arg(("file",), help="Import roles from JSON file", nargs=None)
-ARG_ROLE_EXPORT = Arg(("file",), help="Export all roles to JSON file", nargs=None)
-ARG_ROLE_EXPORT_FMT = Arg(
-    ("-p", "--pretty"),
-    help="Format output JSON file by sorting role names and indenting by 4 
spaces",
-    action="store_true",
-)
-
-# info
-ARG_ANONYMIZE = Arg(
-    ("--anonymize",),
-    help="Minimize any personal identifiable information. Use it when sharing 
output with others.",
-    action="store_true",
-)
-ARG_FILE_IO = Arg(
-    ("--file-io",), help="Send output to file.io service and returns link.", 
action="store_true"
-)
-
-# config
-ARG_SECTION = Arg(
-    ("section",),
-    help="The section name",
-)
-ARG_OPTION = Arg(
-    ("option",),
-    help="The option name",
-)
-ARG_OPTIONAL_SECTION = Arg(
-    ("--section",),
-    help="The section name",
-)
-
-# kubernetes cleanup-pods
-ARG_NAMESPACE = Arg(
-    ("--namespace",),
-    default=conf.get("kubernetes_executor", "namespace"),
-    help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in 
configuration.",
-)
-
-ARG_MIN_PENDING_MINUTES = Arg(
-    ("--min-pending-minutes",),
-    default=30,
-    type=positive_int(allow_zero=False),
-    help=(
-        "Pending pods created before the time interval are to be cleaned up, "
-        "measured in minutes. Default value is 30(m). The minimum value is 
5(m)."
-    ),
-)
-
-# jobs check
-ARG_JOB_TYPE_FILTER = Arg(
-    ("--job-type",),
-    choices=("BackfillJob", "LocalTaskJob", "SchedulerJob", "TriggererJob"),
-    action="store",
-    help="The type of job(s) that will be checked.",
-)
-
-ARG_JOB_HOSTNAME_FILTER = Arg(
-    ("--hostname",),
-    default=None,
-    type=str,
-    help="The hostname of job(s) that will be checked.",
-)
-
-ARG_JOB_HOSTNAME_CALLABLE_FILTER = Arg(
-    ("--local",),
-    action="store_true",
-    help="If passed, this command will only show jobs from the local host "
-    "(those with a hostname matching what `hostname_callable` returns).",
-)
-
-ARG_JOB_LIMIT = Arg(
-    ("--limit",),
-    default=1,
-    type=positive_int(allow_zero=True),
-    help="The number of recent jobs that will be checked. To disable limit, 
set 0. ",
-)
-
-ARG_ALLOW_MULTIPLE = Arg(
-    ("--allow-multiple",),
-    action="store_true",
-    help="If passed, this command will be successful even if multiple matching 
alive jobs are found.",
-)
-
-# sync-perm
-ARG_INCLUDE_DAGS = Arg(
-    ("--include-dags",), help="If passed, DAG specific permissions will also 
be synced.", action="store_true"
-)
-
-# triggerer
-ARG_CAPACITY = Arg(
-    ("--capacity",),
-    type=positive_int(allow_zero=False),
-    help="The maximum number of triggers that a Triggerer will run at one 
time.",
-)
-
-# reserialize
-ARG_CLEAR_ONLY = Arg(
-    ("--clear-only",),
-    action="store_true",
-    help="If passed, serialized DAGs will be cleared but not reserialized.",
-)
-
-ALTERNATIVE_CONN_SPECS_ARGS = [
-    ARG_CONN_TYPE,
-    ARG_CONN_DESCRIPTION,
-    ARG_CONN_HOST,
-    ARG_CONN_LOGIN,
-    ARG_CONN_PASSWORD,
-    ARG_CONN_SCHEMA,
-    ARG_CONN_PORT,
-]
-
-
-class ActionCommand(NamedTuple):
-    """Single CLI command."""
-
-    name: str
-    help: str
-    func: Callable
-    args: Iterable[Arg]
-    description: str | None = None
-    epilog: str | None = None
-
-
-class GroupCommand(NamedTuple):
-    """ClI command with subcommands."""
-
-    name: str
-    help: str
-    subcommands: Iterable
-    description: str | None = None
-    epilog: str | None = None
-
-
-CLICommand = Union[ActionCommand, GroupCommand]
-
-DAGS_COMMANDS = (
-    ActionCommand(
-        name="list",
-        help="List all the DAGs",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dags"),
-        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="list-import-errors",
-        help="List all the DAGs that have import errors",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_import_errors"),
-        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="report",
-        help="Show DagBag loading report",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_report"),
-        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="list-runs",
-        help="List DAG runs given a DAG id",
-        description=(
-            "List DAG runs given a DAG id. If state option is given, it will 
only search for all the "
-            "dagruns with the given state. If no_backfill option is given, it 
will filter out all "
-            "backfill dagruns for given dag id. If start_date is given, it 
will filter out all the "
-            "dagruns that were executed before this date. If end_date is 
given, it will filter out "
-            "all the dagruns that were executed after this date. "
-        ),
-        
func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dag_runs"),
-        args=(
-            ARG_DAG_ID_REQ_FLAG,
-            ARG_NO_BACKFILL,
-            ARG_STATE,
-            ARG_OUTPUT,
-            ARG_VERBOSE,
-            ARG_START_DATE,
-            ARG_END_DATE,
-        ),
-    ),
-    ActionCommand(
-        name="list-jobs",
-        help="List the jobs",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_jobs"),
-        args=(ARG_DAG_ID_OPT, ARG_STATE, ARG_LIMIT, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="state",
-        help="Get the status of a dag run",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_state"),
-        args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="next-execution",
-        help="Get the next execution datetimes of a DAG",
-        description=(
-            "Get the next execution datetimes of a DAG. It returns one 
execution unless the "
-            "num-executions option is given"
-        ),
-        
func=lazy_load_command("airflow.cli.commands.dag_command.dag_next_execution"),
-        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_NUM_EXECUTIONS, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="pause",
-        help="Pause a DAG",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_pause"),
-        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="unpause",
-        help="Resume a paused DAG",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_unpause"),
-        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="trigger",
-        help="Trigger a DAG run",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"),
-        args=(
-            ARG_DAG_ID,
-            ARG_SUBDIR,
-            ARG_RUN_ID,
-            ARG_CONF,
-            ARG_EXEC_DATE,
-            ARG_VERBOSE,
-            ARG_REPLACE_MICRO,
-            ARG_OUTPUT,
-        ),
-    ),
-    ActionCommand(
-        name="delete",
-        help="Delete all DB records related to the specified DAG",
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_delete"),
-        args=(ARG_DAG_ID, ARG_YES, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="show",
-        help="Displays DAG's tasks with their dependencies",
-        description=(
-            "The --imgcat option only works in iTerm.\n"
-            "\n"
-            "For more information, see: 
https://www.iterm2.com/documentation-images.html\n";
-            "\n"
-            "The --save option saves the result to the indicated file.\n"
-            "\n"
-            "The file format is determined by the file extension. "
-            "For more information about supported "
-            "format, see: https://www.graphviz.org/doc/info/output.html\n";
-            "\n"
-            "If you want to create a PNG file then you should execute the 
following command:\n"
-            "airflow dags show <DAG_ID> --save output.png\n"
-            "\n"
-            "If you want to create a DOT file then you should execute the 
following command:\n"
-            "airflow dags show <DAG_ID> --save output.dot\n"
-        ),
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_show"),
-        args=(
-            ARG_DAG_ID,
-            ARG_SUBDIR,
-            ARG_SAVE,
-            ARG_IMGCAT,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="show-dependencies",
-        help="Displays DAGs with their dependencies",
-        description=(
-            "The --imgcat option only works in iTerm.\n"
-            "\n"
-            "For more information, see: 
https://www.iterm2.com/documentation-images.html\n";
-            "\n"
-            "The --save option saves the result to the indicated file.\n"
-            "\n"
-            "The file format is determined by the file extension. "
-            "For more information about supported "
-            "format, see: https://www.graphviz.org/doc/info/output.html\n";
-            "\n"
-            "If you want to create a PNG file then you should execute the 
following command:\n"
-            "airflow dags show-dependencies --save output.png\n"
-            "\n"
-            "If you want to create a DOT file then you should execute the 
following command:\n"
-            "airflow dags show-dependencies --save output.dot\n"
-        ),
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_dependencies_show"),
-        args=(
-            ARG_SUBDIR,
-            ARG_SAVE,
-            ARG_IMGCAT,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="backfill",
-        help="Run subsections of a DAG for a specified date range",
-        description=(
-            "Run subsections of a DAG for a specified date range. If 
reset_dag_run option is used, "
-            "backfill will first prompt users whether airflow should clear all 
the previous dag_run and "
-            "task_instances within the backfill date range. If 
rerun_failed_tasks is used, backfill "
-            "will auto re-run the previous failed task instances  within the 
backfill date range"
-        ),
-        
func=lazy_load_command("airflow.cli.commands.dag_command.dag_backfill"),
-        args=(
-            ARG_DAG_ID,
-            ARG_TASK_REGEX,
-            ARG_START_DATE,
-            ARG_END_DATE,
-            ARG_MARK_SUCCESS,
-            ARG_LOCAL,
-            ARG_DONOT_PICKLE,
-            ARG_YES,
-            ARG_CONTINUE_ON_FAILURES,
-            ARG_DISABLE_RETRY,
-            ARG_BF_IGNORE_DEPENDENCIES,
-            ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST,
-            ARG_SUBDIR,
-            ARG_POOL,
-            ARG_DELAY_ON_LIMIT,
-            ARG_DRY_RUN,
-            ARG_VERBOSE,
-            ARG_CONF,
-            ARG_RESET_DAG_RUN,
-            ARG_RERUN_FAILED_TASKS,
-            ARG_RUN_BACKWARDS,
-            ARG_TREAT_DAG_AS_REGEX,
-        ),
-    ),
-    ActionCommand(
-        name="test",
-        help="Execute one single DagRun",
-        description=(
-            "Execute one single DagRun for a given DAG and execution date.\n"
-            "\n"
-            "The --imgcat-dagrun option only works in iTerm.\n"
-            "\n"
-            "For more information, see: 
https://www.iterm2.com/documentation-images.html\n";
-            "\n"
-            "If --save-dagrun is used, then, after completing the backfill, 
saves the diagram "
-            "for current DAG Run to the indicated file.\n"
-            "The file format is determined by the file extension. "
-            "For more information about supported format, "
-            "see: https://www.graphviz.org/doc/info/output.html\n";
-            "\n"
-            "If you want to create a PNG file then you should execute the 
following command:\n"
-            "airflow dags test <DAG_ID> <EXECUTION_DATE> --save-dagrun 
output.png\n"
-            "\n"
-            "If you want to create a DOT file then you should execute the 
following command:\n"
-            "airflow dags test <DAG_ID> <EXECUTION_DATE> --save-dagrun 
output.dot\n"
-        ),
-        func=lazy_load_command("airflow.cli.commands.dag_command.dag_test"),
-        args=(
-            ARG_DAG_ID,
-            ARG_EXECUTION_DATE_OPTIONAL,
-            ARG_CONF,
-            ARG_SUBDIR,
-            ARG_SHOW_DAGRUN,
-            ARG_IMGCAT_DAGRUN,
-            ARG_SAVE_DAGRUN,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="reserialize",
-        help="Reserialize all DAGs by parsing the DagBag files",
-        description=(
-            "Drop all serialized dags from the metadata DB. This will cause 
all DAGs to be reserialized "
-            "from the DagBag folder. This can be helpful if your serialized 
DAGs get out of sync with the "
-            "version of Airflow that you are running."
-        ),
-        
func=lazy_load_command("airflow.cli.commands.dag_command.dag_reserialize"),
-        args=(
-            ARG_CLEAR_ONLY,
-            ARG_SUBDIR,
-            ARG_VERBOSE,
-        ),
-    ),
-)
-TASKS_COMMANDS = (
-    ActionCommand(
-        name="list",
-        help="List the tasks within a DAG",
-        func=lazy_load_command("airflow.cli.commands.task_command.task_list"),
-        args=(ARG_DAG_ID, ARG_TREE, ARG_SUBDIR, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="clear",
-        help="Clear a set of task instance, as if they never ran",
-        func=lazy_load_command("airflow.cli.commands.task_command.task_clear"),
-        args=(
-            ARG_DAG_ID,
-            ARG_TASK_REGEX,
-            ARG_START_DATE,
-            ARG_END_DATE,
-            ARG_SUBDIR,
-            ARG_UPSTREAM,
-            ARG_DOWNSTREAM,
-            ARG_YES,
-            ARG_ONLY_FAILED,
-            ARG_ONLY_RUNNING,
-            ARG_EXCLUDE_SUBDAGS,
-            ARG_EXCLUDE_PARENTDAG,
-            ARG_DAG_REGEX,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="state",
-        help="Get the status of a task instance",
-        func=lazy_load_command("airflow.cli.commands.task_command.task_state"),
-        args=(
-            ARG_DAG_ID,
-            ARG_TASK_ID,
-            ARG_EXECUTION_DATE_OR_RUN_ID,
-            ARG_SUBDIR,
-            ARG_VERBOSE,
-            ARG_MAP_INDEX,
-        ),
-    ),
-    ActionCommand(
-        name="failed-deps",
-        help="Returns the unmet dependencies for a task instance",
-        description=(
-            "Returns the unmet dependencies for a task instance from the 
perspective of the scheduler. "
-            "In other words, why a task instance doesn't get scheduled and 
then queued by the scheduler, "
-            "and then run by an executor."
-        ),
-        
func=lazy_load_command("airflow.cli.commands.task_command.task_failed_deps"),
-        args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE_OR_RUN_ID, 
ARG_SUBDIR, ARG_MAP_INDEX, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="render",
-        help="Render a task instance's template(s)",
-        func=lazy_load_command("airflow.cli.commands.task_command.task_render"),
-        args=(
-            ARG_DAG_ID,
-            ARG_TASK_ID,
-            ARG_EXECUTION_DATE_OR_RUN_ID,
-            ARG_SUBDIR,
-            ARG_VERBOSE,
-            ARG_MAP_INDEX,
-        ),
-    ),
-    ActionCommand(
-        name="run",
-        help="Run a single task instance",
-        func=lazy_load_command("airflow.cli.commands.task_command.task_run"),
-        args=(
-            ARG_DAG_ID,
-            ARG_TASK_ID,
-            ARG_EXECUTION_DATE_OR_RUN_ID,
-            ARG_SUBDIR,
-            ARG_MARK_SUCCESS,
-            ARG_FORCE,
-            ARG_POOL,
-            ARG_CFG_PATH,
-            ARG_LOCAL,
-            ARG_RAW,
-            ARG_IGNORE_ALL_DEPENDENCIES,
-            ARG_IGNORE_DEPENDENCIES,
-            ARG_IGNORE_DEPENDS_ON_PAST,
-            ARG_DEPENDS_ON_PAST,
-            ARG_SHIP_DAG,
-            ARG_PICKLE,
-            ARG_JOB_ID,
-            ARG_INTERACTIVE,
-            ARG_SHUT_DOWN_LOGGING,
-            ARG_MAP_INDEX,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="test",
-        help="Test a task instance",
-        description=(
-            "Test a task instance. This will run a task without checking for 
dependencies or recording "
-            "its state in the database"
-        ),
-        func=lazy_load_command("airflow.cli.commands.task_command.task_test"),
-        args=(
-            ARG_DAG_ID,
-            ARG_TASK_ID,
-            ARG_EXECUTION_DATE_OR_RUN_ID_OPTIONAL,
-            ARG_SUBDIR,
-            ARG_DRY_RUN,
-            ARG_TASK_PARAMS,
-            ARG_POST_MORTEM,
-            ARG_ENV_VARS,
-            ARG_MAP_INDEX,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="states-for-dag-run",
-        help="Get the status of all task instances in a dag run",
-        func=lazy_load_command("airflow.cli.commands.task_command.task_states_for_dag_run"),
-        args=(ARG_DAG_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-)
-POOLS_COMMANDS = (
-    ActionCommand(
-        name="list",
-        help="List pools",
-        func=lazy_load_command("airflow.cli.commands.pool_command.pool_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="get",
-        help="Get pool size",
-        func=lazy_load_command("airflow.cli.commands.pool_command.pool_get"),
-        args=(ARG_POOL_NAME, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="set",
-        help="Configure pool",
-        func=lazy_load_command("airflow.cli.commands.pool_command.pool_set"),
-        args=(ARG_POOL_NAME, ARG_POOL_SLOTS, ARG_POOL_DESCRIPTION, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="delete",
-        help="Delete pool",
-        func=lazy_load_command("airflow.cli.commands.pool_command.pool_delete"),
-        args=(ARG_POOL_NAME, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="import",
-        help="Import pools",
-        func=lazy_load_command("airflow.cli.commands.pool_command.pool_import"),
-        args=(ARG_POOL_IMPORT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="export",
-        help="Export all pools",
-        func=lazy_load_command("airflow.cli.commands.pool_command.pool_export"),
-        args=(ARG_POOL_EXPORT, ARG_VERBOSE),
-    ),
-)
-VARIABLES_COMMANDS = (
-    ActionCommand(
-        name="list",
-        help="List variables",
-        func=lazy_load_command("airflow.cli.commands.variable_command.variables_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="get",
-        help="Get variable",
-        func=lazy_load_command("airflow.cli.commands.variable_command.variables_get"),
-        args=(ARG_VAR, ARG_DESERIALIZE_JSON, ARG_DEFAULT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="set",
-        help="Set variable",
-        func=lazy_load_command("airflow.cli.commands.variable_command.variables_set"),
-        args=(ARG_VAR, ARG_VAR_VALUE, ARG_SERIALIZE_JSON, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="delete",
-        help="Delete variable",
-        func=lazy_load_command("airflow.cli.commands.variable_command.variables_delete"),
-        args=(ARG_VAR, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="import",
-        help="Import variables",
-        func=lazy_load_command("airflow.cli.commands.variable_command.variables_import"),
-        args=(ARG_VAR_IMPORT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="export",
-        help="Export all variables",
-        func=lazy_load_command("airflow.cli.commands.variable_command.variables_export"),
-        args=(ARG_VAR_EXPORT, ARG_VERBOSE),
-    ),
-)
-DB_COMMANDS = (
-    ActionCommand(
-        name="init",
-        help="Initialize the metadata database",
-        func=lazy_load_command("airflow.cli.commands.db_command.initdb"),
-        args=(ARG_VERBOSE,),
-    ),
-    ActionCommand(
-        name="check-migrations",
-        help="Check if migration have finished",
-        description="Check if migration have finished (or continually check 
until timeout)",
-        
func=lazy_load_command("airflow.cli.commands.db_command.check_migrations"),
-        args=(ARG_MIGRATION_TIMEOUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="reset",
-        help="Burn down and rebuild the metadata database",
-        func=lazy_load_command("airflow.cli.commands.db_command.resetdb"),
-        args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="upgrade",
-        help="Upgrade the metadata database to latest version",
-        description=(
-            "Upgrade the schema of the metadata database. "
-            "To print but not execute commands, use option 
``--show-sql-only``. "
-            "If using options ``--from-revision`` or ``--from-version``, you 
must also use "
-            "``--show-sql-only``, because if actually *running* migrations, we 
should only "
-            "migrate from the *current* Alembic revision."
-        ),
-        func=lazy_load_command("airflow.cli.commands.db_command.upgradedb"),
-        args=(
-            ARG_DB_REVISION__UPGRADE,
-            ARG_DB_VERSION__UPGRADE,
-            ARG_DB_SQL_ONLY,
-            ARG_DB_FROM_REVISION,
-            ARG_DB_FROM_VERSION,
-            ARG_DB_RESERIALIZE_DAGS,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="downgrade",
-        help="Downgrade the schema of the metadata database.",
-        description=(
-            "Downgrade the schema of the metadata database. "
-            "You must provide either `--to-revision` or `--to-version`. "
-            "To print but not execute commands, use option `--show-sql-only`. "
-            "If using options `--from-revision` or `--from-version`, you must 
also use `--show-sql-only`, "
-            "because if actually *running* migrations, we should only migrate 
from the *current* Alembic "
-            "revision."
-        ),
-        func=lazy_load_command("airflow.cli.commands.db_command.downgrade"),
-        args=(
-            ARG_DB_REVISION__DOWNGRADE,
-            ARG_DB_VERSION__DOWNGRADE,
-            ARG_DB_SQL_ONLY,
-            ARG_YES,
-            ARG_DB_FROM_REVISION,
-            ARG_DB_FROM_VERSION,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="shell",
-        help="Runs a shell to access the database",
-        func=lazy_load_command("airflow.cli.commands.db_command.shell"),
-        args=(ARG_VERBOSE,),
-    ),
-    ActionCommand(
-        name="check",
-        help="Check if the database can be reached",
-        func=lazy_load_command("airflow.cli.commands.db_command.check"),
-        args=(ARG_VERBOSE,),
-    ),
-    ActionCommand(
-        name="clean",
-        help="Purge old records in metastore tables",
-        func=lazy_load_command("airflow.cli.commands.db_command.cleanup_tables"),
-        args=(
-            ARG_DB_TABLES,
-            ARG_DB_DRY_RUN,
-            ARG_DB_CLEANUP_TIMESTAMP,
-            ARG_VERBOSE,
-            ARG_YES,
-            ARG_DB_SKIP_ARCHIVE,
-        ),
-    ),
-    ActionCommand(
-        name="export-archived",
-        help="Export archived data from the archive tables",
-        func=lazy_load_command("airflow.cli.commands.db_command.export_archived"),
-        args=(
-            ARG_DB_EXPORT_FORMAT,
-            ARG_DB_OUTPUT_PATH,
-            ARG_DB_DROP_ARCHIVES,
-            ARG_DB_TABLES,
-            ARG_YES,
-        ),
-    ),
-    ActionCommand(
-        name="drop-archived",
-        help="Drop archived tables created through the db clean command",
-        func=lazy_load_command("airflow.cli.commands.db_command.drop_archived"),
-        args=(ARG_DB_TABLES, ARG_YES),
-    ),
-)
-CONNECTIONS_COMMANDS = (
-    ActionCommand(
-        name="get",
-        help="Get a connection",
-        func=lazy_load_command("airflow.cli.commands.connection_command.connections_get"),
-        args=(ARG_CONN_ID, ARG_COLOR, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="list",
-        help="List connections",
-        func=lazy_load_command("airflow.cli.commands.connection_command.connections_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE, ARG_CONN_ID_FILTER),
-    ),
-    ActionCommand(
-        name="add",
-        help="Add a connection",
-        func=lazy_load_command("airflow.cli.commands.connection_command.connections_add"),
-        args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_JSON, ARG_CONN_EXTRA) + tuple(ALTERNATIVE_CONN_SPECS_ARGS),
-    ),
-    ActionCommand(
-        name="delete",
-        help="Delete a connection",
-        func=lazy_load_command("airflow.cli.commands.connection_command.connections_delete"),
-        args=(ARG_CONN_ID, ARG_COLOR, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="export",
-        help="Export all connections",
-        description=(
-            "All connections can be exported to STDOUT using the following command:\n"
-            "airflow connections export -\n"
-            "The file format can be determined by the provided file extension. E.g., the following "
-            "command will export the connections in JSON format:\n"
-            "airflow connections export /tmp/connections.json\n"
-            "The --file-format parameter can be used to control the file format. E.g., "
-            "the default format is JSON in STDOUT mode, which can be overridden using: \n"
-            "airflow connections export - --file-format yaml\n"
-            "The --file-format parameter can also be used for files, for example:\n"
-            "airflow connections export /tmp/connections --file-format json.\n"
-            "When exporting in `env` file format, you control whether URI format or JSON format "
-            "is used to serialize the connection by passing `uri` or `json` with option "
-            "`--serialization-format`.\n"
-        ),
-        func=lazy_load_command("airflow.cli.commands.connection_command.connections_export"),
-        args=(
-            ARG_CONN_EXPORT,
-            ARG_CONN_EXPORT_FORMAT,
-            ARG_CONN_EXPORT_FILE_FORMAT,
-            ARG_CONN_SERIALIZATION_FORMAT,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="import",
-        help="Import connections from a file",
-        description=(
-            "Connections can be imported from the output of the export 
command.\n"
-            "The filetype must by json, yaml or env and will be automatically 
inferred."
-        ),
-        
func=lazy_load_command("airflow.cli.commands.connection_command.connections_import"),
-        args=(
-            ARG_CONN_IMPORT,
-            ARG_CONN_OVERWRITE,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="test",
-        help="Test a connection",
-        func=lazy_load_command("airflow.cli.commands.connection_command.connections_test"),
-        args=(ARG_CONN_ID, ARG_VERBOSE),
-    ),
-)
-PROVIDERS_COMMANDS = (
-    ActionCommand(
-        name="list",
-        help="List installed providers",
-        func=lazy_load_command("airflow.cli.commands.provider_command.providers_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="get",
-        help="Get detailed information about a provider",
-        func=lazy_load_command("airflow.cli.commands.provider_command.provider_get"),
-        args=(ARG_OUTPUT, ARG_VERBOSE, ARG_FULL, ARG_COLOR, ARG_PROVIDER_NAME),
-    ),
-    ActionCommand(
-        name="links",
-        help="List extra links registered by the providers",
-        func=lazy_load_command("airflow.cli.commands.provider_command.extra_links_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="widgets",
-        help="Get information about registered connection form widgets",
-        func=lazy_load_command("airflow.cli.commands.provider_command.connection_form_widget_list"),
-        args=(
-            ARG_OUTPUT,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="hooks",
-        help="List registered provider hooks",
-        func=lazy_load_command("airflow.cli.commands.provider_command.hooks_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="behaviours",
-        help="Get information about registered connection types with custom behaviours",
-        func=lazy_load_command("airflow.cli.commands.provider_command.connection_field_behaviours"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="logging",
-        help="Get information about task logging handlers provided",
-        func=lazy_load_command("airflow.cli.commands.provider_command.logging_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="secrets",
-        help="Get information about secrets backends provided",
-        func=lazy_load_command("airflow.cli.commands.provider_command.secrets_backends_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="auth",
-        help="Get information about API auth backends provided",
-        func=lazy_load_command("airflow.cli.commands.provider_command.auth_backend_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-)
-
-USERS_COMMANDS = (
-    ActionCommand(
-        name="list",
-        help="List users",
-        func=lazy_load_command("airflow.cli.commands.user_command.users_list"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="create",
-        help="Create a user",
-        func=lazy_load_command("airflow.cli.commands.user_command.users_create"),
-        args=(
-            ARG_ROLE,
-            ARG_USERNAME,
-            ARG_EMAIL,
-            ARG_FIRSTNAME,
-            ARG_LASTNAME,
-            ARG_PASSWORD,
-            ARG_USE_RANDOM_PASSWORD,
-            ARG_VERBOSE,
-        ),
-        epilog=(
-            "examples:\n"
-            'To create a user with the "Admin" role and the username "admin", run:\n'
-            "\n"
-            "    $ airflow users create \\\n"
-            "          --username admin \\\n"
-            "          --firstname FIRST_NAME \\\n"
-            "          --lastname LAST_NAME \\\n"
-            "          --role Admin \\\n"
-            "          --email [email protected]"
-        ),
-    ),
-    ActionCommand(
-        name="delete",
-        help="Delete a user",
-        func=lazy_load_command("airflow.cli.commands.user_command.users_delete"),
-        args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="add-role",
-        help="Add role to a user",
-        func=lazy_load_command("airflow.cli.commands.user_command.add_role"),
-        args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="remove-role",
-        help="Remove role from a user",
-        func=lazy_load_command("airflow.cli.commands.user_command.remove_role"),
-        args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="import",
-        help="Import users",
-        func=lazy_load_command("airflow.cli.commands.user_command.users_import"),
-        args=(ARG_USER_IMPORT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="export",
-        help="Export all users",
-        func=lazy_load_command("airflow.cli.commands.user_command.users_export"),
-        args=(ARG_USER_EXPORT, ARG_VERBOSE),
-    ),
-)
-ROLES_COMMANDS = (
-    ActionCommand(
-        name="list",
-        help="List roles",
-        func=lazy_load_command("airflow.cli.commands.role_command.roles_list"),
-        args=(ARG_PERMISSIONS, ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="create",
-        help="Create role",
-        func=lazy_load_command("airflow.cli.commands.role_command.roles_create"),
-        args=(ARG_ROLES, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="delete",
-        help="Delete role",
-        func=lazy_load_command("airflow.cli.commands.role_command.roles_delete"),
-        args=(ARG_ROLES, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="add-perms",
-        help="Add roles permissions",
-        func=lazy_load_command("airflow.cli.commands.role_command.roles_add_perms"),
-        args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION_REQUIRED, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="del-perms",
-        help="Delete roles permissions",
-        func=lazy_load_command("airflow.cli.commands.role_command.roles_del_perms"),
-        args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="export",
-        help="Export roles (without permissions) from db to JSON file",
-        func=lazy_load_command("airflow.cli.commands.role_command.roles_export"),
-        args=(ARG_ROLE_EXPORT, ARG_ROLE_EXPORT_FMT, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="import",
-        help="Import roles (without permissions) from JSON file to db",
-        func=lazy_load_command("airflow.cli.commands.role_command.roles_import"),
-        args=(ARG_ROLE_IMPORT, ARG_VERBOSE),
-    ),
-)
-
-CELERY_COMMANDS = (
-    ActionCommand(
-        name="worker",
-        help="Start a Celery worker node",
-        func=lazy_load_command("airflow.cli.commands.celery_command.worker"),
-        args=(
-            ARG_QUEUES,
-            ARG_CONCURRENCY,
-            ARG_CELERY_HOSTNAME,
-            ARG_PID,
-            ARG_DAEMON,
-            ARG_UMASK,
-            ARG_STDOUT,
-            ARG_STDERR,
-            ARG_LOG_FILE,
-            ARG_AUTOSCALE,
-            ARG_SKIP_SERVE_LOGS,
-            ARG_WITHOUT_MINGLE,
-            ARG_WITHOUT_GOSSIP,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="flower",
-        help="Start a Celery Flower",
-        func=lazy_load_command("airflow.cli.commands.celery_command.flower"),
-        args=(
-            ARG_FLOWER_HOSTNAME,
-            ARG_FLOWER_PORT,
-            ARG_FLOWER_CONF,
-            ARG_FLOWER_URL_PREFIX,
-            ARG_FLOWER_BASIC_AUTH,
-            ARG_BROKER_API,
-            ARG_PID,
-            ARG_DAEMON,
-            ARG_STDOUT,
-            ARG_STDERR,
-            ARG_LOG_FILE,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="stop",
-        help="Stop the Celery worker gracefully",
-        func=lazy_load_command("airflow.cli.commands.celery_command.stop_worker"),
-        args=(ARG_PID, ARG_VERBOSE),
-    ),
-)
-
-CONFIG_COMMANDS = (
-    ActionCommand(
-        name="get-value",
-        help="Print the value of the configuration",
-        func=lazy_load_command("airflow.cli.commands.config_command.get_value"),
-        args=(
-            ARG_SECTION,
-            ARG_OPTION,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="list",
-        help="List options for the configuration",
-        func=lazy_load_command("airflow.cli.commands.config_command.show_config"),
-        args=(ARG_OPTIONAL_SECTION, ARG_COLOR, ARG_VERBOSE),
-    ),
-)
-
-KUBERNETES_COMMANDS = (
-    ActionCommand(
-        name="cleanup-pods",
-        help=(
-            "Clean up Kubernetes pods "
-            "(created by KubernetesExecutor/KubernetesPodOperator) "
-            "in evicted/failed/succeeded/pending states"
-        ),
-        func=lazy_load_command("airflow.cli.commands.kubernetes_command.cleanup_pods"),
-        args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="generate-dag-yaml",
-        help="Generate YAML files for all tasks in a DAG. Useful for debugging tasks without "
-        "launching into a cluster",
-        func=lazy_load_command("airflow.cli.commands.kubernetes_command.generate_pod_yaml"),
-        args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH, ARG_VERBOSE),
-    ),
-)
-
-JOBS_COMMANDS = (
-    ActionCommand(
-        name="check",
-        help="Checks if job(s) are still alive",
-        func=lazy_load_command("airflow.cli.commands.jobs_command.check"),
-        args=(
-            ARG_JOB_TYPE_FILTER,
-            ARG_JOB_HOSTNAME_FILTER,
-            ARG_JOB_HOSTNAME_CALLABLE_FILTER,
-            ARG_JOB_LIMIT,
-            ARG_ALLOW_MULTIPLE,
-            ARG_VERBOSE,
-        ),
-        epilog=(
-            "examples:\n"
-            "To check if the local scheduler is still working properly, run:\n"
-            "\n"
-            "    $ airflow jobs check --job-type SchedulerJob --local\n"
-            "\n"
-            "To check if any scheduler is running when you are using high availability, run:\n"
-            "\n"
-            "    $ airflow jobs check --job-type SchedulerJob --allow-multiple --limit 100"
-        ),
-    ),
-)
-
-airflow_commands: list[CLICommand] = [
-    GroupCommand(
-        name="dags",
-        help="Manage DAGs",
-        subcommands=DAGS_COMMANDS,
-    ),
-    GroupCommand(
-        name="kubernetes", help="Tools to help run the KubernetesExecutor", 
subcommands=KUBERNETES_COMMANDS
-    ),
-    GroupCommand(
-        name="tasks",
-        help="Manage tasks",
-        subcommands=TASKS_COMMANDS,
-    ),
-    GroupCommand(
-        name="pools",
-        help="Manage pools",
-        subcommands=POOLS_COMMANDS,
-    ),
-    GroupCommand(
-        name="variables",
-        help="Manage variables",
-        subcommands=VARIABLES_COMMANDS,
-    ),
-    GroupCommand(
-        name="jobs",
-        help="Manage jobs",
-        subcommands=JOBS_COMMANDS,
-    ),
-    GroupCommand(
-        name="db",
-        help="Database operations",
-        subcommands=DB_COMMANDS,
-    ),
-    ActionCommand(
-        name="kerberos",
-        help="Start a kerberos ticket renewer",
-        func=lazy_load_command("airflow.cli.commands.kerberos_command.kerberos"),
-        args=(
-            ARG_PRINCIPAL,
-            ARG_KEYTAB,
-            ARG_PID,
-            ARG_DAEMON,
-            ARG_STDOUT,
-            ARG_STDERR,
-            ARG_LOG_FILE,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="webserver",
-        help="Start a Airflow webserver instance",
-        
func=lazy_load_command("airflow.cli.commands.webserver_command.webserver"),
-        args=(
-            ARG_PORT,
-            ARG_WORKERS,
-            ARG_WORKERCLASS,
-            ARG_WORKER_TIMEOUT,
-            ARG_HOSTNAME,
-            ARG_PID,
-            ARG_DAEMON,
-            ARG_STDOUT,
-            ARG_STDERR,
-            ARG_ACCESS_LOGFILE,
-            ARG_ERROR_LOGFILE,
-            ARG_ACCESS_LOGFORMAT,
-            ARG_LOG_FILE,
-            ARG_SSL_CERT,
-            ARG_SSL_KEY,
-            ARG_DEBUG,
-        ),
-    ),
-    ActionCommand(
-        name="internal-api",
-        help="Start a Airflow Internal API instance",
-        
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
-        args=(
-            ARG_INTERNAL_API_PORT,
-            ARG_INTERNAL_API_WORKERS,
-            ARG_INTERNAL_API_WORKERCLASS,
-            ARG_INTERNAL_API_WORKER_TIMEOUT,
-            ARG_INTERNAL_API_HOSTNAME,
-            ARG_PID,
-            ARG_DAEMON,
-            ARG_STDOUT,
-            ARG_STDERR,
-            ARG_INTERNAL_API_ACCESS_LOGFILE,
-            ARG_INTERNAL_API_ERROR_LOGFILE,
-            ARG_INTERNAL_API_ACCESS_LOGFORMAT,
-            ARG_LOG_FILE,
-            ARG_SSL_CERT,
-            ARG_SSL_KEY,
-            ARG_DEBUG,
-        ),
-    ),
-    ActionCommand(
-        name="scheduler",
-        help="Start a scheduler instance",
-        func=lazy_load_command("airflow.cli.commands.scheduler_command.scheduler"),
-        args=(
-            ARG_SUBDIR,
-            ARG_NUM_RUNS,
-            ARG_DO_PICKLE,
-            ARG_PID,
-            ARG_DAEMON,
-            ARG_STDOUT,
-            ARG_STDERR,
-            ARG_LOG_FILE,
-            ARG_SKIP_SERVE_LOGS,
-            ARG_VERBOSE,
-        ),
-        epilog=(
-            "Signals:\n"
-            "\n"
-            "  - SIGUSR2: Dump a snapshot of task state being tracked by the 
executor.\n"
-            "\n"
-            "    Example:\n"
-            '        pkill -f -USR2 "airflow scheduler"'
-        ),
-    ),
-    ActionCommand(
-        name="triggerer",
-        help="Start a triggerer instance",
-        func=lazy_load_command("airflow.cli.commands.triggerer_command.triggerer"),
-        args=(
-            ARG_PID,
-            ARG_DAEMON,
-            ARG_STDOUT,
-            ARG_STDERR,
-            ARG_LOG_FILE,
-            ARG_CAPACITY,
-            ARG_VERBOSE,
-            ARG_SKIP_SERVE_LOGS,
-        ),
-    ),
-    ActionCommand(
-        name="dag-processor",
-        help="Start a standalone Dag Processor instance",
-        func=lazy_load_command("airflow.cli.commands.dag_processor_command.dag_processor"),
-        args=(
-            ARG_PID,
-            ARG_DAEMON,
-            ARG_SUBDIR,
-            ARG_NUM_RUNS,
-            ARG_DO_PICKLE,
-            ARG_STDOUT,
-            ARG_STDERR,
-            ARG_LOG_FILE,
-            ARG_VERBOSE,
-        ),
-    ),
-    ActionCommand(
-        name="version",
-        help="Show the version",
-        func=lazy_load_command("airflow.cli.commands.version_command.version"),
-        args=(),
-    ),
-    ActionCommand(
-        name="cheat-sheet",
-        help="Display cheat sheet",
-        func=lazy_load_command("airflow.cli.commands.cheat_sheet_command.cheat_sheet"),
-        args=(ARG_VERBOSE,),
-    ),
-    GroupCommand(
-        name="connections",
-        help="Manage connections",
-        subcommands=CONNECTIONS_COMMANDS,
-    ),
-    GroupCommand(
-        name="providers",
-        help="Display providers",
-        subcommands=PROVIDERS_COMMANDS,
-    ),
-    GroupCommand(
-        name="users",
-        help="Manage users",
-        subcommands=USERS_COMMANDS,
-    ),
-    GroupCommand(
-        name="roles",
-        help="Manage roles",
-        subcommands=ROLES_COMMANDS,
-    ),
-    ActionCommand(
-        name="sync-perm",
-        help="Update permissions for existing roles and optionally DAGs",
-        func=lazy_load_command("airflow.cli.commands.sync_perm_command.sync_perm"),
-        args=(ARG_INCLUDE_DAGS, ARG_VERBOSE),
-    ),
-    ActionCommand(
-        name="rotate-fernet-key",
-        func=lazy_load_command("airflow.cli.commands.rotate_fernet_key_command.rotate_fernet_key"),
-        help="Rotate encrypted connection credentials and variables",
-        description=(
-            "Rotate all encrypted connection credentials and variables; see "
-            "https://airflow.apache.org/docs/apache-airflow/stable/howto/secure-connections.html"
-            "#rotating-encryption-keys"
-        ),
-        args=(),
-    ),
-    GroupCommand(name="config", help="View configuration", 
subcommands=CONFIG_COMMANDS),
-    ActionCommand(
-        name="info",
-        help="Show information about current Airflow and environment",
-        func=lazy_load_command("airflow.cli.commands.info_command.show_info"),
-        args=(
-            ARG_ANONYMIZE,
-            ARG_FILE_IO,
-            ARG_VERBOSE,
-            ARG_OUTPUT,
-        ),
-    ),
-    ActionCommand(
-        name="plugins",
-        help="Dump information about loaded plugins",
-        func=lazy_load_command("airflow.cli.commands.plugins_command.dump_plugins"),
-        args=(ARG_OUTPUT, ARG_VERBOSE),
-    ),
-    GroupCommand(
-        name="celery",
-        help="Celery components",
-        description=(
-            "Start celery components. Works only when using CeleryExecutor. 
For more information, see "
-            
"https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html";
-        ),
-        subcommands=CELERY_COMMANDS,
-    ),
-    ActionCommand(
-        name="standalone",
-        help="Run an all-in-one copy of Airflow",
-        func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"),
-        args=tuple(),
-    ),
-]
 ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands}
 
 
-def _remove_dag_id_opt(command: ActionCommand):
-    cmd = command._asdict()
-    cmd["args"] = (arg for arg in command.args if arg is not ARG_DAG_ID)
-    return ActionCommand(**cmd)
-
-
-dag_cli_commands: list[CLICommand] = [
-    GroupCommand(
-        name="dags",
-        help="Manage DAGs",
-        subcommands=[
-            _remove_dag_id_opt(sp)
-            for sp in DAGS_COMMANDS
-            if sp.name in ["backfill", "list-runs", "pause", "unpause", "test"]
-        ],
-    ),
-    GroupCommand(
-        name="tasks",
-        help="Manage tasks",
-        subcommands=[_remove_dag_id_opt(sp) for sp in TASKS_COMMANDS if sp.name in ["list", "test", "run"]],
-    ),
-]
-DAG_CLI_DICT: dict[str, CLICommand] = {sp.name: sp for sp in dag_cli_commands}
-
-
 class AirflowHelpFormatter(RichHelpFormatter):
     """
     Custom help formatter to display help message.
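
For readers following the split: cli_config.py now holds the declarative Arg/ActionCommand/GroupCommand definitions removed above, while cli_parser.py keeps only the parser-building and formatting logic. A minimal sketch of how the two halves meet after this commit (assuming the post-split layout; the args.func(args) dispatch mirrors how airflow/__main__.py drives the CLI):

    from airflow.cli import cli_parser

    # get_parser() assembles an argparse parser from the cli_config definitions
    parser = cli_parser.get_parser()
    args = parser.parse_args(["dags", "list"])
    # each command's func is a lazy_load_command proxy, imported on first use
    args.func(args)
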
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index 41143bebe9..fbbb257982 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -27,7 +27,7 @@ from unittest.mock import patch
 
 import pytest
 
-from airflow.cli import cli_parser
+from airflow.cli import cli_config, cli_parser
 from tests.test_utils.config import conf_vars
 
 # Can not be `--snake_case` or contain uppercase letter
@@ -177,7 +177,7 @@ class TestCli:
 
         all_command_as_args = [
             command_as_args
-            for top_command in cli_parser.dag_cli_commands
+            for top_command in cli_config.dag_cli_commands
             for command_as_args in (
                 [[top_command.name]]
                 if isinstance(top_command, cli_parser.ActionCommand)
@@ -189,12 +189,12 @@ class TestCli:
                 parser.parse_args([*cmd_args, "--help"])
 
     def test_positive_int(self):
-        assert 1 == cli_parser.positive_int(allow_zero=True)("1")
-        assert 0 == cli_parser.positive_int(allow_zero=True)("0")
+        assert 1 == cli_config.positive_int(allow_zero=True)("1")
+        assert 0 == cli_config.positive_int(allow_zero=True)("0")
 
         with pytest.raises(argparse.ArgumentTypeError):
-            cli_parser.positive_int(allow_zero=False)("0")
-            cli_parser.positive_int(allow_zero=True)("-1")
+            cli_config.positive_int(allow_zero=False)("0")
+            cli_config.positive_int(allow_zero=True)("-1")
 
     def test_dag_parser_celery_command_require_celery_executor(self):
         with conf_vars({("core", "executor"): "SequentialExecutor"}), contextlib.redirect_stderr(
@@ -251,7 +251,7 @@ class TestCli:
         )
 
     @pytest.mark.parametrize("export_format", ["json", "yaml", "unknown"])
-    @patch("airflow.cli.cli_parser.os.path.isdir", return_value=True)
+    @patch("airflow.cli.cli_config.os.path.isdir", return_value=True)
     def test_invalid_choice_raises_for_export_format_in_db_export_archived_command(
         self, mock_isdir, export_format
     ):
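
A note on positive_int, exercised by the tests above and now imported from cli_config: it is a factory that returns an argparse type callable. A short usage sketch inferred from those assertions (the --limit flag here is illustrative only, not part of the module):

    import argparse

    from airflow.cli.cli_config import positive_int

    parser = argparse.ArgumentParser()
    # positive_int(allow_zero=False) converts the string and rejects values < 1
    parser.add_argument("--limit", type=positive_int(allow_zero=False))
    print(parser.parse_args(["--limit", "5"]).limit)  # prints: 5
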


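One helper from the removed block deserves a closing note: _remove_dag_id_opt rebuilds an ActionCommand without the dag_id argument so the same definitions can also back the per-DAG parser (get_parser(dag_parser=True)). A self-contained sketch of the NamedTuple copy-and-filter idiom it relies on (simplified field set; tuple() is used here instead of the original generator so the result prints):

    from typing import NamedTuple

    class ActionCommand(NamedTuple):  # simplified stand-in for the real class
        name: str
        args: tuple

    ARG_DAG_ID = object()  # stand-in for the real Arg instance

    def _remove_dag_id_opt(command: ActionCommand) -> ActionCommand:
        cmd = command._asdict()  # NamedTuple -> dict of field values
        cmd["args"] = tuple(arg for arg in command.args if arg is not ARG_DAG_ID)
        return ActionCommand(**cmd)

    base = ActionCommand(name="test", args=(ARG_DAG_ID, "ARG_VERBOSE"))
    print(_remove_dag_id_opt(base).args)  # ('ARG_VERBOSE',)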