This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new da7de797a6 Allow custom columns in cli dags list (#35250)
da7de797a6 is described below

commit da7de797a6536239e64d62503320aa3719999b7b
Author: Jt Miclat <[email protected]>
AuthorDate: Mon Jan 8 06:46:42 2024 +0800

    Allow custom columns in cli dags list (#35250)
    
    * Add last parsed time to cli dags list
    
    * Update dag list test to include last parsed time
    
    * Fix wrong typing for get_last_parsed_time
    
    * Add additional columns and tests
    
    * Add typing
    
    * Replace additional columns to columns for cli dag list. Throw error for invalid columns
    
    * Update tests for dag list
    
    * Update tests/cli/commands/test_dag_command.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * Maintain column order based on keys
    
    * convert system exit to warning
    
    * Update dag list tests
    
    * Update test for cli list dags with invalid column
    
    * Update airflow/cli/commands/dag_command.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * Update airflow/cli/commands/dag_command.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * Update airflow/cli/commands/dag_command.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * Update airflow/cli/commands/dag_command.py
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * Remove extra colon in failing tests
    
    ---------
    
    Co-authored-by: Wei Lee <[email protected]>
---
 airflow/cli/cli_config.py              |  9 ++++++++-
 airflow/cli/commands/dag_command.py    | 27 ++++++++++++++++++++-------
 tests/cli/commands/test_dag_command.py | 33 +++++++++++++++++++++++++++------
 3 files changed, 55 insertions(+), 14 deletions(-)

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 0880a94e07..008d9daecb 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -986,6 +986,13 @@ ARG_CLEAR_ONLY = Arg(
     help="If passed, serialized DAGs will be cleared but not reserialized.",
 )
 
+ARG_DAG_LIST_COLUMNS = Arg(
+    ("--columns",),
+    type=string_list_type,
+    help="List of columns to render. (default: ['dag_id', 'fileloc', 'owner', 
'is_paused'])",
+    default=("dag_id", "fileloc", "owners", "is_paused"),
+)
+
 ALTERNATIVE_CONN_SPECS_ARGS = [
     ARG_CONN_TYPE,
     ARG_CONN_DESCRIPTION,
@@ -1032,7 +1039,7 @@ DAGS_COMMANDS = (
         name="list",
         help="List all the DAGs",
         
func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dags"),
-        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
+        args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE, ARG_DAG_LIST_COLUMNS),
     ),
     ActionCommand(
         name="list-import-errors",
diff --git a/airflow/cli/commands/dag_command.py 
b/airflow/cli/commands/dag_command.py
index a9d6aaf342..4b5f92fd86 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -351,8 +351,20 @@ def dag_next_execution(args) -> None:
 @cli_utils.action_cli
 @suppress_logs_and_warning
 @providers_configuration_loaded
-def dag_list_dags(args) -> None:
+@provide_session
+def dag_list_dags(args, session=NEW_SESSION) -> None:
     """Display dags with or without stats at the command line."""
+    cols = args.columns if args.columns else []
+    invalid_cols = [c for c in cols if c not in dag_schema.fields]
+    valid_cols = [c for c in cols if c in dag_schema.fields]
+    if invalid_cols:
+        from rich import print as rich_print
+
+        rich_print(
+            f"[red][bold]Error:[/bold] Ignoring the following invalid columns: 
{invalid_cols}.  "
+            f"List of valid columns: {list(dag_schema.fields.keys())}",
+            file=sys.stderr,
+        )
     dagbag = DagBag(process_subdir(args.subdir))
     if dagbag.import_errors:
         from rich import print as rich_print
@@ -362,15 +374,16 @@ def dag_list_dags(args) -> None:
             "For details, run `airflow dags list-import-errors`",
             file=sys.stderr,
         )
+
+    def get_dag_detail(dag: DAG) -> dict:
+        dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
+        dag_detail = dag_schema.dump(dag_model)
+        return {col: dag_detail[col] for col in valid_cols}
+
     AirflowConsole().print_as(
         data=sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id")),
         output=args.output,
-        mapper=lambda x: {
-            "dag_id": x.dag_id,
-            "filepath": x.filepath,
-            "owner": x.owner,
-            "paused": x.get_is_paused(),
-        },
+        mapper=get_dag_detail,
     )
 
 
diff --git a/tests/cli/commands/test_dag_command.py 
b/tests/cli/commands/test_dag_command.py
index 4f16c381ad..0518e35b0d 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -524,15 +524,36 @@ class TestCliDags:
 
     @conf_vars({("core", "load_examples"): "true"})
     def test_cli_list_dags(self):
-        args = self.parser.parse_args(["dags", "list", "--output", "yaml"])
+        args = self.parser.parse_args(["dags", "list", "--output", "json"])
         with contextlib.redirect_stdout(StringIO()) as temp_stdout:
             dag_command.dag_list_dags(args)
             out = temp_stdout.getvalue()
-        assert "owner" in out
-        assert "airflow" in out
-        assert "paused" in out
-        assert "airflow/example_dags/example_complex.py" in out
-        assert "- dag_id:" in out
+            dag_list = json.loads(out)
+        for key in ["dag_id", "fileloc", "owners", "is_paused"]:
+            assert key in dag_list[0]
+        assert any("airflow/example_dags/example_complex.py" in d["fileloc"] 
for d in dag_list)
+
+    @conf_vars({("core", "load_examples"): "true"})
+    def test_cli_list_dags_custom_cols(self):
+        args = self.parser.parse_args(
+            ["dags", "list", "--output", "json", "--columns", 
"dag_id,last_parsed_time"]
+        )
+        with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+            dag_command.dag_list_dags(args)
+            out = temp_stdout.getvalue()
+            dag_list = json.loads(out)
+        for key in ["dag_id", "last_parsed_time"]:
+            assert key in dag_list[0]
+        for key in ["fileloc", "owners", "is_paused"]:
+            assert key not in dag_list[0]
+
+    @conf_vars({("core", "load_examples"): "true"})
+    def test_cli_list_dags_invalid_cols(self):
+        args = self.parser.parse_args(["dags", "list", "--output", "json", 
"--columns", "dag_id,invalid_col"])
+        with contextlib.redirect_stderr(StringIO()) as temp_stderr:
+            dag_command.dag_list_dags(args)
+            out = temp_stderr.getvalue()
+        assert "Ignoring the following invalid columns: ['invalid_col']" in out
 
     @conf_vars({("core", "load_examples"): "false"})
     def test_cli_list_dags_prints_import_errors(self):

Reply via email to