This is an automated email from the ASF dual-hosted git repository. bugraoz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new d8e6f4bb756 feat(airflowctl): Include Airflow CLI Output Structure and Integrate (#53424) d8e6f4bb756 is described below commit d8e6f4bb7560aa01443d338aa77317bf27ac90bd Author: Bugra Ozturk <bugrao...@users.noreply.github.com> AuthorDate: Thu Jul 17 04:31:09 2025 +0300 feat(airflowctl): Include Airflow CLI Output Structure and Integrate (#53424) * feat(airflowctl): Include Airflow CLI Output Structure and Integrate into Auto Generated Commands * feat(airflowctl): Add tabulate into dependencies * feat(airflowctl): Update default and the file name * feat(airflowctl): fix unit tests --- airflow-ctl/pyproject.toml | 1 + airflow-ctl/src/airflowctl/ctl/cli_config.py | 107 ++++++++------- .../src/airflowctl/ctl/console_formatting.py | 146 +++++++++++++++++++++ airflow-ctl/src/airflowctl/ctl/utils/__init__.py | 16 +++ airflow-ctl/src/airflowctl/ctl/utils/yaml.py | 74 +++++++++++ .../tests/airflow_ctl/ctl/test_cli_config.py | 87 ++++++++++-- 6 files changed, 370 insertions(+), 61 deletions(-) diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml index 43eb8cd0ec1..734f43465af 100644 --- a/airflow-ctl/pyproject.toml +++ b/airflow-ctl/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "rich-argparse>=1.0.0", "structlog>=25.2.0", "uuid6>=2024.7.10", + "tabulate>=0.9.0", ] classifiers = [ diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index e636ed62ee8..fa8c736cb5f 100644 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -26,7 +26,6 @@ import datetime import getpass import inspect import os -import textwrap from argparse import Namespace from collections.abc import Callable, Iterable from functools import partial @@ -38,6 +37,7 @@ import rich import airflowctl.api.datamodels.generated as generated_datamodels from airflowctl.api.client import NEW_API_CLIENT, Client, ClientKind, provide_api_client from airflowctl.api.operations import BaseOperations, ServerResponseError +from airflowctl.ctl.console_formatting import AirflowConsole from airflowctl.exceptions import ( AirflowCtlConnectionException, AirflowCtlCredentialNotFoundException, @@ -172,6 +172,17 @@ ARG_FILE = Arg( help="File path to read from or write to. " "For import commands, it is a file to read from. For export commands, it is a file to write to.", ) +ARG_OUTPUT = Arg( + ( + "--output", + "-o", + ), + help="Output format. Allowed values: json, yaml, plain, table (default: json)", + metavar="(table, json, yaml, plain)", + choices=("table", "json", "yaml", "plain"), + default="json", + type=str, +) # Authentication arguments ARG_AUTH_URL = Arg( @@ -209,11 +220,6 @@ ARG_AUTH_PASSWORD = Arg( ) # Variable Commands Args -ARG_VARIABLE_IMPORT = Arg( - flags=("file",), - metavar="file", - help="Import variables from JSON file", -) ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg( flags=("-a", "--action-on-existing-key"), type=str, @@ -221,45 +227,6 @@ ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg( help="Action to take if we encounter a variable key that already exists.", choices=("overwrite", "fail", "skip"), ) -ARG_VARIABLE_EXPORT = Arg( - flags=("file",), - metavar="file", - help="Export all variables to JSON file", -) - -ARG_OUTPUT = Arg( - flags=("-o", "--output"), - type=str, - default="json", - help="Output format. Only json format is supported (default: json)", -) - -# Pool Commands Args -ARG_POOL_FILE = Arg( - ("file",), - metavar="FILEPATH", - help="Pools JSON file. Example format::\n" - + textwrap.indent( - textwrap.dedent( - """ - [ - { - "name": "pool_1", - "slots": 5, - "description": "", - "include_deferred": true, - "occupied_slots": 0, - "running_slots": 0, - "queued_slots": 0, - "scheduled_slots": 0, - "open_slots": 5, - "deferred_slots": 0 - } - ]""" - ), - " " * 4, - ), -) # Config arguments ARG_CONFIG_SECTION = Arg( @@ -552,6 +519,14 @@ class CommandFactory: parameter_type=parameter_type, parameter_key=parameter_key ) ) + # This list is used to determine if the command/operation needs to output data + output_command_list = [ + "list", + "get", + ] + if any(operation.get("name").startswith(cmd) for cmd in output_command_list): + args.extend([ARG_OUTPUT]) + self.args_map[(operation.get("name"), operation.get("parent").name)] = args def _create_func_map_from_operation(self): @@ -588,9 +563,39 @@ class CommandFactory: if datamodel: method_params = datamodel.model_validate(method_params) - rich.print(operation_method_object(method_params)) + method_output = operation_method_object(method_params) else: - rich.print(operation_method_object(**method_params)) + method_output = operation_method_object(**method_params) + + def convert_to_dict(obj: Any) -> dict | Any: + """Recursively convert an object to a dictionary or list of dictionaries.""" + if hasattr(obj, "model_dump"): + return obj.model_dump(mode="json") + return obj + + def check_operation_and_collect_list_of_dict(dict_obj: dict) -> list: + """Check if the object is a nested dictionary and collect list of dictionaries.""" + + def is_dict_nested(obj: dict) -> bool: + """Check if the object is a nested dictionary.""" + return any(isinstance(i, dict) or isinstance(i, list) for i in obj.values()) + + # Find result from list operation + if is_dict_nested(dict_obj): + for _, value in dict_obj.items(): + if isinstance(value, list): + return value + if isinstance(value, dict): + result = check_operation_and_collect_list_of_dict(value) + if result: + return result + # If not nested, return the object as a list which the result should be already a dict + return [dict_obj] + + AirflowConsole().print_as( + data=check_operation_and_collect_list_of_dict(convert_to_dict(method_output)), + output=args.output, + ) for operation in self.operations: self.func_map[(operation.get("name"), operation.get("parent").name)] = partial( @@ -736,14 +741,14 @@ POOL_COMMANDS = ( name="import", help="Import pools", func=lazy_load_command("airflowctl.ctl.commands.pool_command.import_"), - args=(ARG_POOL_FILE,), + args=(ARG_FILE,), ), ActionCommand( name="export", help="Export all pools", func=lazy_load_command("airflowctl.ctl.commands.pool_command.export"), args=( - ARG_POOL_FILE, + ARG_FILE, ARG_OUTPUT, ), ), @@ -754,13 +759,13 @@ VARIABLE_COMMANDS = ( name="import", help="Import variables", func=lazy_load_command("airflowctl.ctl.commands.variable_command.import_"), - args=(ARG_VARIABLE_IMPORT, ARG_VARIABLE_ACTION_ON_EXISTING_KEY), + args=(ARG_FILE, ARG_VARIABLE_ACTION_ON_EXISTING_KEY), ), ActionCommand( name="export", help="Export all variables", func=lazy_load_command("airflowctl.ctl.commands.variable_command.export"), - args=(ARG_VARIABLE_EXPORT,), + args=(ARG_FILE,), ), ) diff --git a/airflow-ctl/src/airflowctl/ctl/console_formatting.py b/airflow-ctl/src/airflowctl/ctl/console_formatting.py new file mode 100644 index 00000000000..ac9bc431597 --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/console_formatting.py @@ -0,0 +1,146 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +import sys +from collections.abc import Callable, Sequence +from typing import TYPE_CHECKING, Any + +from rich.box import ASCII_DOUBLE_HEAD +from rich.console import Console +from rich.syntax import Syntax +from rich.table import Table +from tabulate import tabulate + +from airflowctl.ctl.utils import yaml + +if TYPE_CHECKING: + from typing import TypeGuard + + +# TODO (bugraoz93): Use Vendor Approach and unify with airflow.platform for core-ctl +def is_tty(): + """Check if stdout is connected (is associated with a terminal device) to a tty(-like) device.""" + if not hasattr(sys.stdout, "isatty"): + return False + return sys.stdout.isatty() + + +def is_data_sequence(data: Sequence[dict | Any]) -> TypeGuard[Sequence[dict]]: + return all(isinstance(d, dict) for d in data) + + +class AirflowConsole(Console): + """Airflow rich console.""" + + def __init__(self, show_header: bool = True, *args, **kwargs): + super().__init__(*args, **kwargs) + # Set the width to constant to pipe whole output from console + self._width = 200 if not is_tty() else self._width + + # If show header in tables + self.show_header = show_header + + def print_as_json(self, data: dict): + """Render dict as json text representation.""" + json_content = json.dumps(data) + self.print(Syntax(json_content, "json", theme="ansi_dark"), soft_wrap=True) + + def print_as_yaml(self, data: dict): + """Render dict as yaml text representation.""" + yaml_content = yaml.dump(data) + self.print(Syntax(yaml_content, "yaml", theme="ansi_dark"), soft_wrap=True) + + def print_as_table(self, data: list[dict]): + """Render list of dictionaries as table.""" + if not data: + self.print("No data found") + return + + table = SimpleTable(show_header=self.show_header) + for col in data[0]: + table.add_column(col) + + for row in data: + table.add_row(*(str(d) for d in row.values())) + self.print(table) + + def print_as_plain_table(self, data: list[dict]): + """Render list of dictionaries as a simple table than can be easily piped.""" + if not data: + self.print("No data found") + return + rows = [d.values() for d in data] + output = tabulate(rows, tablefmt="plain", headers=list(data[0])) + print(output) + + def _normalize_data(self, value: Any, output: str) -> list | str | dict | None: + if isinstance(value, (tuple, list)): + if output == "table": + return ",".join(str(self._normalize_data(x, output)) for x in value) + return [self._normalize_data(x, output) for x in value] + if isinstance(value, dict) and output != "table": + return {k: self._normalize_data(v, output) for k, v in value.items()} + if value is None: + return None + return str(value) + + def print_as( + self, + data: Sequence[dict | Any], + output: str, + mapper: Callable[[Any], dict] | None = None, + ) -> None: + """Print provided using format specified by output argument.""" + output_to_renderer: dict[str, Callable[[Any], None]] = { + "json": self.print_as_json, + "yaml": self.print_as_yaml, + "table": self.print_as_table, + "plain": self.print_as_plain_table, + } + renderer = output_to_renderer.get(output) + if not renderer: + raise ValueError(f"Unknown formatter: {output}. Allowed options: {list(output_to_renderer)}") + + if mapper: + dict_data: Sequence[dict] = [mapper(d) for d in data] + elif is_data_sequence(data): + dict_data = data + else: + raise ValueError("To tabulate non-dictionary data you need to provide `mapper` function") + dict_data = [{k: self._normalize_data(v, output) for k, v in d.items()} for d in dict_data] + renderer(dict_data) + + +class SimpleTable(Table): + """A rich Table with some default hardcoded for consistency.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.show_edge = kwargs.get("show_edge", False) + self.pad_edge = kwargs.get("pad_edge", False) + self.box = kwargs.get("box", ASCII_DOUBLE_HEAD) + self.show_header = kwargs.get("show_header", False) + self.title_style = kwargs.get("title_style", "bold green") + self.title_justify = kwargs.get("title_justify", "left") + self.caption = kwargs.get("caption", " ") + + def add_column(self, *args, **kwargs) -> None: + """Add a column to the table. We use different default.""" + kwargs["overflow"] = kwargs.get("overflow") # to avoid truncating + super().add_column(*args, **kwargs) diff --git a/airflow-ctl/src/airflowctl/ctl/utils/__init__.py b/airflow-ctl/src/airflowctl/ctl/utils/__init__.py new file mode 100644 index 00000000000..13a83393a91 --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-ctl/src/airflowctl/ctl/utils/yaml.py b/airflow-ctl/src/airflowctl/ctl/utils/yaml.py new file mode 100644 index 00000000000..a52910f73ea --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/utils/yaml.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# TODO (bugraoz93): Use Vendor Approach and unify with airflow.utils.yaml for core-ctl +""" +Use libyaml for YAML dump/load operations where possible. + +If libyaml is available we will use it -- it is significantly faster. + +This module delegates all other properties to the yaml module, so it can be used as: + +.. code-block:: python + + import airflow.utils.yaml as yaml + +And then be used directly in place of the normal python module. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, BinaryIO, TextIO, cast + +if TYPE_CHECKING: + from yaml.error import MarkedYAMLError, YAMLError # noqa: F401 + + +def safe_load(stream: bytes | str | BinaryIO | TextIO) -> Any: + """Like yaml.safe_load, but use the C libyaml for speed where we can.""" + # delay import until use. + from yaml import load as orig + + try: + from yaml import CSafeLoader as SafeLoader + except ImportError: + from yaml import SafeLoader # type: ignore[assignment] + + return orig(stream, SafeLoader) + + +def dump(data: Any, **kwargs) -> str: + """Like yaml.safe_dump, but use the C libyaml for speed where we can.""" + # delay import until use. + from yaml import dump as orig + + try: + from yaml import CSafeDumper as SafeDumper + except ImportError: + from yaml import SafeDumper # type: ignore[assignment] + + return cast("str", orig(data, Dumper=SafeDumper, **kwargs)) + + +def __getattr__(name): + # Delegate anything else to the yaml module + import yaml + + if name == "FullLoader": + # Try to use CFullLoader by default + getattr(yaml, "CFullLoader", yaml.FullLoader) + + return getattr(yaml, name) diff --git a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py index 464e74e4734..3e887b68431 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py @@ -41,7 +41,7 @@ def no_op_method(): @pytest.fixture(scope="module") -def test_args(): +def test_args_create(): return [ ( "--dag-id", @@ -116,6 +116,53 @@ def test_args(): ] +""" + help="Output format. Allowed values: json, yaml, plain, table (default: json)", + metavar="(table, json, yaml, plain)", + choices=("table", "json", "yaml", "plain"), + default="json", +""" + + +@pytest.fixture(scope="module") +def test_args_list(): + return [ + ( + "--output", + { + "help": "Output format. Allowed values: json, yaml, plain, table (default: json)", + "default": "json", + "type": str, + "dest": None, + }, + ), + ] + + +@pytest.fixture(scope="module") +def test_args_get(): + return [ + ( + "--backfill-id", + { + "help": "backfill_id for get operation in BackfillsOperations", + "default": None, + "type": str, + "dest": None, + }, + ), + ( + "--output", + { + "help": "Output format. Allowed values: json, yaml, plain, table (default: json)", + "default": "json", + "type": str, + "dest": None, + }, + ), + ] + + class TestCommandFactory: @classmethod def _save_temp_operations_py(cls, temp_file: str, file_content) -> None: @@ -136,7 +183,7 @@ class TestCommandFactory: except FileNotFoundError: pass - def test_command_factory(self, no_op_method, test_args): + def test_command_factory(self, no_op_method, test_args_create, test_args_list, test_args_get): """ Test the command factory. """ @@ -159,6 +206,13 @@ class TestCommandFactory: return BackfillResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e + def list(self) -> BackfillListResponse: + params = {"dag_id": dag_id} if dag_id else {} + self.response = self.client.get("backfills", params=params) + return BackfillListResponse.model_validate_json(self.response.content) + def get(self, backfill_id: str) -> BackfillResponse | ServerResponseError: + self.response = self.client.get(f"backfills/{backfill_id}") + return BackfillResponse.model_validate_json(self.response.content) """, ) @@ -170,14 +224,27 @@ class TestCommandFactory: assert generated_group_command.name == "backfills" assert generated_group_command.help == "Perform Backfills operations" for sub_command in generated_group_command.subcommands: - assert sub_command.name == "create" - for arg, test_arg in zip(sub_command.args, test_args): - assert arg.flags[0] == test_arg[0] - assert arg.kwargs["help"] == test_arg[1]["help"] - assert arg.kwargs["action"] == test_arg[1]["action"] - assert arg.kwargs["default"] == test_arg[1]["default"] - assert arg.kwargs["type"] == test_arg[1]["type"] - assert arg.kwargs["dest"] == test_arg[1]["dest"] + if sub_command.name == "create": + for arg, test_arg in zip(sub_command.args, test_args_create): + assert arg.flags[0] == test_arg[0] + assert arg.kwargs["help"] == test_arg[1]["help"] + assert arg.kwargs["action"] == test_arg[1]["action"] + assert arg.kwargs["default"] == test_arg[1]["default"] + assert arg.kwargs["type"] == test_arg[1]["type"] + assert arg.kwargs["dest"] == test_arg[1]["dest"] + print(arg.flags) + elif sub_command.name == "list": + for arg, test_arg in zip(sub_command.args, test_args_list): + assert arg.flags[0] == test_arg[0] + assert arg.kwargs["help"] == test_arg[1]["help"] + assert arg.kwargs["default"] == test_arg[1]["default"] + assert arg.kwargs["type"] == test_arg[1]["type"] + elif sub_command.name == "get": + for arg, test_arg in zip(sub_command.args, test_args_get): + assert arg.flags[0] == test_arg[0] + assert arg.kwargs["help"] == test_arg[1]["help"] + assert arg.kwargs["default"] == test_arg[1]["default"] + assert arg.kwargs["type"] == test_arg[1]["type"] class TestCliConfigMethods: