This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 06e4672bf44376b1dc146bc6cc37917460448ede
Author: maahir22 <[email protected]>
AuthorDate: Fri Apr 14 22:44:48 2023 +0530

    Add command to get DAG Details via CLI (#30432)
    
    ---------
    
    Co-authored-by: Hussein Awala <[email protected]>
    Co-authored-by: Hussein Awala <[email protected]>
    (cherry picked from commit 7074167d71c93b69361d24c1121adc7419367f2a)
---
 airflow/cli/cli_config.py              |  6 ++++++
 airflow/cli/commands/dag_command.py    | 22 ++++++++++++++++++++++
 tests/cli/commands/test_dag_command.py | 20 ++++++++++++++++++++
 3 files changed, 48 insertions(+)

diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index e08b4e01ec..ffcfd92de0 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -1101,6 +1101,12 @@ class GroupCommand(NamedTuple):
 CLICommand = Union[ActionCommand, GroupCommand]
 
 DAGS_COMMANDS = (
+    ActionCommand(
+        name="details",
+        help="Get DAG details given a DAG id",
+        func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"),
+        args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE),
+    ),
     ActionCommand(
         name="list",
         help="List all the DAGs",
diff --git a/airflow/cli/commands/dag_command.py 
b/airflow/cli/commands/dag_command.py
index 8735a0e522..c81f1c0a47 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -32,6 +32,7 @@ from sqlalchemy.orm import Session
 
 from airflow import settings
 from airflow.api.client import get_current_api_client
+from airflow.api_connexion.schemas.dag_schema import dag_schema
 from airflow.cli.simple_table import AirflowConsole
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
@@ -346,6 +347,27 @@ def dag_list_dags(args) -> None:
     )
 
 
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@provide_session
+def dag_details(args, session=NEW_SESSION):
+    """Get DAG details given a DAG id"""
+    dag = DagModel.get_dagmodel(args.dag_id, session=session)
+    if not dag:
+        raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
+    dag_detail = dag_schema.dump(dag)
+
+    if args.output in ["table", "plain"]:
+        data = [{"property_name": key, "property_value": value} for key, value 
in dag_detail.items()]
+    else:
+        data = [dag_detail]
+
+    AirflowConsole().print_as(
+        data=data,
+        output=args.output,
+    )
+
+
 @cli_utils.action_cli
 @suppress_logs_and_warning
 def dag_list_import_errors(args) -> None:
diff --git a/tests/cli/commands/test_dag_command.py 
b/tests/cli/commands/test_dag_command.py
index 6b4e2187fa..a54d26a993 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -31,6 +31,7 @@ import pytest
 import time_machine
 
 from airflow import settings
+from airflow.api_connexion.schemas.dag_schema import DAGSchema
 from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
 from airflow.exceptions import AirflowException
@@ -470,6 +471,25 @@ class TestCliDags:
         assert "airflow/example_dags/example_complex.py" in out
         assert "example_complex" in out
 
+    @conf_vars({("core", "load_examples"): "true"})
+    def test_cli_get_dag_details(self):
+        args = self.parser.parse_args(["dags", "details", "example_complex", 
"--output", "yaml"])
+        with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
+            dag_command.dag_details(args)
+            out = temp_stdout.getvalue()
+
+        dag_detail_fields = DAGSchema().fields.keys()
+
+        # Check if DAG Details field are present
+        for field in dag_detail_fields:
+            assert field in out
+
+        # Check if identifying values are present
+        dag_details_values = ["airflow", 
"airflow/example_dags/example_complex.py", "16", "example_complex"]
+
+        for value in dag_details_values:
+            assert value in out
+
     @conf_vars({("core", "load_examples"): "true"})
     def test_cli_list_dags(self):
         args = self.parser.parse_args(["dags", "list", "--output", "yaml"])

Reply via email to