This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fd68f0505f Fix list dags command for get_dagmodel is None (#36739)
fd68f0505f is described below

commit fd68f0505f665b1c96ef2fc8620ee9a45c230f99
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Jan 12 19:24:54 2024 +0530

    Fix list dags command for get_dagmodel is None (#36739)
    
    * Fix list dag command for get_dagmodel is None
    
    In the dags list command, we initially retrieve the dag_id from the dagbag. 
For each DAG, we fetch the corresponding DAG details from the metadata 
database. If the data at both locations is not in sync, the 
DagModel.get_dagmodel function returns None, leading to a serialization 
failure. This PR tries to use the fields available from the dagbag to 
initialise the dag details. Some fields are not available there, so None is 
set for those.
---
 airflow/cli/commands/dag_command.py    | 39 +++++++++++++++++++++++++++++++++-
 tests/cli/commands/test_dag_command.py | 22 ++++++++++++++++++-
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 4b5f92fd86..52e4fac30e 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -285,6 +285,40 @@ def _save_dot_to_file(dot: Dot, filename: str) -> None:
     print(f"File {filename} saved")
 
 
+def _get_dagbag_dag_details(dag: DAG) -> dict:
+    """Return a dagbag dag details dict."""
+    return {
+        "dag_id": dag.dag_id,
+        "root_dag_id": dag.parent_dag.dag_id if dag.parent_dag else None,
+        "is_paused": dag.get_is_paused(),
+        "is_active": dag.get_is_active(),
+        "is_subdag": dag.is_subdag,
+        "last_parsed_time": None,
+        "last_pickled": None,
+        "last_expired": None,
+        "scheduler_lock": None,
+        "pickle_id": dag.pickle_id,
+        "default_view": dag.default_view,
+        "fileloc": dag.fileloc,
+        "file_token": None,
+        "owners": dag.owner,
+        "description": dag.description,
+        "schedule_interval": dag.schedule_interval,
+        "timetable_description": dag.timetable.description,
+        "tags": dag.tags,
+        "max_active_tasks": dag.max_active_tasks,
+        "max_active_runs": dag.max_active_runs,
+        "has_task_concurrency_limits": any(
+            t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
+        ),
+        "has_import_errors": False,
+        "next_dagrun": None,
+        "next_dagrun_data_interval_start": None,
+        "next_dagrun_data_interval_end": None,
+        "next_dagrun_create_after": None,
+    }
+
+
 @cli_utils.action_cli
 @providers_configuration_loaded
 @provide_session
@@ -377,7 +411,10 @@ def dag_list_dags(args, session=NEW_SESSION) -> None:
 
     def get_dag_detail(dag: DAG) -> dict:
         dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
-        dag_detail = dag_schema.dump(dag_model)
+        if dag_model:
+            dag_detail = dag_schema.dump(dag_model)
+        else:
+            dag_detail = _get_dagbag_dag_details(dag)
         return {col: dag_detail[col] for col in valid_cols}
 
     AirflowConsole().print_as(
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 9531286a33..09b8164ee1 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -30,7 +30,7 @@ import pytest
 import time_machine
 
 from airflow import settings
-from airflow.api_connexion.schemas.dag_schema import DAGSchema
+from airflow.api_connexion.schemas.dag_schema import DAGSchema, dag_schema
 from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
 from airflow.decorators import task
@@ -565,6 +565,26 @@ class TestCliDags:
             out = temp_stderr.getvalue()
         assert "Failed to load all files." in out
 
+    @conf_vars({("core", "load_examples"): "true"})
+    @mock.patch("airflow.models.DagModel.get_dagmodel")
+    def test_list_dags_none_get_dagmodel(self, mock_get_dagmodel):
+        mock_get_dagmodel.return_value = None
+        args = self.parser.parse_args(["dags", "list", "--output", "json"])
+        with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+            dag_command.dag_list_dags(args)
+            out = temp_stdout.getvalue()
+            dag_list = json.loads(out)
+        for key in ["dag_id", "fileloc", "owners", "is_paused"]:
+            assert key in dag_list[0]
+        assert any("airflow/example_dags/example_complex.py" in d["fileloc"] for d in dag_list)
+
+    @conf_vars({("core", "load_examples"): "true"})
+    def test_dagbag_dag_col(self):
+        valid_cols = [c for c in dag_schema.fields]
+        dagbag = DagBag(include_examples=True)
+        dag_details = dag_command._get_dagbag_dag_details(dagbag.get_dag("tutorial_dag"))
+        assert list(dag_details.keys()) == valid_cols
+
     @conf_vars({("core", "load_examples"): "false"})
     def test_cli_list_import_errors(self):
         dag_path = os.path.join(TEST_DAGS_FOLDER, "test_invalid_cron.py")

Reply via email to