This is an automated email from the ASF dual-hosted git repository.

bugraoz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d8e6f4bb756 feat(airflowctl): Include Airflow CLI Output Structure and 
Integrate (#53424)
d8e6f4bb756 is described below

commit d8e6f4bb7560aa01443d338aa77317bf27ac90bd
Author: Bugra Ozturk <bugrao...@users.noreply.github.com>
AuthorDate: Thu Jul 17 04:31:09 2025 +0300

    feat(airflowctl): Include Airflow CLI Output Structure and Integrate 
(#53424)
    
    * feat(airflowctl): Include Airflow CLI Output Structure and Integrate into 
Auto Generated Commands
    
    * feat(airflowctl): Add tabulate into dependencies
    
    * feat(airflowctl): Update default and the file name
    
    * feat(airflowctl): fix unit tests
---
 airflow-ctl/pyproject.toml                         |   1 +
 airflow-ctl/src/airflowctl/ctl/cli_config.py       | 107 ++++++++-------
 .../src/airflowctl/ctl/console_formatting.py       | 146 +++++++++++++++++++++
 airflow-ctl/src/airflowctl/ctl/utils/__init__.py   |  16 +++
 airflow-ctl/src/airflowctl/ctl/utils/yaml.py       |  74 +++++++++++
 .../tests/airflow_ctl/ctl/test_cli_config.py       |  87 ++++++++++--
 6 files changed, 370 insertions(+), 61 deletions(-)

diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml
index 43eb8cd0ec1..734f43465af 100644
--- a/airflow-ctl/pyproject.toml
+++ b/airflow-ctl/pyproject.toml
@@ -33,6 +33,7 @@ dependencies = [
     "rich-argparse>=1.0.0",
     "structlog>=25.2.0",
     "uuid6>=2024.7.10",
+    "tabulate>=0.9.0",
 ]
 
 classifiers = [
diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py 
b/airflow-ctl/src/airflowctl/ctl/cli_config.py
index e636ed62ee8..fa8c736cb5f 100644
--- a/airflow-ctl/src/airflowctl/ctl/cli_config.py
+++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py
@@ -26,7 +26,6 @@ import datetime
 import getpass
 import inspect
 import os
-import textwrap
 from argparse import Namespace
 from collections.abc import Callable, Iterable
 from functools import partial
@@ -38,6 +37,7 @@ import rich
 import airflowctl.api.datamodels.generated as generated_datamodels
 from airflowctl.api.client import NEW_API_CLIENT, Client, ClientKind, 
provide_api_client
 from airflowctl.api.operations import BaseOperations, ServerResponseError
+from airflowctl.ctl.console_formatting import AirflowConsole
 from airflowctl.exceptions import (
     AirflowCtlConnectionException,
     AirflowCtlCredentialNotFoundException,
@@ -172,6 +172,17 @@ ARG_FILE = Arg(
     help="File path to read from or write to. "
     "For import commands, it is a file to read from. For export commands, it 
is a file to write to.",
 )
+ARG_OUTPUT = Arg(
+    (
+        "--output",
+        "-o",
+    ),
+    help="Output format. Allowed values: json, yaml, plain, table (default: 
json)",
+    metavar="(table, json, yaml, plain)",
+    choices=("table", "json", "yaml", "plain"),
+    default="json",
+    type=str,
+)
 
 # Authentication arguments
 ARG_AUTH_URL = Arg(
@@ -209,11 +220,6 @@ ARG_AUTH_PASSWORD = Arg(
 )
 
 # Variable Commands Args
-ARG_VARIABLE_IMPORT = Arg(
-    flags=("file",),
-    metavar="file",
-    help="Import variables from JSON file",
-)
 ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg(
     flags=("-a", "--action-on-existing-key"),
     type=str,
@@ -221,45 +227,6 @@ ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg(
     help="Action to take if we encounter a variable key that already exists.",
     choices=("overwrite", "fail", "skip"),
 )
-ARG_VARIABLE_EXPORT = Arg(
-    flags=("file",),
-    metavar="file",
-    help="Export all variables to JSON file",
-)
-
-ARG_OUTPUT = Arg(
-    flags=("-o", "--output"),
-    type=str,
-    default="json",
-    help="Output format. Only json format is supported (default: json)",
-)
-
-# Pool Commands Args
-ARG_POOL_FILE = Arg(
-    ("file",),
-    metavar="FILEPATH",
-    help="Pools JSON file. Example format::\n"
-    + textwrap.indent(
-        textwrap.dedent(
-            """
-            [
-                {
-                    "name": "pool_1",
-                    "slots": 5,
-                    "description": "",
-                    "include_deferred": true,
-                    "occupied_slots": 0,
-                    "running_slots": 0,
-                    "queued_slots": 0,
-                    "scheduled_slots": 0,
-                    "open_slots": 5,
-                    "deferred_slots": 0
-                }
-            ]"""
-        ),
-        " " * 4,
-    ),
-)
 
 # Config arguments
 ARG_CONFIG_SECTION = Arg(
@@ -552,6 +519,14 @@ class CommandFactory:
                                 parameter_type=parameter_type, 
parameter_key=parameter_key
                             )
                         )
+            # This list is used to determine if the command/operation needs to 
output data
+            output_command_list = [
+                "list",
+                "get",
+            ]
+            if any(operation.get("name").startswith(cmd) for cmd in 
output_command_list):
+                args.extend([ARG_OUTPUT])
+
             self.args_map[(operation.get("name"), 
operation.get("parent").name)] = args
 
     def _create_func_map_from_operation(self):
@@ -588,9 +563,39 @@ class CommandFactory:
 
             if datamodel:
                 method_params = datamodel.model_validate(method_params)
-                rich.print(operation_method_object(method_params))
+                method_output = operation_method_object(method_params)
             else:
-                rich.print(operation_method_object(**method_params))
+                method_output = operation_method_object(**method_params)
+
+            def convert_to_dict(obj: Any) -> dict | Any:
+                """Recursively convert an object to a dictionary or list of 
dictionaries."""
+                if hasattr(obj, "model_dump"):
+                    return obj.model_dump(mode="json")
+                return obj
+
+            def check_operation_and_collect_list_of_dict(dict_obj: dict) -> 
list:
+                """Check if the object is a nested dictionary and collect list 
of dictionaries."""
+
+                def is_dict_nested(obj: dict) -> bool:
+                    """Check if the object is a nested dictionary."""
+                    return any(isinstance(i, dict) or isinstance(i, list) for 
i in obj.values())
+
+                # Find result from list operation
+                if is_dict_nested(dict_obj):
+                    for _, value in dict_obj.items():
+                        if isinstance(value, list):
+                            return value
+                        if isinstance(value, dict):
+                            result = 
check_operation_and_collect_list_of_dict(value)
+                            if result:
+                                return result
+                # If not nested, return the object as a list which the result 
should be already a dict
+                return [dict_obj]
+
+            AirflowConsole().print_as(
+                
data=check_operation_and_collect_list_of_dict(convert_to_dict(method_output)),
+                output=args.output,
+            )
 
         for operation in self.operations:
             self.func_map[(operation.get("name"), 
operation.get("parent").name)] = partial(
@@ -736,14 +741,14 @@ POOL_COMMANDS = (
         name="import",
         help="Import pools",
         func=lazy_load_command("airflowctl.ctl.commands.pool_command.import_"),
-        args=(ARG_POOL_FILE,),
+        args=(ARG_FILE,),
     ),
     ActionCommand(
         name="export",
         help="Export all pools",
         func=lazy_load_command("airflowctl.ctl.commands.pool_command.export"),
         args=(
-            ARG_POOL_FILE,
+            ARG_FILE,
             ARG_OUTPUT,
         ),
     ),
@@ -754,13 +759,13 @@ VARIABLE_COMMANDS = (
         name="import",
         help="Import variables",
         
func=lazy_load_command("airflowctl.ctl.commands.variable_command.import_"),
-        args=(ARG_VARIABLE_IMPORT, ARG_VARIABLE_ACTION_ON_EXISTING_KEY),
+        args=(ARG_FILE, ARG_VARIABLE_ACTION_ON_EXISTING_KEY),
     ),
     ActionCommand(
         name="export",
         help="Export all variables",
         
func=lazy_load_command("airflowctl.ctl.commands.variable_command.export"),
-        args=(ARG_VARIABLE_EXPORT,),
+        args=(ARG_FILE,),
     ),
 )
 
diff --git a/airflow-ctl/src/airflowctl/ctl/console_formatting.py 
b/airflow-ctl/src/airflowctl/ctl/console_formatting.py
new file mode 100644
index 00000000000..ac9bc431597
--- /dev/null
+++ b/airflow-ctl/src/airflowctl/ctl/console_formatting.py
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import sys
+from collections.abc import Callable, Sequence
+from typing import TYPE_CHECKING, Any
+
+from rich.box import ASCII_DOUBLE_HEAD
+from rich.console import Console
+from rich.syntax import Syntax
+from rich.table import Table
+from tabulate import tabulate
+
+from airflowctl.ctl.utils import yaml
+
+if TYPE_CHECKING:
+    from typing import TypeGuard
+
+
+# TODO (bugraoz93): Use Vendor Approach and unify with airflow.platform for 
core-ctl
+def is_tty():
+    """Check if stdout is connected (is associated with a terminal device) to 
a tty(-like) device."""
+    if not hasattr(sys.stdout, "isatty"):
+        return False
+    return sys.stdout.isatty()
+
+
+def is_data_sequence(data: Sequence[dict | Any]) -> TypeGuard[Sequence[dict]]:
+    return all(isinstance(d, dict) for d in data)
+
+
+class AirflowConsole(Console):
+    """Airflow rich console."""
+
+    def __init__(self, show_header: bool = True, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # Set the width to constant to pipe whole output from console
+        self._width = 200 if not is_tty() else self._width
+
+        # If show header in tables
+        self.show_header = show_header
+
+    def print_as_json(self, data: dict):
+        """Render dict as json text representation."""
+        json_content = json.dumps(data)
+        self.print(Syntax(json_content, "json", theme="ansi_dark"), 
soft_wrap=True)
+
+    def print_as_yaml(self, data: dict):
+        """Render dict as yaml text representation."""
+        yaml_content = yaml.dump(data)
+        self.print(Syntax(yaml_content, "yaml", theme="ansi_dark"), 
soft_wrap=True)
+
+    def print_as_table(self, data: list[dict]):
+        """Render list of dictionaries as table."""
+        if not data:
+            self.print("No data found")
+            return
+
+        table = SimpleTable(show_header=self.show_header)
+        for col in data[0]:
+            table.add_column(col)
+
+        for row in data:
+            table.add_row(*(str(d) for d in row.values()))
+        self.print(table)
+
+    def print_as_plain_table(self, data: list[dict]):
+        """Render list of dictionaries as a simple table than can be easily 
piped."""
+        if not data:
+            self.print("No data found")
+            return
+        rows = [d.values() for d in data]
+        output = tabulate(rows, tablefmt="plain", headers=list(data[0]))
+        print(output)
+
+    def _normalize_data(self, value: Any, output: str) -> list | str | dict | 
None:
+        if isinstance(value, (tuple, list)):
+            if output == "table":
+                return ",".join(str(self._normalize_data(x, output)) for x in 
value)
+            return [self._normalize_data(x, output) for x in value]
+        if isinstance(value, dict) and output != "table":
+            return {k: self._normalize_data(v, output) for k, v in 
value.items()}
+        if value is None:
+            return None
+        return str(value)
+
+    def print_as(
+        self,
+        data: Sequence[dict | Any],
+        output: str,
+        mapper: Callable[[Any], dict] | None = None,
+    ) -> None:
+        """Print provided using format specified by output argument."""
+        output_to_renderer: dict[str, Callable[[Any], None]] = {
+            "json": self.print_as_json,
+            "yaml": self.print_as_yaml,
+            "table": self.print_as_table,
+            "plain": self.print_as_plain_table,
+        }
+        renderer = output_to_renderer.get(output)
+        if not renderer:
+            raise ValueError(f"Unknown formatter: {output}. Allowed options: 
{list(output_to_renderer)}")
+
+        if mapper:
+            dict_data: Sequence[dict] = [mapper(d) for d in data]
+        elif is_data_sequence(data):
+            dict_data = data
+        else:
+            raise ValueError("To tabulate non-dictionary data you need to 
provide `mapper` function")
+        dict_data = [{k: self._normalize_data(v, output) for k, v in 
d.items()} for d in dict_data]
+        renderer(dict_data)
+
+
+class SimpleTable(Table):
+    """A rich Table with some default hardcoded for consistency."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.show_edge = kwargs.get("show_edge", False)
+        self.pad_edge = kwargs.get("pad_edge", False)
+        self.box = kwargs.get("box", ASCII_DOUBLE_HEAD)
+        self.show_header = kwargs.get("show_header", False)
+        self.title_style = kwargs.get("title_style", "bold green")
+        self.title_justify = kwargs.get("title_justify", "left")
+        self.caption = kwargs.get("caption", " ")
+
+    def add_column(self, *args, **kwargs) -> None:
+        """Add a column to the table. We use different default."""
+        kwargs["overflow"] = kwargs.get("overflow")  # to avoid truncating
+        super().add_column(*args, **kwargs)
diff --git a/airflow-ctl/src/airflowctl/ctl/utils/__init__.py 
b/airflow-ctl/src/airflowctl/ctl/utils/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/airflow-ctl/src/airflowctl/ctl/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow-ctl/src/airflowctl/ctl/utils/yaml.py 
b/airflow-ctl/src/airflowctl/ctl/utils/yaml.py
new file mode 100644
index 00000000000..a52910f73ea
--- /dev/null
+++ b/airflow-ctl/src/airflowctl/ctl/utils/yaml.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# TODO (bugraoz93): Use Vendor Approach and unify with airflow.utils.yaml for 
core-ctl
+"""
+Use libyaml for YAML dump/load operations where possible.
+
+If libyaml is available we will use it -- it is significantly faster.
+
+This module delegates all other properties to the yaml module, so it can be 
used as:
+
+.. code-block:: python
+
+    import airflow.utils.yaml as yaml
+
+And then be used directly in place of the normal python module.
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, BinaryIO, TextIO, cast
+
+if TYPE_CHECKING:
+    from yaml.error import MarkedYAMLError, YAMLError  # noqa: F401
+
+
+def safe_load(stream: bytes | str | BinaryIO | TextIO) -> Any:
+    """Like yaml.safe_load, but use the C libyaml for speed where we can."""
+    # delay import until use.
+    from yaml import load as orig
+
+    try:
+        from yaml import CSafeLoader as SafeLoader
+    except ImportError:
+        from yaml import SafeLoader  # type: ignore[assignment]
+
+    return orig(stream, SafeLoader)
+
+
+def dump(data: Any, **kwargs) -> str:
+    """Like yaml.safe_dump, but use the C libyaml for speed where we can."""
+    # delay import until use.
+    from yaml import dump as orig
+
+    try:
+        from yaml import CSafeDumper as SafeDumper
+    except ImportError:
+        from yaml import SafeDumper  # type: ignore[assignment]
+
+    return cast("str", orig(data, Dumper=SafeDumper, **kwargs))
+
+
+def __getattr__(name):
+    # Delegate anything else to the yaml module
+    import yaml
+
+    if name == "FullLoader":
+        # Try to use CFullLoader by default
+        getattr(yaml, "CFullLoader", yaml.FullLoader)
+
+    return getattr(yaml, name)
diff --git a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py 
b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
index 464e74e4734..3e887b68431 100644
--- a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
+++ b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py
@@ -41,7 +41,7 @@ def no_op_method():
 
 
 @pytest.fixture(scope="module")
-def test_args():
+def test_args_create():
     return [
         (
             "--dag-id",
@@ -116,6 +116,53 @@ def test_args():
     ]
 
 
+"""
+    help="Output format. Allowed values: json, yaml, plain, table (default: 
json)",
+    metavar="(table, json, yaml, plain)",
+    choices=("table", "json", "yaml", "plain"),
+    default="json",
+"""
+
+
+@pytest.fixture(scope="module")
+def test_args_list():
+    return [
+        (
+            "--output",
+            {
+                "help": "Output format. Allowed values: json, yaml, plain, 
table (default: json)",
+                "default": "json",
+                "type": str,
+                "dest": None,
+            },
+        ),
+    ]
+
+
+@pytest.fixture(scope="module")
+def test_args_get():
+    return [
+        (
+            "--backfill-id",
+            {
+                "help": "backfill_id for get operation in BackfillsOperations",
+                "default": None,
+                "type": str,
+                "dest": None,
+            },
+        ),
+        (
+            "--output",
+            {
+                "help": "Output format. Allowed values: json, yaml, plain, 
table (default: json)",
+                "default": "json",
+                "type": str,
+                "dest": None,
+            },
+        ),
+    ]
+
+
 class TestCommandFactory:
     @classmethod
     def _save_temp_operations_py(cls, temp_file: str, file_content) -> None:
@@ -136,7 +183,7 @@ class TestCommandFactory:
         except FileNotFoundError:
             pass
 
-    def test_command_factory(self, no_op_method, test_args):
+    def test_command_factory(self, no_op_method, test_args_create, 
test_args_list, test_args_get):
         """
         Test the command factory.
         """
@@ -159,6 +206,13 @@ class TestCommandFactory:
                             return 
BackfillResponse.model_validate_json(self.response.content)
                         except ServerResponseError as e:
                             raise e
+                    def list(self) -> BackfillListResponse:
+                        params = {"dag_id": dag_id} if dag_id else {}
+                        self.response = self.client.get("backfills", 
params=params)
+                        return 
BackfillListResponse.model_validate_json(self.response.content)
+                    def get(self, backfill_id: str) -> BackfillResponse | 
ServerResponseError:
+                        self.response = 
self.client.get(f"backfills/{backfill_id}")
+                        return 
BackfillResponse.model_validate_json(self.response.content)
             """,
         )
 
@@ -170,14 +224,27 @@ class TestCommandFactory:
             assert generated_group_command.name == "backfills"
             assert generated_group_command.help == "Perform Backfills 
operations"
             for sub_command in generated_group_command.subcommands:
-                assert sub_command.name == "create"
-                for arg, test_arg in zip(sub_command.args, test_args):
-                    assert arg.flags[0] == test_arg[0]
-                    assert arg.kwargs["help"] == test_arg[1]["help"]
-                    assert arg.kwargs["action"] == test_arg[1]["action"]
-                    assert arg.kwargs["default"] == test_arg[1]["default"]
-                    assert arg.kwargs["type"] == test_arg[1]["type"]
-                    assert arg.kwargs["dest"] == test_arg[1]["dest"]
+                if sub_command.name == "create":
+                    for arg, test_arg in zip(sub_command.args, 
test_args_create):
+                        assert arg.flags[0] == test_arg[0]
+                        assert arg.kwargs["help"] == test_arg[1]["help"]
+                        assert arg.kwargs["action"] == test_arg[1]["action"]
+                        assert arg.kwargs["default"] == test_arg[1]["default"]
+                        assert arg.kwargs["type"] == test_arg[1]["type"]
+                        assert arg.kwargs["dest"] == test_arg[1]["dest"]
+                        print(arg.flags)
+                elif sub_command.name == "list":
+                    for arg, test_arg in zip(sub_command.args, test_args_list):
+                        assert arg.flags[0] == test_arg[0]
+                        assert arg.kwargs["help"] == test_arg[1]["help"]
+                        assert arg.kwargs["default"] == test_arg[1]["default"]
+                        assert arg.kwargs["type"] == test_arg[1]["type"]
+                elif sub_command.name == "get":
+                    for arg, test_arg in zip(sub_command.args, test_args_get):
+                        assert arg.flags[0] == test_arg[0]
+                        assert arg.kwargs["help"] == test_arg[1]["help"]
+                        assert arg.kwargs["default"] == test_arg[1]["default"]
+                        assert arg.kwargs["type"] == test_arg[1]["type"]
 
 
 class TestCliConfigMethods:

Reply via email to