This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new da7de797a6 Allow custom columns in cli dags list (#35250)
da7de797a6 is described below
commit da7de797a6536239e64d62503320aa3719999b7b
Author: Jt Miclat <[email protected]>
AuthorDate: Mon Jan 8 06:46:42 2024 +0800
Allow custom columns in cli dags list (#35250)
* Add last parsed time to cli dags list
* Update dag list test to include last parsed time
* Fix wrong typing for get_last_parsed_time
* Add additional columns and tests
* Add typing
* Replace additional columns with columns for cli dag list. Throw error for
invalid columns
* Update tests for dag list
* Update tests/cli/commands/test_dag_command.py
Co-authored-by: Wei Lee <[email protected]>
* Maintain column order based on keys
* Convert system exit to warning
* Update dag list tests
* Update test for cli list dags with invalid column
* Update airflow/cli/commands/dag_command.py
Co-authored-by: Wei Lee <[email protected]>
* Update airflow/cli/commands/dag_command.py
Co-authored-by: Wei Lee <[email protected]>
* Update airflow/cli/commands/dag_command.py
Co-authored-by: Wei Lee <[email protected]>
* Update airflow/cli/commands/dag_command.py
Co-authored-by: Wei Lee <[email protected]>
* Remove extra colon in failing tests
---------
Co-authored-by: Wei Lee <[email protected]>
---
airflow/cli/cli_config.py | 9 ++++++++-
airflow/cli/commands/dag_command.py | 27 ++++++++++++++++++++-------
tests/cli/commands/test_dag_command.py | 33 +++++++++++++++++++++++++++------
3 files changed, 55 insertions(+), 14 deletions(-)
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 0880a94e07..008d9daecb 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -986,6 +986,13 @@ ARG_CLEAR_ONLY = Arg(
help="If passed, serialized DAGs will be cleared but not reserialized.",
)
+ARG_DAG_LIST_COLUMNS = Arg(
+ ("--columns",),
+ type=string_list_type,
+ help="List of columns to render. (default: ['dag_id', 'fileloc', 'owner',
'is_paused'])",
+ default=("dag_id", "fileloc", "owners", "is_paused"),
+)
+
ALTERNATIVE_CONN_SPECS_ARGS = [
ARG_CONN_TYPE,
ARG_CONN_DESCRIPTION,
@@ -1032,7 +1039,7 @@ DAGS_COMMANDS = (
name="list",
help="List all the DAGs",
func=lazy_load_command("airflow.cli.commands.dag_command.dag_list_dags"),
- args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE),
+ args=(ARG_SUBDIR, ARG_OUTPUT, ARG_VERBOSE, ARG_DAG_LIST_COLUMNS),
),
ActionCommand(
name="list-import-errors",
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index a9d6aaf342..4b5f92fd86 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -351,8 +351,20 @@ def dag_next_execution(args) -> None:
@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
-def dag_list_dags(args) -> None:
+@provide_session
+def dag_list_dags(args, session=NEW_SESSION) -> None:
"""Display dags with or without stats at the command line."""
+ cols = args.columns if args.columns else []
+ invalid_cols = [c for c in cols if c not in dag_schema.fields]
+ valid_cols = [c for c in cols if c in dag_schema.fields]
+ if invalid_cols:
+ from rich import print as rich_print
+
+ rich_print(
+ f"[red][bold]Error:[/bold] Ignoring the following invalid columns:
{invalid_cols}. "
+ f"List of valid columns: {list(dag_schema.fields.keys())}",
+ file=sys.stderr,
+ )
dagbag = DagBag(process_subdir(args.subdir))
if dagbag.import_errors:
from rich import print as rich_print
@@ -362,15 +374,16 @@ def dag_list_dags(args) -> None:
"For details, run `airflow dags list-import-errors`",
file=sys.stderr,
)
+
+ def get_dag_detail(dag: DAG) -> dict:
+ dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
+ dag_detail = dag_schema.dump(dag_model)
+ return {col: dag_detail[col] for col in valid_cols}
+
AirflowConsole().print_as(
data=sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id")),
output=args.output,
- mapper=lambda x: {
- "dag_id": x.dag_id,
- "filepath": x.filepath,
- "owner": x.owner,
- "paused": x.get_is_paused(),
- },
+ mapper=get_dag_detail,
)
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 4f16c381ad..0518e35b0d 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -524,15 +524,36 @@ class TestCliDags:
@conf_vars({("core", "load_examples"): "true"})
def test_cli_list_dags(self):
- args = self.parser.parse_args(["dags", "list", "--output", "yaml"])
+ args = self.parser.parse_args(["dags", "list", "--output", "json"])
with contextlib.redirect_stdout(StringIO()) as temp_stdout:
dag_command.dag_list_dags(args)
out = temp_stdout.getvalue()
- assert "owner" in out
- assert "airflow" in out
- assert "paused" in out
- assert "airflow/example_dags/example_complex.py" in out
- assert "- dag_id:" in out
+ dag_list = json.loads(out)
+ for key in ["dag_id", "fileloc", "owners", "is_paused"]:
+ assert key in dag_list[0]
+ assert any("airflow/example_dags/example_complex.py" in d["fileloc"]
for d in dag_list)
+
+ @conf_vars({("core", "load_examples"): "true"})
+ def test_cli_list_dags_custom_cols(self):
+ args = self.parser.parse_args(
+ ["dags", "list", "--output", "json", "--columns",
"dag_id,last_parsed_time"]
+ )
+ with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+ dag_command.dag_list_dags(args)
+ out = temp_stdout.getvalue()
+ dag_list = json.loads(out)
+ for key in ["dag_id", "last_parsed_time"]:
+ assert key in dag_list[0]
+ for key in ["fileloc", "owners", "is_paused"]:
+ assert key not in dag_list[0]
+
+ @conf_vars({("core", "load_examples"): "true"})
+ def test_cli_list_dags_invalid_cols(self):
+ args = self.parser.parse_args(["dags", "list", "--output", "json",
"--columns", "dag_id,invalid_col"])
+ with contextlib.redirect_stderr(StringIO()) as temp_stderr:
+ dag_command.dag_list_dags(args)
+ out = temp_stderr.getvalue()
+ assert "Ignoring the following invalid columns: ['invalid_col']" in out
@conf_vars({("core", "load_examples"): "false"})
def test_cli_list_dags_prints_import_errors(self):