This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fd68f0505f Fix list dags command for get_dagmodel is None (#36739)
fd68f0505f is described below
commit fd68f0505f665b1c96ef2fc8620ee9a45c230f99
Author: Pankaj Singh <[email protected]>
AuthorDate: Fri Jan 12 19:24:54 2024 +0530
Fix list dags command for get_dagmodel is None (#36739)
* Fix list dag command for get_dagmodel is None
In the dags list command, we initially retrieve the dag_id from the dagbag.
For each DAG, we fetch the corresponding DAG details from the metadata
database. If the data at both locations is not in sync, the
DagModel.get_dagmodel function returns None, leading to a serialization
failure. This PR tries to use the fields available from the dagbag to
initialise the DAG details; some fields are not available there, so None is
set for those.
---
airflow/cli/commands/dag_command.py | 39 +++++++++++++++++++++++++++++++++-
tests/cli/commands/test_dag_command.py | 22 ++++++++++++++++++-
2 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 4b5f92fd86..52e4fac30e 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -285,6 +285,40 @@ def _save_dot_to_file(dot: Dot, filename: str) -> None:
print(f"File {filename} saved")
+def _get_dagbag_dag_details(dag: DAG) -> dict:
+ """Return a dagbag dag details dict."""
+ return {
+ "dag_id": dag.dag_id,
+ "root_dag_id": dag.parent_dag.dag_id if dag.parent_dag else None,
+ "is_paused": dag.get_is_paused(),
+ "is_active": dag.get_is_active(),
+ "is_subdag": dag.is_subdag,
+ "last_parsed_time": None,
+ "last_pickled": None,
+ "last_expired": None,
+ "scheduler_lock": None,
+ "pickle_id": dag.pickle_id,
+ "default_view": dag.default_view,
+ "fileloc": dag.fileloc,
+ "file_token": None,
+ "owners": dag.owner,
+ "description": dag.description,
+ "schedule_interval": dag.schedule_interval,
+ "timetable_description": dag.timetable.description,
+ "tags": dag.tags,
+ "max_active_tasks": dag.max_active_tasks,
+ "max_active_runs": dag.max_active_runs,
+ "has_task_concurrency_limits": any(
+ t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
+ ),
+ "has_import_errors": False,
+ "next_dagrun": None,
+ "next_dagrun_data_interval_start": None,
+ "next_dagrun_data_interval_end": None,
+ "next_dagrun_create_after": None,
+ }
+
+
@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
@@ -377,7 +411,10 @@ def dag_list_dags(args, session=NEW_SESSION) -> None:
def get_dag_detail(dag: DAG) -> dict:
dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
- dag_detail = dag_schema.dump(dag_model)
+ if dag_model:
+ dag_detail = dag_schema.dump(dag_model)
+ else:
+ dag_detail = _get_dagbag_dag_details(dag)
return {col: dag_detail[col] for col in valid_cols}
AirflowConsole().print_as(
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 9531286a33..09b8164ee1 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -30,7 +30,7 @@ import pytest
import time_machine
from airflow import settings
-from airflow.api_connexion.schemas.dag_schema import DAGSchema
+from airflow.api_connexion.schemas.dag_schema import DAGSchema, dag_schema
from airflow.cli import cli_parser
from airflow.cli.commands import dag_command
from airflow.decorators import task
@@ -565,6 +565,26 @@ class TestCliDags:
out = temp_stderr.getvalue()
assert "Failed to load all files." in out
+ @conf_vars({("core", "load_examples"): "true"})
+ @mock.patch("airflow.models.DagModel.get_dagmodel")
+ def test_list_dags_none_get_dagmodel(self, mock_get_dagmodel):
+ mock_get_dagmodel.return_value = None
+ args = self.parser.parse_args(["dags", "list", "--output", "json"])
+ with contextlib.redirect_stdout(StringIO()) as temp_stdout:
+ dag_command.dag_list_dags(args)
+ out = temp_stdout.getvalue()
+ dag_list = json.loads(out)
+ for key in ["dag_id", "fileloc", "owners", "is_paused"]:
+ assert key in dag_list[0]
+ assert any("airflow/example_dags/example_complex.py" in d["fileloc"] for d in dag_list)
+
+ @conf_vars({("core", "load_examples"): "true"})
+ def test_dagbag_dag_col(self):
+ valid_cols = [c for c in dag_schema.fields]
+ dagbag = DagBag(include_examples=True)
+ dag_details = dag_command._get_dagbag_dag_details(dagbag.get_dag("tutorial_dag"))
+ assert list(dag_details.keys()) == valid_cols
+
@conf_vars({("core", "load_examples"): "false"})
def test_cli_list_import_errors(self):
dag_path = os.path.join(TEST_DAGS_FOLDER, "test_invalid_cron.py")