This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1b4c6bed94 Separate cli_parser.py into two modules (#29962)
1b4c6bed94 is described below
commit 1b4c6bed94580eab656cc476e99a4980813b11ca
Author: Niko Oliveira <[email protected]>
AuthorDate: Thu Mar 16 01:01:44 2023 -0700
Separate cli_parser.py into two modules (#29962)
* Separate cli_parser.py into two modules
To avoid import cycles in future CLI changes, and to reduce the high
coupling of the cli_parser module (which was over 2k lines), split it in
two: one module that holds the parser methods and another module that
contains the config/definitions of the Args, Commands, Actions, etc.
* Update airflow/cli/cli_config.py
Co-authored-by: Tzu-ping Chung <[email protected]>
* Update airflow/cli/cli_parser.py
Co-authored-by: Tzu-ping Chung <[email protected]>
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/cli/{cli_parser.py => cli_config.py} | 113 +-
airflow/cli/cli_parser.py | 2247 +-------------------------
tests/cli/test_cli_parser.py | 14 +-
3 files changed, 29 insertions(+), 2345 deletions(-)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_config.py
similarity index 95%
copy from airflow/cli/cli_parser.py
copy to airflow/cli/cli_config.py
index 66beadfde1..f687a478d6 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_config.py
@@ -16,28 +16,25 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Command-line interface."""
+"""Explicit configuration and definition of Airflow CLI commands."""
+
from __future__ import annotations
import argparse
import json
import os
import textwrap
-from argparse import Action, ArgumentError
-from functools import lru_cache
+from argparse import ArgumentError
from typing import Callable, Iterable, NamedTuple, Union
import lazy_object_proxy
-from rich_argparse import RawTextRichHelpFormatter, RichHelpFormatter
from airflow import settings
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
-from airflow.exceptions import AirflowException
from airflow.executors.executor_constants import CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.utils.cli import ColorMode
-from airflow.utils.helpers import partition
from airflow.utils.module_loading import import_string
from airflow.utils.timezone import parse as parsedate
@@ -836,7 +833,7 @@ ARG_ENV_VARS = Arg(
)
# connections
-ARG_CONN_ID = Arg(("conn_id",), help="Connection ID, required to
get/add/delete/test a connection", type=str)
+ARG_CONN_ID = Arg(("conn_id",), help="Connection id, required to
get/add/delete/test a connection", type=str)
ARG_CONN_ID_FILTER = Arg(
("--conn-id",), help="If passed, only items with the specified connection
ID will be displayed", type=str
)
@@ -1999,7 +1996,7 @@ JOBS_COMMANDS = (
),
)
-airflow_commands: list[CLICommand] = [
+core_commands: list[CLICommand] = [
GroupCommand(
name="dags",
help="Manage DAGs",
@@ -2233,7 +2230,6 @@ airflow_commands: list[CLICommand] = [
args=tuple(),
),
]
-ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in
airflow_commands}
def _remove_dag_id_opt(command: ActionCommand):
@@ -2259,102 +2255,3 @@ dag_cli_commands: list[CLICommand] = [
),
]
DAG_CLI_DICT: dict[str, CLICommand] = {sp.name: sp for sp in dag_cli_commands}
-
-
-class AirflowHelpFormatter(RichHelpFormatter):
- """
- Custom help formatter to display help message.
-
- It displays simple commands and groups of commands in separate sections.
- """
-
- def _iter_indented_subactions(self, action: Action):
- if isinstance(action, argparse._SubParsersAction):
-
- self._indent()
- subactions = action._get_subactions()
- action_subcommands, group_subcommands = partition(
- lambda d: isinstance(ALL_COMMANDS_DICT[d.dest], GroupCommand),
subactions
- )
- yield Action([], "\n%*s%s:" % (self._current_indent, "",
"Groups"), nargs=0)
- self._indent()
- yield from group_subcommands
- self._dedent()
-
- yield Action([], "\n%*s%s:" % (self._current_indent, "",
"Commands"), nargs=0)
- self._indent()
- yield from action_subcommands
- self._dedent()
- self._dedent()
- else:
- yield from super()._iter_indented_subactions(action)
-
-
-class LazyRichHelpFormatter(RawTextRichHelpFormatter):
- """
- Custom help formatter to display help message.
-
- It resolves lazy help string before printing it using rich.
- """
-
- def add_argument(self, action: Action) -> None:
- if isinstance(action.help, lazy_object_proxy.Proxy):
- action.help = str(action.help)
- return super().add_argument(action)
-
-
-@lru_cache(maxsize=None)
-def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
- """Creates and returns command line argument parser."""
- parser = DefaultHelpParser(prog="airflow",
formatter_class=AirflowHelpFormatter)
- subparsers = parser.add_subparsers(dest="subcommand",
metavar="GROUP_OR_COMMAND")
- subparsers.required = True
-
- command_dict = DAG_CLI_DICT if dag_parser else ALL_COMMANDS_DICT
- subparser_list = command_dict.keys()
- sub_name: str
- for sub_name in sorted(subparser_list):
- sub: CLICommand = command_dict[sub_name]
- _add_command(subparsers, sub)
- return parser
-
-
-def _sort_args(args: Iterable[Arg]) -> Iterable[Arg]:
- """Sort subcommand optional args, keep positional args."""
-
- def get_long_option(arg: Arg):
- """Get long option from Arg.flags."""
- return arg.flags[0] if len(arg.flags) == 1 else arg.flags[1]
-
- positional, optional = partition(lambda x: x.flags[0].startswith("-"),
args)
- yield from positional
- yield from sorted(optional, key=lambda x: get_long_option(x).lower())
-
-
-def _add_command(subparsers: argparse._SubParsersAction, sub: CLICommand) ->
None:
- sub_proc = subparsers.add_parser(
- sub.name, help=sub.help, description=sub.description or sub.help,
epilog=sub.epilog
- )
- sub_proc.formatter_class = LazyRichHelpFormatter
-
- if isinstance(sub, GroupCommand):
- _add_group_command(sub, sub_proc)
- elif isinstance(sub, ActionCommand):
- _add_action_command(sub, sub_proc)
- else:
- raise AirflowException("Invalid command definition.")
-
-
-def _add_action_command(sub: ActionCommand, sub_proc: argparse.ArgumentParser)
-> None:
- for arg in _sort_args(sub.args):
- arg.add_to_parser(sub_proc)
- sub_proc.set_defaults(func=sub.func)
-
-
-def _add_group_command(sub: GroupCommand, sub_proc: argparse.ArgumentParser)
-> None:
- subcommands = sub.subcommands
- sub_subparsers = sub_proc.add_subparsers(dest="subcommand",
metavar="COMMAND")
- sub_subparsers.required = True
-
- for command in sorted(subcommands, key=lambda x: x.name):
- _add_command(sub_subparsers, command)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 66beadfde1..9ccccd0f30 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -16,2251 +16,38 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Command-line interface."""
+"""Produce a CLI parser object from Airflow CLI command configuration.
+
+.. seealso:: :mod:`airflow.cli.cli_config`
+"""
+
from __future__ import annotations
import argparse
-import json
-import os
-import textwrap
-from argparse import Action, ArgumentError
+from argparse import Action
from functools import lru_cache
-from typing import Callable, Iterable, NamedTuple, Union
+from typing import Iterable
import lazy_object_proxy
from rich_argparse import RawTextRichHelpFormatter, RichHelpFormatter
-from airflow import settings
-from airflow.cli.commands.legacy_commands import check_legacy_command
-from airflow.configuration import conf
+from airflow.cli.cli_config import (
+ DAG_CLI_DICT,
+ ActionCommand,
+ Arg,
+ CLICommand,
+ DefaultHelpParser,
+ GroupCommand,
+ core_commands,
+)
from airflow.exceptions import AirflowException
-from airflow.executors.executor_constants import CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR
-from airflow.executors.executor_loader import ExecutorLoader
-from airflow.utils.cli import ColorMode
from airflow.utils.helpers import partition
-from airflow.utils.module_loading import import_string
-from airflow.utils.timezone import parse as parsedate
-
-BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
-
-
-def lazy_load_command(import_path: str) -> Callable:
- """Create a lazy loader for command."""
- _, _, name = import_path.rpartition(".")
-
- def command(*args, **kwargs):
- func = import_string(import_path)
- return func(*args, **kwargs)
-
- command.__name__ = name
-
- return command
-
-
-class DefaultHelpParser(argparse.ArgumentParser):
- """CustomParser to display help message."""
-
- def _check_value(self, action, value):
- """Override _check_value and check conditionally added command."""
- if action.dest == "subcommand" and value == "celery":
- executor = conf.get("core", "EXECUTOR")
- if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
- executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
- classes = ()
- try:
- from airflow.executors.celery_executor import
CeleryExecutor
-
- classes += (CeleryExecutor,)
- except ImportError:
- message = (
- "The celery subcommand requires that you pip install
the celery module. "
- "To do it, run: pip install 'apache-airflow[celery]'"
- )
- raise ArgumentError(action, message)
- try:
- from airflow.executors.celery_kubernetes_executor import
CeleryKubernetesExecutor
-
- classes += (CeleryKubernetesExecutor,)
- except ImportError:
- pass
- if not issubclass(executor_cls, classes):
- message = (
- f"celery subcommand works only with CeleryExecutor,
CeleryKubernetesExecutor and "
- f"executors derived from them, your current executor:
{executor}, subclassed from: "
- f'{", ".join([base_cls.__qualname__ for base_cls in
executor_cls.__bases__])}'
- )
- raise ArgumentError(action, message)
- if action.dest == "subcommand" and value == "kubernetes":
- try:
- import kubernetes.client # noqa: F401
- except ImportError:
- message = (
- "The kubernetes subcommand requires that you pip install
the kubernetes python client. "
- "To do it, run: pip install
'apache-airflow[cncf.kubernetes]'"
- )
- raise ArgumentError(action, message)
-
- if action.choices is not None and value not in action.choices:
- check_legacy_command(action, value)
-
- super()._check_value(action, value)
-
- def error(self, message):
- """Override error and use print_instead of print_usage."""
- self.print_help()
- self.exit(2, f"\n{self.prog} command error: {message}, see help
above.\n")
-
-
-# Used in Arg to enable `None' as a distinct value from "not passed"
-_UNSET = object()
-
-
-class Arg:
- """Class to keep information about command line argument."""
-
- def __init__(
- self,
- flags=_UNSET,
- help=_UNSET,
- action=_UNSET,
- default=_UNSET,
- nargs=_UNSET,
- type=_UNSET,
- choices=_UNSET,
- required=_UNSET,
- metavar=_UNSET,
- dest=_UNSET,
- ):
- self.flags = flags
- self.kwargs = {}
- for k, v in locals().items():
- if v is _UNSET:
- continue
- if k in ("self", "flags"):
- continue
-
- self.kwargs[k] = v
-
- def add_to_parser(self, parser: argparse.ArgumentParser):
- """Add this argument to an ArgumentParser."""
- if "metavar" in self.kwargs and "type" not in self.kwargs:
- if self.kwargs["metavar"] == "DIRPATH":
- type = lambda x: self._is_valid_directory(parser, x)
- self.kwargs["type"] = type
- parser.add_argument(*self.flags, **self.kwargs)
-
- def _is_valid_directory(self, parser, arg):
- if not os.path.isdir(arg):
- parser.error(f"The directory '{arg}' does not exist!")
- return arg
-
-
-def positive_int(*, allow_zero):
- """Define a positive int type for an argument."""
-
- def _check(value):
- try:
- value = int(value)
- if allow_zero and value == 0:
- return value
- if value > 0:
- return value
- except ValueError:
- pass
- raise argparse.ArgumentTypeError(f"invalid positive int value:
'{value}'")
-
- return _check
-
-
-def string_list_type(val):
- """Parses comma-separated list and returns list of string (strips
whitespace)."""
- return [x.strip() for x in val.split(",")]
-
-
-def string_lower_type(val):
- """Lowers arg."""
- if not val:
- return
- return val.strip().lower()
-
-
-# Shared
-ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
-ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
-ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the
DAG", type=parsedate)
-ARG_EXECUTION_DATE_OPTIONAL = Arg(
- ("execution_date",), nargs="?", help="The execution date of the DAG
(optional)", type=parsedate
-)
-ARG_EXECUTION_DATE_OR_RUN_ID = Arg(
- ("execution_date_or_run_id",), help="The execution_date of the DAG or
run_id of the DAGRun"
-)
-ARG_EXECUTION_DATE_OR_RUN_ID_OPTIONAL = Arg(
- ("execution_date_or_run_id",),
- nargs="?",
- help="The execution_date of the DAG or run_id of the DAGRun (optional)",
-)
-ARG_TASK_REGEX = Arg(
- ("-t", "--task-regex"), help="The regex to filter specific task_ids to
backfill (optional)"
-)
-ARG_SUBDIR = Arg(
- ("-S", "--subdir"),
- help=(
- "File location or directory from which to look for the dag. "
- "Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
- "value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
- ),
- default="[AIRFLOW_HOME]/dags" if BUILD_DOCS else settings.DAGS_FOLDER,
-)
-ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date
YYYY-MM-DD", type=parsedate)
-ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD",
type=parsedate)
-ARG_OUTPUT_PATH = Arg(
- (
- "-o",
- "--output-path",
- ),
- help="The output for generated yaml files",
- type=str,
- default="[CWD]" if BUILD_DOCS else os.getcwd(),
-)
-ARG_DRY_RUN = Arg(
- ("-n", "--dry-run"),
- help="Perform a dry run for each task. Only renders Template Fields for
each task, nothing else",
- action="store_true",
-)
-ARG_PID = Arg(("--pid",), help="PID file location", nargs="?")
-ARG_DAEMON = Arg(
- ("-D", "--daemon"), help="Daemonize instead of running in the foreground",
action="store_true"
-)
-ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file")
-ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file")
-ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file")
-ARG_YES = Arg(
- ("-y", "--yes"),
- help="Do not prompt to confirm. Use with care!",
- action="store_true",
- default=False,
-)
-ARG_OUTPUT = Arg(
- (
- "-o",
- "--output",
- ),
- help="Output format. Allowed values: json, yaml, plain, table (default:
table)",
- metavar="(table, json, yaml, plain)",
- choices=("table", "json", "yaml", "plain"),
- default="table",
-)
-ARG_COLOR = Arg(
- ("--color",),
- help="Do emit colored output (default: auto)",
- choices={ColorMode.ON, ColorMode.OFF, ColorMode.AUTO},
- default=ColorMode.AUTO,
-)
-
-# DB args
-ARG_VERSION_RANGE = Arg(
- ("-r", "--range"),
- help="Version range(start:end) for offline sql generation. Example:
'2.0.2:2.2.3'",
- default=None,
-)
-ARG_REVISION_RANGE = Arg(
- ("--revision-range",),
- help=(
- "Migration revision range(start:end) to use for offline sql
generation. "
- "Example: ``a13f7613ad25:7b2661a43ba3``"
- ),
- default=None,
-)
-
-# list_dag_runs
-ARG_DAG_ID_REQ_FLAG = Arg(
- ("-d", "--dag-id"), required=True, help="The id of the dag"
-) # TODO: convert this to a positional arg in Airflow 3
-ARG_NO_BACKFILL = Arg(
- ("--no-backfill",), help="filter all the backfill dagruns given the dag
id", action="store_true"
-)
-ARG_STATE = Arg(("--state",), help="Only list the dag runs corresponding to
the state")
-
-# list_jobs
-ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
-ARG_LIMIT = Arg(("--limit",), help="Return a limited number of records")
-
-# next_execution
-ARG_NUM_EXECUTIONS = Arg(
- ("-n", "--num-executions"),
- default=1,
- type=positive_int(allow_zero=False),
- help="The number of next execution datetimes to show",
-)
-
-# backfill
-ARG_MARK_SUCCESS = Arg(
- ("-m", "--mark-success"), help="Mark jobs as succeeded without running
them", action="store_true"
-)
-ARG_VERBOSE = Arg(("-v", "--verbose"), help="Make logging output more
verbose", action="store_true")
-ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the
LocalExecutor", action="store_true")
-ARG_DONOT_PICKLE = Arg(
- ("-x", "--donot-pickle"),
- help=(
- "Do not attempt to pickle the DAG object to send over "
- "to the workers, just tell the workers to run their version "
- "of the code"
- ),
- action="store_true",
-)
-ARG_BF_IGNORE_DEPENDENCIES = Arg(
- ("-i", "--ignore-dependencies"),
- help=(
- "Skip upstream tasks, run only the tasks "
- "matching the regexp. Only works in conjunction "
- "with task_regex"
- ),
- action="store_true",
-)
-ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST = Arg(
- ("-I", "--ignore-first-depends-on-past"),
- help=(
- "Ignores depends_on_past dependencies for the first "
- "set of tasks only (subsequent executions in the backfill "
- "DO respect depends_on_past)"
- ),
- action="store_true",
-)
-ARG_POOL = Arg(("--pool",), "Resource pool to use")
-ARG_DELAY_ON_LIMIT = Arg(
- ("--delay-on-limit",),
- help=(
- "Amount of time in seconds to wait when the limit "
- "on maximum active dag runs (max_active_runs) has "
- "been reached before trying to execute a dag run "
- "again"
- ),
- type=float,
- default=1.0,
-)
-ARG_RESET_DAG_RUN = Arg(
- ("--reset-dagruns",),
- help=(
- "if set, the backfill will delete existing "
- "backfill-related DAG runs and start "
- "anew with fresh, running DAG runs"
- ),
- action="store_true",
-)
-ARG_RERUN_FAILED_TASKS = Arg(
- ("--rerun-failed-tasks",),
- help=(
- "if set, the backfill will auto-rerun "
- "all the failed tasks for the backfill date range "
- "instead of throwing exceptions"
- ),
- action="store_true",
-)
-ARG_CONTINUE_ON_FAILURES = Arg(
- ("--continue-on-failures",),
- help=("if set, the backfill will keep going even if some of the tasks
failed"),
- action="store_true",
-)
-ARG_DISABLE_RETRY = Arg(
- ("--disable-retry",),
- help=("if set, the backfill will set tasks as failed without retrying."),
- action="store_true",
-)
-ARG_RUN_BACKWARDS = Arg(
- (
- "-B",
- "--run-backwards",
- ),
- help=(
- "if set, the backfill will run tasks from the most "
- "recent day first. if there are tasks that depend_on_past "
- "this option will throw an exception"
- ),
- action="store_true",
-)
-ARG_TREAT_DAG_AS_REGEX = Arg(
- ("--treat-dag-as-regex",),
- help=("if set, dag_id will be treated as regex instead of an exact
string"),
- action="store_true",
-)
-# test_dag
-ARG_SHOW_DAGRUN = Arg(
- ("--show-dagrun",),
- help=(
- "After completing the backfill, shows the diagram for current DAG
Run.\n"
- "\n"
- "The diagram is in DOT language\n"
- ),
- action="store_true",
-)
-ARG_IMGCAT_DAGRUN = Arg(
- ("--imgcat-dagrun",),
- help=(
- "After completing the dag run, prints a diagram on the screen for the "
- "current DAG Run using the imgcat tool.\n"
- ),
- action="store_true",
-)
-ARG_SAVE_DAGRUN = Arg(
- ("--save-dagrun",),
- help="After completing the backfill, saves the diagram for current DAG Run
to the indicated file.\n\n",
-)
-
-# list_tasks
-ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
-
-# tasks_run
-# This is a hidden option -- not meant for users to set or know about
-ARG_SHUT_DOWN_LOGGING = Arg(
- ("--no-shut-down-logging",),
- help=argparse.SUPPRESS,
- dest="shut_down_logging",
- action="store_false",
- default=True,
-)
-
-# clear
-ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks",
action="store_true")
-ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs",
action="store_true")
-ARG_ONLY_RUNNING = Arg(("-r", "--only-running"), help="Only running jobs",
action="store_true")
-ARG_DOWNSTREAM = Arg(("-d", "--downstream"), help="Include downstream tasks",
action="store_true")
-ARG_EXCLUDE_SUBDAGS = Arg(("-x", "--exclude-subdags"), help="Exclude subdags",
action="store_true")
-ARG_EXCLUDE_PARENTDAG = Arg(
- ("-X", "--exclude-parentdag"),
- help="Exclude ParentDAGS if the task cleared is a part of a SubDAG",
- action="store_true",
-)
-ARG_DAG_REGEX = Arg(
- ("-R", "--dag-regex"), help="Search dag_id as regex instead of exact
string", action="store_true"
-)
-
-# show_dag
-ARG_SAVE = Arg(("-s", "--save"), help="Saves the result to the indicated
file.")
-
-ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.",
action="store_true")
-
-# trigger_dag
-ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
-ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the
DagRun's conf attribute")
-ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the
DAG", type=parsedate)
-ARG_REPLACE_MICRO = Arg(
- ("--no-replace-microseconds",),
- help="whether microseconds should be zeroed",
- dest="replace_microseconds",
- action="store_false",
- default=True,
-)
-
-# db
-ARG_DB_TABLES = Arg(
- ("-t", "--tables"),
- help=lazy_object_proxy.Proxy(
- lambda: f"Table names to perform maintenance on (use comma-separated
list).\n"
- f"Options:
{import_string('airflow.cli.commands.db_command.all_tables')}"
- ),
- type=string_list_type,
-)
-ARG_DB_CLEANUP_TIMESTAMP = Arg(
- ("--clean-before-timestamp",),
- help="The date or timestamp before which data should be purged.\n"
- "If no timezone info is supplied then dates are assumed to be in airflow
default timezone.\n"
- "Example: '2022-01-01 00:00:00+01:00'",
- type=parsedate,
- required=True,
-)
-ARG_DB_DRY_RUN = Arg(
- ("--dry-run",),
- help="Perform a dry run",
- action="store_true",
-)
-ARG_DB_SKIP_ARCHIVE = Arg(
- ("--skip-archive",),
- help="Don't preserve purged records in an archive table.",
- action="store_true",
-)
-ARG_DB_EXPORT_FORMAT = Arg(
- ("--export-format",),
- help="The file format to export the cleaned data",
- choices=("csv",),
- default="csv",
-)
-ARG_DB_OUTPUT_PATH = Arg(
- ("--output-path",),
- metavar="DIRPATH",
- help="The path to the output directory to export the cleaned data. This
directory must exist.",
- required=True,
-)
-ARG_DB_DROP_ARCHIVES = Arg(
- ("--drop-archives",),
- help="Drop the archive tables after exporting. Use with caution.",
- action="store_true",
-)
-
-# pool
-ARG_POOL_NAME = Arg(("pool",), metavar="NAME", help="Pool name")
-ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
-ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
-ARG_POOL_IMPORT = Arg(
- ("file",),
- metavar="FILEPATH",
- help="Import pools from JSON file. Example format::\n"
- + textwrap.indent(
- textwrap.dedent(
- """
- {
- "pool_1": {"slots": 5, "description": ""},
- "pool_2": {"slots": 10, "description": "test"}
- }"""
- ),
- " " * 4,
- ),
-)
-
-ARG_POOL_EXPORT = Arg(("file",), metavar="FILEPATH", help="Export all pools to
JSON file")
-
-# variables
-ARG_VAR = Arg(("key",), help="Variable key")
-ARG_VAR_VALUE = Arg(("value",), metavar="VALUE", help="Variable value")
-ARG_DEFAULT = Arg(
- ("-d", "--default"), metavar="VAL", default=None, help="Default value
returned if variable does not exist"
-)
-ARG_DESERIALIZE_JSON = Arg(("-j", "--json"), help="Deserialize JSON variable",
action="store_true")
-ARG_SERIALIZE_JSON = Arg(("-j", "--json"), help="Serialize JSON variable",
action="store_true")
-ARG_VAR_IMPORT = Arg(("file",), help="Import variables from JSON file")
-ARG_VAR_EXPORT = Arg(("file",), help="Export all variables to JSON file")
-
-# kerberos
-ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
-ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?",
default=conf.get("kerberos", "keytab"))
-# run
-ARG_INTERACTIVE = Arg(
- ("-N", "--interactive"),
- help="Do not capture standard output and error streams (useful for
interactive debugging)",
- action="store_true",
-)
-# TODO(aoen): "force" is a poor choice of name here since it implies it
overrides
-# all dependencies (not just past success), e.g. the ignore_depends_on_past
-# dependency. This flag should be deprecated and renamed to 'ignore_ti_state'
and
-# the "ignore_all_dependencies" command should be called the"force" command
-# instead.
-ARG_FORCE = Arg(
- ("-f", "--force"),
- help="Ignore previous task instance state, rerun regardless if task
already succeeded/failed",
- action="store_true",
-)
-ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
-ARG_IGNORE_ALL_DEPENDENCIES = Arg(
- ("-A", "--ignore-all-dependencies"),
- help="Ignores all non-critical dependencies, including ignore_ti_state and
ignore_task_deps",
- action="store_true",
-)
-# TODO(aoen): ignore_dependencies is a poor choice of name here because it is
too
-# vague (e.g. a task being in the appropriate state to be run is also a
dependency
-# but is not ignored by this flag), the name 'ignore_task_dependencies' is
-# slightly better (as it ignores all dependencies that are specific to the
task),
-# so deprecate the old command name and use this instead.
-ARG_IGNORE_DEPENDENCIES = Arg(
- ("-i", "--ignore-dependencies"),
- help="Ignore task-specific dependencies, e.g. upstream, depends_on_past,
and retry delay dependencies",
- action="store_true",
-)
-ARG_IGNORE_DEPENDS_ON_PAST = Arg(
- ("-I", "--ignore-depends-on-past"),
- help="Deprecated -- use `--depends-on-past ignore` instead. "
- "Ignore depends_on_past dependencies (but respect upstream dependencies)",
- action="store_true",
-)
-ARG_DEPENDS_ON_PAST = Arg(
- ("-d", "--depends-on-past"),
- help="Determine how Airflow should deal with past dependencies. The
default action is `check`, Airflow "
- "will check if the the past dependencies are met for the tasks having
`depends_on_past=True` before run "
- "them, if `ignore` is provided, the past dependencies will be ignored, if
`wait` is provided and "
- "`depends_on_past=True`, Airflow will wait the past dependencies until
they are met before running or "
- "skipping the task",
- choices={"check", "ignore", "wait"},
- default="check",
-)
-ARG_SHIP_DAG = Arg(
- ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the
worker", action="store_true"
-)
-ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the
entire dag (used internally)")
-ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
-ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead
of airflow.cfg")
-ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task
index")
-
-
-# database
-ARG_MIGRATION_TIMEOUT = Arg(
- ("-t", "--migration-wait-timeout"),
- help="timeout to wait for db to migrate ",
- type=int,
- default=60,
-)
-ARG_DB_RESERIALIZE_DAGS = Arg(
- ("--no-reserialize-dags",),
- # Not intended for user, so dont show in help
- help=argparse.SUPPRESS,
- action="store_false",
- default=True,
- dest="reserialize_dags",
-)
-ARG_DB_VERSION__UPGRADE = Arg(
- ("-n", "--to-version"),
- help=(
- "(Optional) The airflow version to upgrade to. Note: must provide
either "
- "`--to-revision` or `--to-version`."
- ),
-)
-ARG_DB_REVISION__UPGRADE = Arg(
- ("-r", "--to-revision"),
- help="(Optional) If provided, only run migrations up to and including this
Alembic revision.",
-)
-ARG_DB_VERSION__DOWNGRADE = Arg(
- ("-n", "--to-version"),
- help="(Optional) If provided, only run migrations up to this version.",
-)
-ARG_DB_FROM_VERSION = Arg(
- ("--from-version",),
- help="(Optional) If generating sql, may supply a *from* version",
-)
-ARG_DB_REVISION__DOWNGRADE = Arg(
- ("-r", "--to-revision"),
- help="The Alembic revision to downgrade to. Note: must provide either
`--to-revision` or `--to-version`.",
-)
-ARG_DB_FROM_REVISION = Arg(
- ("--from-revision",),
- help="(Optional) If generating sql, may supply a *from* Alembic revision",
-)
-ARG_DB_SQL_ONLY = Arg(
- ("-s", "--show-sql-only"),
- help="Don't actually run migrations; just print out sql scripts for
offline migration. "
- "Required if using either `--from-revision` or `--from-version`.",
- action="store_true",
- default=False,
-)
-ARG_DB_SKIP_INIT = Arg(
- ("-s", "--skip-init"),
- help="Only remove tables; do not perform db init.",
- action="store_true",
- default=False,
-)
-
-# webserver
-ARG_PORT = Arg(
- ("-p", "--port"),
- default=conf.get("webserver", "WEB_SERVER_PORT"),
- type=int,
- help="The port on which to run the server",
-)
-ARG_SSL_CERT = Arg(
- ("--ssl-cert",),
- default=conf.get("webserver", "WEB_SERVER_SSL_CERT"),
- help="Path to the SSL certificate for the webserver",
-)
-ARG_SSL_KEY = Arg(
- ("--ssl-key",),
- default=conf.get("webserver", "WEB_SERVER_SSL_KEY"),
- help="Path to the key to use with the SSL certificate",
-)
-ARG_WORKERS = Arg(
- ("-w", "--workers"),
- default=conf.get("webserver", "WORKERS"),
- type=int,
- help="Number of workers to run the webserver on",
-)
-ARG_WORKERCLASS = Arg(
- ("-k", "--workerclass"),
- default=conf.get("webserver", "WORKER_CLASS"),
- choices=["sync", "eventlet", "gevent", "tornado"],
- help="The worker class to use for Gunicorn",
-)
-ARG_WORKER_TIMEOUT = Arg(
- ("-t", "--worker-timeout"),
- default=conf.get("webserver", "WEB_SERVER_WORKER_TIMEOUT"),
- type=int,
- help="The timeout for waiting on webserver workers",
-)
-ARG_HOSTNAME = Arg(
- ("-H", "--hostname"),
- default=conf.get("webserver", "WEB_SERVER_HOST"),
- help="Set the hostname on which to run the web server",
-)
-ARG_DEBUG = Arg(
- ("-d", "--debug"), help="Use the server that ships with Flask in debug
mode", action="store_true"
-)
-ARG_ACCESS_LOGFILE = Arg(
- ("-A", "--access-logfile"),
- default=conf.get("webserver", "ACCESS_LOGFILE"),
- help="The logfile to store the webserver access log. Use '-' to print to
stdout",
-)
-ARG_ERROR_LOGFILE = Arg(
- ("-E", "--error-logfile"),
- default=conf.get("webserver", "ERROR_LOGFILE"),
- help="The logfile to store the webserver error log. Use '-' to print to
stderr",
-)
-ARG_ACCESS_LOGFORMAT = Arg(
- ("-L", "--access-logformat"),
- default=conf.get("webserver", "ACCESS_LOGFORMAT"),
- help="The access log format for gunicorn logs",
-)
-
-
-# internal-api
-ARG_INTERNAL_API_PORT = Arg(
- ("-p", "--port"),
- default=9080,
- type=int,
- help="The port on which to run the server",
-)
-ARG_INTERNAL_API_WORKERS = Arg(
- ("-w", "--workers"),
- default=4,
- type=int,
- help="Number of workers to run the Internal API-on",
-)
-ARG_INTERNAL_API_WORKERCLASS = Arg(
- ("-k", "--workerclass"),
- default="sync",
- choices=["sync", "eventlet", "gevent", "tornado"],
- help="The worker class to use for Gunicorn",
-)
-ARG_INTERNAL_API_WORKER_TIMEOUT = Arg(
- ("-t", "--worker-timeout"),
- default=120,
- type=int,
- help="The timeout for waiting on Internal API workers",
-)
-ARG_INTERNAL_API_HOSTNAME = Arg(
- ("-H", "--hostname"),
- default="0.0.0.0",
- help="Set the hostname on which to run the web server",
-)
-ARG_INTERNAL_API_ACCESS_LOGFILE = Arg(
- ("-A", "--access-logfile"),
- help="The logfile to store the access log. Use '-' to print to stdout",
-)
-ARG_INTERNAL_API_ERROR_LOGFILE = Arg(
- ("-E", "--error-logfile"),
- help="The logfile to store the error log. Use '-' to print to stderr",
-)
-ARG_INTERNAL_API_ACCESS_LOGFORMAT = Arg(
- ("-L", "--access-logformat"),
- help="The access log format for gunicorn logs",
-)
-# scheduler
-ARG_NUM_RUNS = Arg(
- ("-n", "--num-runs"),
- default=conf.getint("scheduler", "num_runs"),
- type=int,
- help="Set the number of runs to execute before exiting",
-)
-ARG_DO_PICKLE = Arg(
- ("-p", "--do-pickle"),
- default=False,
- help=(
- "Attempt to pickle the DAG object to send over "
- "to the workers, instead of letting workers run their version "
- "of the code"
- ),
- action="store_true",
-)
+airflow_commands = core_commands
-# worker
-ARG_QUEUES = Arg(
- ("-q", "--queues"),
- help="Comma delimited list of queues to serve",
- default=conf.get("operators", "DEFAULT_QUEUE"),
-)
-ARG_CONCURRENCY = Arg(
- ("-c", "--concurrency"),
- type=int,
- help="The number of worker processes",
- default=conf.get("celery", "worker_concurrency"),
-)
-ARG_CELERY_HOSTNAME = Arg(
- ("-H", "--celery-hostname"),
- help="Set the hostname of celery worker if you have multiple workers on a
single machine",
-)
-ARG_UMASK = Arg(
- ("-u", "--umask"),
- help="Set the umask of celery worker in daemon mode",
-)
-ARG_WITHOUT_MINGLE = Arg(
- ("--without-mingle",),
- default=False,
- help="Don't synchronize with other workers at start-up",
- action="store_true",
-)
-ARG_WITHOUT_GOSSIP = Arg(
- ("--without-gossip",),
- default=False,
- help="Don't subscribe to other workers events",
- action="store_true",
-)
-
-# flower
-ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
-ARG_FLOWER_HOSTNAME = Arg(
- ("-H", "--hostname"),
- default=conf.get("celery", "FLOWER_HOST"),
- help="Set the hostname on which to run the server",
-)
-ARG_FLOWER_PORT = Arg(
- ("-p", "--port"),
- default=conf.get("celery", "FLOWER_PORT"),
- type=int,
- help="The port on which to run the server",
-)
-ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for
flower")
-ARG_FLOWER_URL_PREFIX = Arg(
- ("-u", "--url-prefix"), default=conf.get("celery", "FLOWER_URL_PREFIX"),
help="URL prefix for Flower"
-)
-ARG_FLOWER_BASIC_AUTH = Arg(
- ("-A", "--basic-auth"),
- default=conf.get("celery", "FLOWER_BASIC_AUTH"),
- help=(
- "Securing Flower with Basic Authentication. "
- "Accepts user:password pairs separated by a comma. "
- "Example: flower_basic_auth = user1:password1,user2:password2"
- ),
-)
-ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict
to the task")
-ARG_POST_MORTEM = Arg(
- ("-m", "--post-mortem"), action="store_true", help="Open debugger on
uncaught exception"
-)
-ARG_ENV_VARS = Arg(
- ("--env-vars",),
- help="Set env var in both parsing time and runtime for each of entry
supplied in a JSON dict",
- type=json.loads,
-)
-
-# connections
-ARG_CONN_ID = Arg(("conn_id",), help="Connection ID, required to
get/add/delete/test a connection", type=str)
-ARG_CONN_ID_FILTER = Arg(
- ("--conn-id",), help="If passed, only items with the specified connection
ID will be displayed", type=str
-)
-ARG_CONN_URI = Arg(
- ("--conn-uri",), help="Connection URI, required to add a connection
without conn_type", type=str
-)
-ARG_CONN_JSON = Arg(
- ("--conn-json",), help="Connection JSON, required to add a connection
using JSON representation", type=str
-)
-ARG_CONN_TYPE = Arg(
- ("--conn-type",), help="Connection type, required to add a connection
without conn_uri", type=str
-)
-ARG_CONN_DESCRIPTION = Arg(
- ("--conn-description",), help="Connection description, optional when
adding a connection", type=str
-)
-ARG_CONN_HOST = Arg(("--conn-host",), help="Connection host, optional when adding a connection", type=str)
-ARG_CONN_LOGIN = Arg(("--conn-login",), help="Connection login, optional when adding a connection", type=str)
-ARG_CONN_PASSWORD = Arg(
- ("--conn-password",), help="Connection password, optional when adding a
connection", type=str
-)
-ARG_CONN_SCHEMA = Arg(
- ("--conn-schema",), help="Connection schema, optional when adding a
connection", type=str
-)
-ARG_CONN_PORT = Arg(("--conn-port",), help="Connection port, optional when adding a connection", type=str)
-ARG_CONN_EXTRA = Arg(
- ("--conn-extra",), help="Connection `Extra` field, optional when adding a
connection", type=str
-)
-ARG_CONN_EXPORT = Arg(
- ("file",),
- help="Output file path for exporting the connections",
- type=argparse.FileType("w", encoding="UTF-8"),
-)
-ARG_CONN_EXPORT_FORMAT = Arg(
- ("--format",),
- help="Deprecated -- use `--file-format` instead. File format to use for
the export.",
- type=str,
- choices=["json", "yaml", "env"],
-)
-ARG_CONN_EXPORT_FILE_FORMAT = Arg(
- ("--file-format",), help="File format for the export", type=str,
choices=["json", "yaml", "env"]
-)
-ARG_CONN_SERIALIZATION_FORMAT = Arg(
- ("--serialization-format",),
- help="When exporting as `.env` format, defines how connections should be
serialized. Default is `uri`.",
- type=string_lower_type,
- choices=["json", "uri"],
-)
-ARG_CONN_IMPORT = Arg(("file",), help="Import connections from a file")
-ARG_CONN_OVERWRITE = Arg(
- ("--overwrite",),
- help="Overwrite existing entries if a conflict occurs",
- required=False,
- action="store_true",
-)
-
-# providers
-ARG_PROVIDER_NAME = Arg(
- ("provider_name",), help="Provider name, required to get provider
information", type=str
-)
-ARG_FULL = Arg(
- ("-f", "--full"),
- help="Full information about the provider, including documentation
information.",
- required=False,
- action="store_true",
-)
-
-# users
-ARG_USERNAME = Arg(("-u", "--username"), help="Username of the user", required=True, type=str)
-ARG_USERNAME_OPTIONAL = Arg(("-u", "--username"), help="Username of the user", type=str)
-ARG_FIRSTNAME = Arg(("-f", "--firstname"), help="First name of the user", required=True, type=str)
-ARG_LASTNAME = Arg(("-l", "--lastname"), help="Last name of the user", required=True, type=str)
-ARG_ROLE = Arg(
- ("-r", "--role"),
- help="Role of the user. Existing roles include Admin, User, Op, Viewer,
and Public",
- required=True,
- type=str,
-)
-ARG_EMAIL = Arg(("-e", "--email"), help="Email of the user", required=True, type=str)
-ARG_EMAIL_OPTIONAL = Arg(("-e", "--email"), help="Email of the user", type=str)
-ARG_PASSWORD = Arg(
- ("-p", "--password"),
- help="Password of the user, required to create a user without
--use-random-password",
- type=str,
-)
-ARG_USE_RANDOM_PASSWORD = Arg(
- ("--use-random-password",),
- help="Do not prompt for password. Use random string instead."
- " Required to create a user without --password ",
- default=False,
- action="store_true",
-)
-ARG_USER_IMPORT = Arg(
- ("import",),
- metavar="FILEPATH",
- help="Import users from JSON file. Example format::\n"
- + textwrap.indent(
- textwrap.dedent(
- """
- [
- {
- "email": "[email protected]",
- "firstname": "Jon",
- "lastname": "Doe",
- "roles": ["Public"],
- "username": "jondoe"
- }
- ]"""
- ),
- " " * 4,
- ),
-)
-ARG_USER_EXPORT = Arg(("export",), metavar="FILEPATH", help="Export all users to JSON file")
-
-# roles
-ARG_CREATE_ROLE = Arg(("-c", "--create"), help="Create a new role", action="store_true")
-ARG_LIST_ROLES = Arg(("-l", "--list"), help="List roles", action="store_true")
-ARG_ROLES = Arg(("role",), help="The name of a role", nargs="*")
-ARG_PERMISSIONS = Arg(("-p", "--permission"), help="Show role permissions", action="store_true")
-ARG_ROLE_RESOURCE = Arg(("-r", "--resource"), help="The name of permissions", nargs="*", required=True)
-ARG_ROLE_ACTION = Arg(("-a", "--action"), help="The action of permissions", nargs="*")
-ARG_ROLE_ACTION_REQUIRED = Arg(("-a", "--action"), help="The action of permissions", nargs="*", required=True)
-ARG_AUTOSCALE = Arg(("-a", "--autoscale"), help="Minimum and Maximum number of worker to autoscale")
-ARG_SKIP_SERVE_LOGS = Arg(
- ("-s", "--skip-serve-logs"),
- default=False,
- help="Don't start the serve logs process along with the workers",
- action="store_true",
-)
-ARG_ROLE_IMPORT = Arg(("file",), help="Import roles from JSON file", nargs=None)
-ARG_ROLE_EXPORT = Arg(("file",), help="Export all roles to JSON file", nargs=None)
-ARG_ROLE_EXPORT_FMT = Arg(
- ("-p", "--pretty"),
- help="Format output JSON file by sorting role names and indenting by 4
spaces",
- action="store_true",
-)
-
-# info
-ARG_ANONYMIZE = Arg(
- ("--anonymize",),
- help="Minimize any personal identifiable information. Use it when sharing
output with others.",
- action="store_true",
-)
-ARG_FILE_IO = Arg(
- ("--file-io",), help="Send output to file.io service and returns link.",
action="store_true"
-)
-
-# config
-ARG_SECTION = Arg(
- ("section",),
- help="The section name",
-)
-ARG_OPTION = Arg(
- ("option",),
- help="The option name",
-)
-ARG_OPTIONAL_SECTION = Arg(
- ("--section",),
- help="The section name",
-)
-
-# kubernetes cleanup-pods
-ARG_NAMESPACE = Arg(
- ("--namespace",),
- default=conf.get("kubernetes_executor", "namespace"),
- help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in
configuration.",
-)
-
-ARG_MIN_PENDING_MINUTES = Arg(
- ("--min-pending-minutes",),
- default=30,
- type=positive_int(allow_zero=False),
- help=(
- "Pending pods created before the time interval are to be cleaned up, "
- "measured in minutes. Default value is 30(m). The minimum value is
5(m)."
- ),
-)
-
-# jobs check
-ARG_JOB_TYPE_FILTER = Arg(
- ("--job-type",),
- choices=("BackfillJob", "LocalTaskJob", "SchedulerJob", "TriggererJob"),
- action="store",
- help="The type of job(s) that will be checked.",
-)
-
-ARG_JOB_HOSTNAME_FILTER = Arg(
- ("--hostname",),
- default=None,
- type=str,
- help="The hostname of job(s) that will be checked.",
-)
-
-ARG_JOB_HOSTNAME_CALLABLE_FILTER = Arg(
- ("--local",),
- action="store_true",
- help="If passed, this command will only show jobs from the local host "
- "(those with a hostname matching what `hostname_callable` returns).",
-)
-
-ARG_JOB_LIMIT = Arg(
- ("--limit",),
- default=1,
- type=positive_int(allow_zero=True),
- help="The number of recent jobs that will be checked. To disable limit,
set 0. ",
-)
-
-ARG_ALLOW_MULTIPLE = Arg(
- ("--allow-multiple",),
- action="store_true",
- help="If passed, this command will be successful even if multiple matching
alive jobs are found.",
-)
-
-# sync-perm
-ARG_INCLUDE_DAGS = Arg(
- ("--include-dags",), help="If passed, DAG specific permissions will also
be synced.", action="store_true"
-)
-
-# triggerer
-ARG_CAPACITY = Arg(
- ("--capacity",),
- type=positive_int(allow_zero=False),
- help="The maximum number of triggers that a Triggerer will run at one
time.",
-)
-
-# reserialize
-ARG_CLEAR_ONLY = Arg(
- ("--clear-only",),
- action="store_true",
- help="If passed, serialized DAGs will be cleared but not reserialized.",
-)
-
-ALTERNATIVE_CONN_SPECS_ARGS = [
- ARG_CONN_TYPE,
- ARG_CONN_DESCRIPTION,
- ARG_CONN_HOST,
- ARG_CONN_LOGIN,
- ARG_CONN_PASSWORD,
- ARG_CONN_SCHEMA,
- ARG_CONN_PORT,
-]
-
-
-class ActionCommand(NamedTuple):
- """Single CLI command."""
-
- name: str
- help: str
- func: Callable
- args: Iterable[Arg]
- description: str | None = None
- epilog: str | None = None
-
-
-class GroupCommand(NamedTuple):
- """ClI command with subcommands."""
-
- name: str
- help: str
- subcommands: Iterable
- description: str | None = None
- epilog: str | None = None
-
-
-CLICommand = Union[ActionCommand, GroupCommand]
-
-DAGS_COMMANDS = (
- ActionCommand(
- name="list",
- help="List all the DAGs",
-
func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dags"),
- args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="list-import-errors",
- help="List all the DAGs that have import errors",
-
func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_import_errors"),
- args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="report",
- help="Show DagBag loading report",
- func=lazy_load_command("airflow.cli.commands.dag_command.dag_report"),
- args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="list-runs",
- help="List DAG runs given a DAG id",
- description=(
- "List DAG runs given a DAG id. If state option is given, it will
only search for all the "
- "dagruns with the given state. If no_backfill option is given, it
will filter out all "
- "backfill dagruns for given dag id. If start_date is given, it
will filter out all the "
- "dagruns that were executed before this date. If end_date is
given, it will filter out "
- "all the dagruns that were executed after this date. "
- ),
-
func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dag_runs"),
- args=(
- ARG_DAG_ID_REQ_FLAG,
- ARG_NO_BACKFILL,
- ARG_STATE,
- ARG_OUTPUT,
- ARG_VERBOSE,
- ARG_START_DATE,
- ARG_END_DATE,
- ),
- ),
- ActionCommand(
- name="list-jobs",
- help="List the jobs",
-
func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_jobs"),
- args=(ARG_DAG_ID_OPT, ARG_STATE, ARG_LIMIT, ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="state",
- help="Get the status of a dag run",
- func=lazy_load_command("airflow.cli.commands.dag_command.dag_state"),
- args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_VERBOSE),
- ),
- ActionCommand(
- name="next-execution",
- help="Get the next execution datetimes of a DAG",
- description=(
- "Get the next execution datetimes of a DAG. It returns one
execution unless the "
- "num-executions option is given"
- ),
-
func=lazy_load_command("airflow.cli.commands.dag_command.dag_next_execution"),
- args=(ARG_DAG_ID, ARG_SUBDIR, ARG_NUM_EXECUTIONS, ARG_VERBOSE),
- ),
- ActionCommand(
- name="pause",
- help="Pause a DAG",
- func=lazy_load_command("airflow.cli.commands.dag_command.dag_pause"),
- args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
- ),
- ActionCommand(
- name="unpause",
- help="Resume a paused DAG",
- func=lazy_load_command("airflow.cli.commands.dag_command.dag_unpause"),
- args=(ARG_DAG_ID, ARG_SUBDIR, ARG_VERBOSE),
- ),
- ActionCommand(
- name="trigger",
- help="Trigger a DAG run",
- func=lazy_load_command("airflow.cli.commands.dag_command.dag_trigger"),
- args=(
- ARG_DAG_ID,
- ARG_SUBDIR,
- ARG_RUN_ID,
- ARG_CONF,
- ARG_EXEC_DATE,
- ARG_VERBOSE,
- ARG_REPLACE_MICRO,
- ARG_OUTPUT,
- ),
- ),
- ActionCommand(
- name="delete",
- help="Delete all DB records related to the specified DAG",
- func=lazy_load_command("airflow.cli.commands.dag_command.dag_delete"),
- args=(ARG_DAG_ID, ARG_YES, ARG_VERBOSE),
- ),
- ActionCommand(
- name="show",
- help="Displays DAG's tasks with their dependencies",
- description=(
- "The --imgcat option only works in iTerm.\n"
- "\n"
- "For more information, see:
https://www.iterm2.com/documentation-images.html\n"
- "\n"
- "The --save option saves the result to the indicated file.\n"
- "\n"
- "The file format is determined by the file extension. "
- "For more information about supported "
- "format, see: https://www.graphviz.org/doc/info/output.html\n"
- "\n"
- "If you want to create a PNG file then you should execute the
following command:\n"
- "airflow dags show <DAG_ID> --save output.png\n"
- "\n"
- "If you want to create a DOT file then you should execute the
following command:\n"
- "airflow dags show <DAG_ID> --save output.dot\n"
- ),
- func=lazy_load_command("airflow.cli.commands.dag_command.dag_show"),
- args=(
- ARG_DAG_ID,
- ARG_SUBDIR,
- ARG_SAVE,
- ARG_IMGCAT,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="show-dependencies",
- help="Displays DAGs with their dependencies",
- description=(
- "The --imgcat option only works in iTerm.\n"
- "\n"
- "For more information, see:
https://www.iterm2.com/documentation-images.html\n"
- "\n"
- "The --save option saves the result to the indicated file.\n"
- "\n"
- "The file format is determined by the file extension. "
- "For more information about supported "
- "format, see: https://www.graphviz.org/doc/info/output.html\n"
- "\n"
- "If you want to create a PNG file then you should execute the
following command:\n"
- "airflow dags show-dependencies --save output.png\n"
- "\n"
- "If you want to create a DOT file then you should execute the
following command:\n"
- "airflow dags show-dependencies --save output.dot\n"
- ),
-
func=lazy_load_command("airflow.cli.commands.dag_command.dag_dependencies_show"),
- args=(
- ARG_SUBDIR,
- ARG_SAVE,
- ARG_IMGCAT,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="backfill",
- help="Run subsections of a DAG for a specified date range",
- description=(
- "Run subsections of a DAG for a specified date range. If
reset_dag_run option is used, "
- "backfill will first prompt users whether airflow should clear all
the previous dag_run and "
- "task_instances within the backfill date range. If
rerun_failed_tasks is used, backfill "
- "will auto re-run the previous failed task instances within the
backfill date range"
- ),
-
func=lazy_load_command("airflow.cli.commands.dag_command.dag_backfill"),
- args=(
- ARG_DAG_ID,
- ARG_TASK_REGEX,
- ARG_START_DATE,
- ARG_END_DATE,
- ARG_MARK_SUCCESS,
- ARG_LOCAL,
- ARG_DONOT_PICKLE,
- ARG_YES,
- ARG_CONTINUE_ON_FAILURES,
- ARG_DISABLE_RETRY,
- ARG_BF_IGNORE_DEPENDENCIES,
- ARG_BF_IGNORE_FIRST_DEPENDS_ON_PAST,
- ARG_SUBDIR,
- ARG_POOL,
- ARG_DELAY_ON_LIMIT,
- ARG_DRY_RUN,
- ARG_VERBOSE,
- ARG_CONF,
- ARG_RESET_DAG_RUN,
- ARG_RERUN_FAILED_TASKS,
- ARG_RUN_BACKWARDS,
- ARG_TREAT_DAG_AS_REGEX,
- ),
- ),
- ActionCommand(
- name="test",
- help="Execute one single DagRun",
- description=(
- "Execute one single DagRun for a given DAG and execution date.\n"
- "\n"
- "The --imgcat-dagrun option only works in iTerm.\n"
- "\n"
- "For more information, see:
https://www.iterm2.com/documentation-images.html\n"
- "\n"
- "If --save-dagrun is used, then, after completing the backfill,
saves the diagram "
- "for current DAG Run to the indicated file.\n"
- "The file format is determined by the file extension. "
- "For more information about supported format, "
- "see: https://www.graphviz.org/doc/info/output.html\n"
- "\n"
- "If you want to create a PNG file then you should execute the
following command:\n"
- "airflow dags test <DAG_ID> <EXECUTION_DATE> --save-dagrun
output.png\n"
- "\n"
- "If you want to create a DOT file then you should execute the
following command:\n"
- "airflow dags test <DAG_ID> <EXECUTION_DATE> --save-dagrun
output.dot\n"
- ),
- func=lazy_load_command("airflow.cli.commands.dag_command.dag_test"),
- args=(
- ARG_DAG_ID,
- ARG_EXECUTION_DATE_OPTIONAL,
- ARG_CONF,
- ARG_SUBDIR,
- ARG_SHOW_DAGRUN,
- ARG_IMGCAT_DAGRUN,
- ARG_SAVE_DAGRUN,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="reserialize",
- help="Reserialize all DAGs by parsing the DagBag files",
- description=(
- "Drop all serialized dags from the metadata DB. This will cause
all DAGs to be reserialized "
- "from the DagBag folder. This can be helpful if your serialized
DAGs get out of sync with the "
- "version of Airflow that you are running."
- ),
-
func=lazy_load_command("airflow.cli.commands.dag_command.dag_reserialize"),
- args=(
- ARG_CLEAR_ONLY,
- ARG_SUBDIR,
- ARG_VERBOSE,
- ),
- ),
-)
-TASKS_COMMANDS = (
- ActionCommand(
- name="list",
- help="List the tasks within a DAG",
- func=lazy_load_command("airflow.cli.commands.task_command.task_list"),
- args=(ARG_DAG_ID, ARG_TREE, ARG_SUBDIR, ARG_VERBOSE),
- ),
- ActionCommand(
- name="clear",
- help="Clear a set of task instance, as if they never ran",
- func=lazy_load_command("airflow.cli.commands.task_command.task_clear"),
- args=(
- ARG_DAG_ID,
- ARG_TASK_REGEX,
- ARG_START_DATE,
- ARG_END_DATE,
- ARG_SUBDIR,
- ARG_UPSTREAM,
- ARG_DOWNSTREAM,
- ARG_YES,
- ARG_ONLY_FAILED,
- ARG_ONLY_RUNNING,
- ARG_EXCLUDE_SUBDAGS,
- ARG_EXCLUDE_PARENTDAG,
- ARG_DAG_REGEX,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="state",
- help="Get the status of a task instance",
- func=lazy_load_command("airflow.cli.commands.task_command.task_state"),
- args=(
- ARG_DAG_ID,
- ARG_TASK_ID,
- ARG_EXECUTION_DATE_OR_RUN_ID,
- ARG_SUBDIR,
- ARG_VERBOSE,
- ARG_MAP_INDEX,
- ),
- ),
- ActionCommand(
- name="failed-deps",
- help="Returns the unmet dependencies for a task instance",
- description=(
- "Returns the unmet dependencies for a task instance from the
perspective of the scheduler. "
- "In other words, why a task instance doesn't get scheduled and
then queued by the scheduler, "
- "and then run by an executor."
- ),
-
func=lazy_load_command("airflow.cli.commands.task_command.task_failed_deps"),
- args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE_OR_RUN_ID,
ARG_SUBDIR, ARG_MAP_INDEX, ARG_VERBOSE),
- ),
- ActionCommand(
- name="render",
- help="Render a task instance's template(s)",
-
func=lazy_load_command("airflow.cli.commands.task_command.task_render"),
- args=(
- ARG_DAG_ID,
- ARG_TASK_ID,
- ARG_EXECUTION_DATE_OR_RUN_ID,
- ARG_SUBDIR,
- ARG_VERBOSE,
- ARG_MAP_INDEX,
- ),
- ),
- ActionCommand(
- name="run",
- help="Run a single task instance",
- func=lazy_load_command("airflow.cli.commands.task_command.task_run"),
- args=(
- ARG_DAG_ID,
- ARG_TASK_ID,
- ARG_EXECUTION_DATE_OR_RUN_ID,
- ARG_SUBDIR,
- ARG_MARK_SUCCESS,
- ARG_FORCE,
- ARG_POOL,
- ARG_CFG_PATH,
- ARG_LOCAL,
- ARG_RAW,
- ARG_IGNORE_ALL_DEPENDENCIES,
- ARG_IGNORE_DEPENDENCIES,
- ARG_IGNORE_DEPENDS_ON_PAST,
- ARG_DEPENDS_ON_PAST,
- ARG_SHIP_DAG,
- ARG_PICKLE,
- ARG_JOB_ID,
- ARG_INTERACTIVE,
- ARG_SHUT_DOWN_LOGGING,
- ARG_MAP_INDEX,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="test",
- help="Test a task instance",
- description=(
- "Test a task instance. This will run a task without checking for
dependencies or recording "
- "its state in the database"
- ),
- func=lazy_load_command("airflow.cli.commands.task_command.task_test"),
- args=(
- ARG_DAG_ID,
- ARG_TASK_ID,
- ARG_EXECUTION_DATE_OR_RUN_ID_OPTIONAL,
- ARG_SUBDIR,
- ARG_DRY_RUN,
- ARG_TASK_PARAMS,
- ARG_POST_MORTEM,
- ARG_ENV_VARS,
- ARG_MAP_INDEX,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="states-for-dag-run",
- help="Get the status of all task instances in a dag run",
-
func=lazy_load_command("airflow.cli.commands.task_command.task_states_for_dag_run"),
- args=(ARG_DAG_ID, ARG_EXECUTION_DATE_OR_RUN_ID, ARG_OUTPUT,
ARG_VERBOSE),
- ),
-)
-POOLS_COMMANDS = (
- ActionCommand(
- name="list",
- help="List pools",
- func=lazy_load_command("airflow.cli.commands.pool_command.pool_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="get",
- help="Get pool size",
- func=lazy_load_command("airflow.cli.commands.pool_command.pool_get"),
- args=(ARG_POOL_NAME, ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="set",
- help="Configure pool",
- func=lazy_load_command("airflow.cli.commands.pool_command.pool_set"),
- args=(ARG_POOL_NAME, ARG_POOL_SLOTS, ARG_POOL_DESCRIPTION, ARG_OUTPUT,
ARG_VERBOSE),
- ),
- ActionCommand(
- name="delete",
- help="Delete pool",
-
func=lazy_load_command("airflow.cli.commands.pool_command.pool_delete"),
- args=(ARG_POOL_NAME, ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="import",
- help="Import pools",
-
func=lazy_load_command("airflow.cli.commands.pool_command.pool_import"),
- args=(ARG_POOL_IMPORT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="export",
- help="Export all pools",
-
func=lazy_load_command("airflow.cli.commands.pool_command.pool_export"),
- args=(ARG_POOL_EXPORT, ARG_VERBOSE),
- ),
-)
-VARIABLES_COMMANDS = (
- ActionCommand(
- name="list",
- help="List variables",
-
func=lazy_load_command("airflow.cli.commands.variable_command.variables_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="get",
- help="Get variable",
-
func=lazy_load_command("airflow.cli.commands.variable_command.variables_get"),
- args=(ARG_VAR, ARG_DESERIALIZE_JSON, ARG_DEFAULT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="set",
- help="Set variable",
-
func=lazy_load_command("airflow.cli.commands.variable_command.variables_set"),
- args=(ARG_VAR, ARG_VAR_VALUE, ARG_SERIALIZE_JSON, ARG_VERBOSE),
- ),
- ActionCommand(
- name="delete",
- help="Delete variable",
-
func=lazy_load_command("airflow.cli.commands.variable_command.variables_delete"),
- args=(ARG_VAR, ARG_VERBOSE),
- ),
- ActionCommand(
- name="import",
- help="Import variables",
-
func=lazy_load_command("airflow.cli.commands.variable_command.variables_import"),
- args=(ARG_VAR_IMPORT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="export",
- help="Export all variables",
-
func=lazy_load_command("airflow.cli.commands.variable_command.variables_export"),
- args=(ARG_VAR_EXPORT, ARG_VERBOSE),
- ),
-)
-DB_COMMANDS = (
- ActionCommand(
- name="init",
- help="Initialize the metadata database",
- func=lazy_load_command("airflow.cli.commands.db_command.initdb"),
- args=(ARG_VERBOSE,),
- ),
- ActionCommand(
- name="check-migrations",
- help="Check if migration have finished",
- description="Check if migration have finished (or continually check
until timeout)",
-
func=lazy_load_command("airflow.cli.commands.db_command.check_migrations"),
- args=(ARG_MIGRATION_TIMEOUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="reset",
- help="Burn down and rebuild the metadata database",
- func=lazy_load_command("airflow.cli.commands.db_command.resetdb"),
- args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="upgrade",
- help="Upgrade the metadata database to latest version",
- description=(
- "Upgrade the schema of the metadata database. "
- "To print but not execute commands, use option
``--show-sql-only``. "
- "If using options ``--from-revision`` or ``--from-version``, you
must also use "
- "``--show-sql-only``, because if actually *running* migrations, we
should only "
- "migrate from the *current* Alembic revision."
- ),
- func=lazy_load_command("airflow.cli.commands.db_command.upgradedb"),
- args=(
- ARG_DB_REVISION__UPGRADE,
- ARG_DB_VERSION__UPGRADE,
- ARG_DB_SQL_ONLY,
- ARG_DB_FROM_REVISION,
- ARG_DB_FROM_VERSION,
- ARG_DB_RESERIALIZE_DAGS,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="downgrade",
- help="Downgrade the schema of the metadata database.",
- description=(
- "Downgrade the schema of the metadata database. "
- "You must provide either `--to-revision` or `--to-version`. "
- "To print but not execute commands, use option `--show-sql-only`. "
- "If using options `--from-revision` or `--from-version`, you must
also use `--show-sql-only`, "
- "because if actually *running* migrations, we should only migrate
from the *current* Alembic "
- "revision."
- ),
- func=lazy_load_command("airflow.cli.commands.db_command.downgrade"),
- args=(
- ARG_DB_REVISION__DOWNGRADE,
- ARG_DB_VERSION__DOWNGRADE,
- ARG_DB_SQL_ONLY,
- ARG_YES,
- ARG_DB_FROM_REVISION,
- ARG_DB_FROM_VERSION,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="shell",
- help="Runs a shell to access the database",
- func=lazy_load_command("airflow.cli.commands.db_command.shell"),
- args=(ARG_VERBOSE,),
- ),
- ActionCommand(
- name="check",
- help="Check if the database can be reached",
- func=lazy_load_command("airflow.cli.commands.db_command.check"),
- args=(ARG_VERBOSE,),
- ),
- ActionCommand(
- name="clean",
- help="Purge old records in metastore tables",
-
func=lazy_load_command("airflow.cli.commands.db_command.cleanup_tables"),
- args=(
- ARG_DB_TABLES,
- ARG_DB_DRY_RUN,
- ARG_DB_CLEANUP_TIMESTAMP,
- ARG_VERBOSE,
- ARG_YES,
- ARG_DB_SKIP_ARCHIVE,
- ),
- ),
- ActionCommand(
- name="export-archived",
- help="Export archived data from the archive tables",
-
func=lazy_load_command("airflow.cli.commands.db_command.export_archived"),
- args=(
- ARG_DB_EXPORT_FORMAT,
- ARG_DB_OUTPUT_PATH,
- ARG_DB_DROP_ARCHIVES,
- ARG_DB_TABLES,
- ARG_YES,
- ),
- ),
- ActionCommand(
- name="drop-archived",
- help="Drop archived tables created through the db clean command",
-
func=lazy_load_command("airflow.cli.commands.db_command.drop_archived"),
- args=(ARG_DB_TABLES, ARG_YES),
- ),
-)
-CONNECTIONS_COMMANDS = (
- ActionCommand(
- name="get",
- help="Get a connection",
-
func=lazy_load_command("airflow.cli.commands.connection_command.connections_get"),
- args=(ARG_CONN_ID, ARG_COLOR, ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="list",
- help="List connections",
-
func=lazy_load_command("airflow.cli.commands.connection_command.connections_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE, ARG_CONN_ID_FILTER),
- ),
- ActionCommand(
- name="add",
- help="Add a connection",
-
func=lazy_load_command("airflow.cli.commands.connection_command.connections_add"),
- args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_JSON, ARG_CONN_EXTRA) +
tuple(ALTERNATIVE_CONN_SPECS_ARGS),
- ),
- ActionCommand(
- name="delete",
- help="Delete a connection",
-
func=lazy_load_command("airflow.cli.commands.connection_command.connections_delete"),
- args=(ARG_CONN_ID, ARG_COLOR, ARG_VERBOSE),
- ),
- ActionCommand(
- name="export",
- help="Export all connections",
- description=(
- "All connections can be exported in STDOUT using the following
command:\n"
- "airflow connections export -\n"
- "The file format can be determined by the provided file extension.
E.g., The following "
- "command will export the connections in JSON format:\n"
- "airflow connections export /tmp/connections.json\n"
- "The --file-format parameter can be used to control the file
format. E.g., "
- "the default format is JSON in STDOUT mode, which can be
overridden using: \n"
- "airflow connections export - --file-format yaml\n"
- "The --file-format parameter can also be used for the files, for
example:\n"
- "airflow connections export /tmp/connections --file-format json.\n"
- "When exporting in `env` file format, you control whether URI
format or JSON format "
- "is used to serialize the connection by passing `uri` or `json`
with option "
- "`--serialization-format`.\n"
- ),
-
func=lazy_load_command("airflow.cli.commands.connection_command.connections_export"),
- args=(
- ARG_CONN_EXPORT,
- ARG_CONN_EXPORT_FORMAT,
- ARG_CONN_EXPORT_FILE_FORMAT,
- ARG_CONN_SERIALIZATION_FORMAT,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="import",
- help="Import connections from a file",
- description=(
- "Connections can be imported from the output of the export
command.\n"
- "The filetype must by json, yaml or env and will be automatically
inferred."
- ),
-
func=lazy_load_command("airflow.cli.commands.connection_command.connections_import"),
- args=(
- ARG_CONN_IMPORT,
- ARG_CONN_OVERWRITE,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="test",
- help="Test a connection",
-
func=lazy_load_command("airflow.cli.commands.connection_command.connections_test"),
- args=(ARG_CONN_ID, ARG_VERBOSE),
- ),
-)
-PROVIDERS_COMMANDS = (
- ActionCommand(
- name="list",
- help="List installed providers",
-
func=lazy_load_command("airflow.cli.commands.provider_command.providers_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="get",
- help="Get detailed information about a provider",
-
func=lazy_load_command("airflow.cli.commands.provider_command.provider_get"),
- args=(ARG_OUTPUT, ARG_VERBOSE, ARG_FULL, ARG_COLOR, ARG_PROVIDER_NAME),
- ),
- ActionCommand(
- name="links",
- help="List extra links registered by the providers",
-
func=lazy_load_command("airflow.cli.commands.provider_command.extra_links_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="widgets",
- help="Get information about registered connection form widgets",
-
func=lazy_load_command("airflow.cli.commands.provider_command.connection_form_widget_list"),
- args=(
- ARG_OUTPUT,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="hooks",
- help="List registered provider hooks",
-
func=lazy_load_command("airflow.cli.commands.provider_command.hooks_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="behaviours",
- help="Get information about registered connection types with custom
behaviours",
-
func=lazy_load_command("airflow.cli.commands.provider_command.connection_field_behaviours"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="logging",
- help="Get information about task logging handlers provided",
-
func=lazy_load_command("airflow.cli.commands.provider_command.logging_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="secrets",
- help="Get information about secrets backends provided",
-
func=lazy_load_command("airflow.cli.commands.provider_command.secrets_backends_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="auth",
- help="Get information about API auth backends provided",
-
func=lazy_load_command("airflow.cli.commands.provider_command.auth_backend_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
-)
-
-USERS_COMMANDS = (
- ActionCommand(
- name="list",
- help="List users",
- func=lazy_load_command("airflow.cli.commands.user_command.users_list"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="create",
- help="Create a user",
-
func=lazy_load_command("airflow.cli.commands.user_command.users_create"),
- args=(
- ARG_ROLE,
- ARG_USERNAME,
- ARG_EMAIL,
- ARG_FIRSTNAME,
- ARG_LASTNAME,
- ARG_PASSWORD,
- ARG_USE_RANDOM_PASSWORD,
- ARG_VERBOSE,
- ),
- epilog=(
- "examples:\n"
- 'To create an user with "Admin" role and username equals to
"admin", run:\n'
- "\n"
- " $ airflow users create \\\n"
- " --username admin \\\n"
- " --firstname FIRST_NAME \\\n"
- " --lastname LAST_NAME \\\n"
- " --role Admin \\\n"
- " --email [email protected]"
- ),
- ),
- ActionCommand(
- name="delete",
- help="Delete a user",
-
func=lazy_load_command("airflow.cli.commands.user_command.users_delete"),
- args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_VERBOSE),
- ),
- ActionCommand(
- name="add-role",
- help="Add role to a user",
- func=lazy_load_command("airflow.cli.commands.user_command.add_role"),
- args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE,
ARG_VERBOSE),
- ),
- ActionCommand(
- name="remove-role",
- help="Remove role from a user",
-
func=lazy_load_command("airflow.cli.commands.user_command.remove_role"),
- args=(ARG_USERNAME_OPTIONAL, ARG_EMAIL_OPTIONAL, ARG_ROLE,
ARG_VERBOSE),
- ),
- ActionCommand(
- name="import",
- help="Import users",
-
func=lazy_load_command("airflow.cli.commands.user_command.users_import"),
- args=(ARG_USER_IMPORT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="export",
- help="Export all users",
-
func=lazy_load_command("airflow.cli.commands.user_command.users_export"),
- args=(ARG_USER_EXPORT, ARG_VERBOSE),
- ),
-)
-ROLES_COMMANDS = (
- ActionCommand(
- name="list",
- help="List roles",
- func=lazy_load_command("airflow.cli.commands.role_command.roles_list"),
- args=(ARG_PERMISSIONS, ARG_OUTPUT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="create",
- help="Create role",
-
func=lazy_load_command("airflow.cli.commands.role_command.roles_create"),
- args=(ARG_ROLES, ARG_VERBOSE),
- ),
- ActionCommand(
- name="delete",
- help="Delete role",
-
func=lazy_load_command("airflow.cli.commands.role_command.roles_delete"),
- args=(ARG_ROLES, ARG_VERBOSE),
- ),
- ActionCommand(
- name="add-perms",
- help="Add roles permissions",
-
func=lazy_load_command("airflow.cli.commands.role_command.roles_add_perms"),
- args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION_REQUIRED,
ARG_VERBOSE),
- ),
- ActionCommand(
- name="del-perms",
- help="Delete roles permissions",
-
func=lazy_load_command("airflow.cli.commands.role_command.roles_del_perms"),
- args=(ARG_ROLES, ARG_ROLE_RESOURCE, ARG_ROLE_ACTION, ARG_VERBOSE),
- ),
- ActionCommand(
- name="export",
- help="Export roles (without permissions) from db to JSON file",
-
func=lazy_load_command("airflow.cli.commands.role_command.roles_export"),
- args=(ARG_ROLE_EXPORT, ARG_ROLE_EXPORT_FMT, ARG_VERBOSE),
- ),
- ActionCommand(
- name="import",
- help="Import roles (without permissions) from JSON file to db",
- func=lazy_load_command("airflow.cli.commands.role_command.roles_import"),
- args=(ARG_ROLE_IMPORT, ARG_VERBOSE),
- ),
-)
-
-CELERY_COMMANDS = (
- ActionCommand(
- name="worker",
- help="Start a Celery worker node",
- func=lazy_load_command("airflow.cli.commands.celery_command.worker"),
- args=(
- ARG_QUEUES,
- ARG_CONCURRENCY,
- ARG_CELERY_HOSTNAME,
- ARG_PID,
- ARG_DAEMON,
- ARG_UMASK,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_LOG_FILE,
- ARG_AUTOSCALE,
- ARG_SKIP_SERVE_LOGS,
- ARG_WITHOUT_MINGLE,
- ARG_WITHOUT_GOSSIP,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="flower",
- help="Start a Celery Flower",
- func=lazy_load_command("airflow.cli.commands.celery_command.flower"),
- args=(
- ARG_FLOWER_HOSTNAME,
- ARG_FLOWER_PORT,
- ARG_FLOWER_CONF,
- ARG_FLOWER_URL_PREFIX,
- ARG_FLOWER_BASIC_AUTH,
- ARG_BROKER_API,
- ARG_PID,
- ARG_DAEMON,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_LOG_FILE,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="stop",
- help="Stop the Celery worker gracefully",
- func=lazy_load_command("airflow.cli.commands.celery_command.stop_worker"),
- args=(ARG_PID, ARG_VERBOSE),
- ),
-)
-
-CONFIG_COMMANDS = (
- ActionCommand(
- name="get-value",
- help="Print the value of the configuration",
- func=lazy_load_command("airflow.cli.commands.config_command.get_value"),
- args=(
- ARG_SECTION,
- ARG_OPTION,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="list",
- help="List options for the configuration",
- func=lazy_load_command("airflow.cli.commands.config_command.show_config"),
- args=(ARG_OPTIONAL_SECTION, ARG_COLOR, ARG_VERBOSE),
- ),
-)
-
-KUBERNETES_COMMANDS = (
- ActionCommand(
- name="cleanup-pods",
- help=(
- "Clean up Kubernetes pods "
- "(created by KubernetesExecutor/KubernetesPodOperator) "
- "in evicted/failed/succeeded/pending states"
- ),
- func=lazy_load_command("airflow.cli.commands.kubernetes_command.cleanup_pods"),
- args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE),
- ),
- ActionCommand(
- name="generate-dag-yaml",
- help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
- "launching into a cluster",
- func=lazy_load_command("airflow.cli.commands.kubernetes_command.generate_pod_yaml"),
- args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH, ARG_VERBOSE),
- ),
-)
-
-JOBS_COMMANDS = (
- ActionCommand(
- name="check",
- help="Checks if job(s) are still alive",
- func=lazy_load_command("airflow.cli.commands.jobs_command.check"),
- args=(
- ARG_JOB_TYPE_FILTER,
- ARG_JOB_HOSTNAME_FILTER,
- ARG_JOB_HOSTNAME_CALLABLE_FILTER,
- ARG_JOB_LIMIT,
- ARG_ALLOW_MULTIPLE,
- ARG_VERBOSE,
- ),
- epilog=(
- "examples:\n"
- "To check if the local scheduler is still working properly, run:\n"
- "\n"
- ' $ airflow jobs check --job-type SchedulerJob --local"\n'
- "\n"
- "To check if any scheduler is running when you are using high availability, run:\n"
- "\n"
- " $ airflow jobs check --job-type SchedulerJob --allow-multiple --limit 100"
- ),
- ),
-)
-
-airflow_commands: list[CLICommand] = [
- GroupCommand(
- name="dags",
- help="Manage DAGs",
- subcommands=DAGS_COMMANDS,
- ),
- GroupCommand(
- name="kubernetes", help="Tools to help run the KubernetesExecutor", subcommands=KUBERNETES_COMMANDS
- ),
- GroupCommand(
- name="tasks",
- help="Manage tasks",
- subcommands=TASKS_COMMANDS,
- ),
- GroupCommand(
- name="pools",
- help="Manage pools",
- subcommands=POOLS_COMMANDS,
- ),
- GroupCommand(
- name="variables",
- help="Manage variables",
- subcommands=VARIABLES_COMMANDS,
- ),
- GroupCommand(
- name="jobs",
- help="Manage jobs",
- subcommands=JOBS_COMMANDS,
- ),
- GroupCommand(
- name="db",
- help="Database operations",
- subcommands=DB_COMMANDS,
- ),
- ActionCommand(
- name="kerberos",
- help="Start a kerberos ticket renewer",
- func=lazy_load_command("airflow.cli.commands.kerberos_command.kerberos"),
- args=(
- ARG_PRINCIPAL,
- ARG_KEYTAB,
- ARG_PID,
- ARG_DAEMON,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_LOG_FILE,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="webserver",
- help="Start a Airflow webserver instance",
- func=lazy_load_command("airflow.cli.commands.webserver_command.webserver"),
- args=(
- ARG_PORT,
- ARG_WORKERS,
- ARG_WORKERCLASS,
- ARG_WORKER_TIMEOUT,
- ARG_HOSTNAME,
- ARG_PID,
- ARG_DAEMON,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_ACCESS_LOGFILE,
- ARG_ERROR_LOGFILE,
- ARG_ACCESS_LOGFORMAT,
- ARG_LOG_FILE,
- ARG_SSL_CERT,
- ARG_SSL_KEY,
- ARG_DEBUG,
- ),
- ),
- ActionCommand(
- name="internal-api",
- help="Start a Airflow Internal API instance",
- func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
- args=(
- ARG_INTERNAL_API_PORT,
- ARG_INTERNAL_API_WORKERS,
- ARG_INTERNAL_API_WORKERCLASS,
- ARG_INTERNAL_API_WORKER_TIMEOUT,
- ARG_INTERNAL_API_HOSTNAME,
- ARG_PID,
- ARG_DAEMON,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_INTERNAL_API_ACCESS_LOGFILE,
- ARG_INTERNAL_API_ERROR_LOGFILE,
- ARG_INTERNAL_API_ACCESS_LOGFORMAT,
- ARG_LOG_FILE,
- ARG_SSL_CERT,
- ARG_SSL_KEY,
- ARG_DEBUG,
- ),
- ),
- ActionCommand(
- name="scheduler",
- help="Start a scheduler instance",
- func=lazy_load_command("airflow.cli.commands.scheduler_command.scheduler"),
- args=(
- ARG_SUBDIR,
- ARG_NUM_RUNS,
- ARG_DO_PICKLE,
- ARG_PID,
- ARG_DAEMON,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_LOG_FILE,
- ARG_SKIP_SERVE_LOGS,
- ARG_VERBOSE,
- ),
- epilog=(
- "Signals:\n"
- "\n"
- " - SIGUSR2: Dump a snapshot of task state being tracked by the executor.\n"
- "\n"
- " Example:\n"
- ' pkill -f -USR2 "airflow scheduler"'
- ),
- ),
- ActionCommand(
- name="triggerer",
- help="Start a triggerer instance",
- func=lazy_load_command("airflow.cli.commands.triggerer_command.triggerer"),
- args=(
- ARG_PID,
- ARG_DAEMON,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_LOG_FILE,
- ARG_CAPACITY,
- ARG_VERBOSE,
- ARG_SKIP_SERVE_LOGS,
- ),
- ),
- ActionCommand(
- name="dag-processor",
- help="Start a standalone Dag Processor instance",
- func=lazy_load_command("airflow.cli.commands.dag_processor_command.dag_processor"),
- args=(
- ARG_PID,
- ARG_DAEMON,
- ARG_SUBDIR,
- ARG_NUM_RUNS,
- ARG_DO_PICKLE,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_LOG_FILE,
- ARG_VERBOSE,
- ),
- ),
- ActionCommand(
- name="version",
- help="Show the version",
- func=lazy_load_command("airflow.cli.commands.version_command.version"),
- args=(),
- ),
- ActionCommand(
- name="cheat-sheet",
- help="Display cheat sheet",
- func=lazy_load_command("airflow.cli.commands.cheat_sheet_command.cheat_sheet"),
- args=(ARG_VERBOSE,),
- ),
- GroupCommand(
- name="connections",
- help="Manage connections",
- subcommands=CONNECTIONS_COMMANDS,
- ),
- GroupCommand(
- name="providers",
- help="Display providers",
- subcommands=PROVIDERS_COMMANDS,
- ),
- GroupCommand(
- name="users",
- help="Manage users",
- subcommands=USERS_COMMANDS,
- ),
- GroupCommand(
- name="roles",
- help="Manage roles",
- subcommands=ROLES_COMMANDS,
- ),
- ActionCommand(
- name="sync-perm",
- help="Update permissions for existing roles and optionally DAGs",
- func=lazy_load_command("airflow.cli.commands.sync_perm_command.sync_perm"),
- args=(ARG_INCLUDE_DAGS, ARG_VERBOSE),
- ),
- ActionCommand(
- name="rotate-fernet-key",
- func=lazy_load_command("airflow.cli.commands.rotate_fernet_key_command.rotate_fernet_key"),
- help="Rotate encrypted connection credentials and variables",
- description=(
- "Rotate all encrypted connection credentials and variables; see "
- "https://airflow.apache.org/docs/apache-airflow/stable/howto/secure-connections.html"
- "#rotating-encryption-keys"
- ),
- args=(),
- ),
- GroupCommand(name="config", help="View configuration", subcommands=CONFIG_COMMANDS),
- ActionCommand(
- name="info",
- help="Show information about current Airflow and environment",
- func=lazy_load_command("airflow.cli.commands.info_command.show_info"),
- args=(
- ARG_ANONYMIZE,
- ARG_FILE_IO,
- ARG_VERBOSE,
- ARG_OUTPUT,
- ),
- ),
- ActionCommand(
- name="plugins",
- help="Dump information about loaded plugins",
- func=lazy_load_command("airflow.cli.commands.plugins_command.dump_plugins"),
- args=(ARG_OUTPUT, ARG_VERBOSE),
- ),
- GroupCommand(
- name="celery",
- help="Celery components",
- description=(
- "Start celery components. Works only when using CeleryExecutor. For more information, see "
- "https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html"
- ),
- subcommands=CELERY_COMMANDS,
- ),
- ActionCommand(
- name="standalone",
- help="Run an all-in-one copy of Airflow",
- func=lazy_load_command("airflow.cli.commands.standalone_command.standalone"),
- args=tuple(),
- ),
-]
ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands}
-def _remove_dag_id_opt(command: ActionCommand):
- cmd = command._asdict()
- cmd["args"] = (arg for arg in command.args if arg is not ARG_DAG_ID)
- return ActionCommand(**cmd)
-
-
-dag_cli_commands: list[CLICommand] = [
- GroupCommand(
- name="dags",
- help="Manage DAGs",
- subcommands=[
- _remove_dag_id_opt(sp)
- for sp in DAGS_COMMANDS
- if sp.name in ["backfill", "list-runs", "pause", "unpause", "test"]
- ],
- ),
- GroupCommand(
- name="tasks",
- help="Manage tasks",
- subcommands=[_remove_dag_id_opt(sp) for sp in TASKS_COMMANDS if sp.name in ["list", "test", "run"]],
- ),
-]
-DAG_CLI_DICT: dict[str, CLICommand] = {sp.name: sp for sp in dag_cli_commands}
-
-
class AirflowHelpFormatter(RichHelpFormatter):
"""
Custom help formatter to display help message.
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index 41143bebe9..fbbb257982 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -27,7 +27,7 @@ from unittest.mock import patch
import pytest
-from airflow.cli import cli_parser
+from airflow.cli import cli_config, cli_parser
from tests.test_utils.config import conf_vars
# Can not be `--snake_case` or contain uppercase letter
@@ -177,7 +177,7 @@ class TestCli:
all_command_as_args = [
command_as_args
- for top_command in cli_parser.dag_cli_commands
+ for top_command in cli_config.dag_cli_commands
for command_as_args in (
[[top_command.name]]
if isinstance(top_command, cli_parser.ActionCommand)
@@ -189,12 +189,12 @@ class TestCli:
parser.parse_args([*cmd_args, "--help"])
def test_positive_int(self):
- assert 1 == cli_parser.positive_int(allow_zero=True)("1")
- assert 0 == cli_parser.positive_int(allow_zero=True)("0")
+ assert 1 == cli_config.positive_int(allow_zero=True)("1")
+ assert 0 == cli_config.positive_int(allow_zero=True)("0")
with pytest.raises(argparse.ArgumentTypeError):
- cli_parser.positive_int(allow_zero=False)("0")
- cli_parser.positive_int(allow_zero=True)("-1")
+ cli_config.positive_int(allow_zero=False)("0")
+ cli_config.positive_int(allow_zero=True)("-1")
def test_dag_parser_celery_command_require_celery_executor(self):
with conf_vars({("core", "executor"): "SequentialExecutor"}), contextlib.redirect_stderr(
@@ -251,7 +251,7 @@ class TestCli:
)
@pytest.mark.parametrize("export_format", ["json", "yaml", "unknown"])
- @patch("airflow.cli.cli_parser.os.path.isdir", return_value=True)
+ @patch("airflow.cli.cli_config.os.path.isdir", return_value=True)
def test_invalid_choice_raises_for_export_format_in_db_export_archived_command(
self, mock_isdir, export_format
):