This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a10b98e8ea Add error handling for encoding the dag runs (#40222)
a10b98e8ea is described below

commit a10b98e8eaf068922b705fba28db79ff00508b39
Author: M. Olcay Tercanlı <[email protected]>
AuthorDate: Tue Jul 16 16:51:33 2024 +0200

    Add error handling for encoding the dag runs (#40222)
---
 .editorconfig                                 |  3 ++
 airflow/www/static/js/api/useGridData.ts      | 13 +++++++
 airflow/www/static/js/dag/grid/index.test.tsx |  1 +
 airflow/www/utils.py                          | 54 +++++++++++++++++----------
 airflow/www/views.py                          | 11 +++++-
 tests/www/test_utils.py                       | 53 +++++++++++++++++++++++++-
 tests/www/views/test_views_grid.py            |  3 ++
 7 files changed, 115 insertions(+), 23 deletions(-)

diff --git a/.editorconfig b/.editorconfig
index c69a40f427..bdc9b52ce6 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -37,6 +37,9 @@ indent_size = 4
 [*.js]
 indent_size = 2
 
+[*.ts]
+indent_size = 2
+
 [*.css]
 indent_size = 2
 
diff --git a/airflow/www/static/js/api/useGridData.ts 
b/airflow/www/static/js/api/useGridData.ts
index 1a633acea3..6bed9dfd71 100644
--- a/airflow/www/static/js/api/useGridData.ts
+++ b/airflow/www/static/js/api/useGridData.ts
@@ -47,6 +47,7 @@ export interface GridData {
   dagRuns: DagRun[];
   groups: Task;
   ordering: RunOrdering;
+  errors: string[];
 }
 
 export const emptyGridData: GridData = {
@@ -57,6 +58,7 @@ export const emptyGridData: GridData = {
     instances: [],
   },
   ordering: [],
+  errors: [],
 };
 
 const formatOrdering = (data: GridData) => ({
@@ -132,6 +134,17 @@ const useGridData = () => {
       }
       // turn off auto refresh if there are no active runs
       if (!areActiveRuns(response.dagRuns)) stopRefresh();
+      // if any errors returned then show as toast message
+      if (response.errors.length > 0) {
+        response.errors.forEach((errorMsg) => {
+          const error = Error(errorMsg);
+          errorToast({
+            title: "Error",
+            error,
+          });
+        });
+      }
+
       return response;
     },
     {
diff --git a/airflow/www/static/js/dag/grid/index.test.tsx 
b/airflow/www/static/js/dag/grid/index.test.tsx
index ece090f7c8..b6eb000b76 100644
--- a/airflow/www/static/js/dag/grid/index.test.tsx
+++ b/airflow/www/static/js/dag/grid/index.test.tsx
@@ -156,6 +156,7 @@ const mockGridData = {
     },
   ],
   ordering: ["dataIntervalStart"],
+  errors: [],
 } as useGridDataModule.GridData;
 
 const EXPAND = "Expand all task groups";
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index f361381fcc..413d9fe2b6 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import json
+import logging
 import textwrap
 import time
 from typing import TYPE_CHECKING, Any, Callable, Sequence
@@ -67,6 +68,8 @@ if TYPE_CHECKING:
 
 TI = TaskInstance
 
+logger = logging.getLogger(__name__)
+
 
 def datetime_to_string(value: DateTime | None) -> str | None:
     if value is None:
@@ -163,28 +166,39 @@ def get_dag_run_conf(
 
 def encode_dag_run(
     dag_run: DagRun | None, *, json_encoder: type[json.JSONEncoder] = 
json.JSONEncoder
-) -> dict[str, Any] | None:
+) -> tuple[dict[str, Any] | None, None | str]:
     if not dag_run:
-        return None
-
-    dag_run_conf, conf_is_json = get_dag_run_conf(dag_run.conf, 
json_encoder=json_encoder)
+        return None, None
+
+    try:
+        dag_run_conf, conf_is_json = get_dag_run_conf(dag_run.conf, 
json_encoder=json_encoder)
+        encoded_dag_run = {
+            "run_id": dag_run.run_id,
+            "queued_at": datetime_to_string(dag_run.queued_at),
+            "start_date": datetime_to_string(dag_run.start_date),
+            "end_date": datetime_to_string(dag_run.end_date),
+            "state": dag_run.state,
+            "execution_date": datetime_to_string(dag_run.execution_date),
+            "data_interval_start": 
datetime_to_string(dag_run.data_interval_start),
+            "data_interval_end": datetime_to_string(dag_run.data_interval_end),
+            "run_type": dag_run.run_type,
+            "last_scheduling_decision": 
datetime_to_string(dag_run.last_scheduling_decision),
+            "external_trigger": dag_run.external_trigger,
+            "conf": dag_run_conf,
+            "conf_is_json": conf_is_json,
+            "note": dag_run.note,
+        }
+    except ValueError as e:
+        logger.error("Error while encoding the DAG Run!", exc_info=e)
+        if str(e) == "Circular reference detected":
+            return None, (
+                f"Circular reference detected in the DAG Run config 
(#{dag_run.run_id}). "
+                f"You should check your webserver logs for more details."
+            )
+        else:
+            raise e
 
-    return {
-        "run_id": dag_run.run_id,
-        "queued_at": datetime_to_string(dag_run.queued_at),
-        "start_date": datetime_to_string(dag_run.start_date),
-        "end_date": datetime_to_string(dag_run.end_date),
-        "state": dag_run.state,
-        "execution_date": datetime_to_string(dag_run.execution_date),
-        "data_interval_start": datetime_to_string(dag_run.data_interval_start),
-        "data_interval_end": datetime_to_string(dag_run.data_interval_end),
-        "run_type": dag_run.run_type,
-        "last_scheduling_decision": 
datetime_to_string(dag_run.last_scheduling_decision),
-        "external_trigger": dag_run.external_trigger,
-        "conf": dag_run_conf,
-        "conf_is_json": conf_is_json,
-        "note": dag_run.note,
-    }
+    return encoded_dag_run, None
 
 
 def check_import_errors(fileloc, session):
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 7b16975101..6d54cf93c0 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3350,15 +3350,24 @@ class Airflow(AirflowBaseView):
         if run_states:
             query = query.where(DagRun.state.in_(run_states))
 
+        # Retrieve, sort and encode the previous DAG Runs
         dag_runs = wwwutils.sorted_dag_runs(
             query, ordering=dag.timetable.run_ordering, limit=num_runs, 
session=session
         )
+        encoded_runs = []
+        encoding_errors = []
+        for dr in dag_runs:
+            encoded_dr, error = wwwutils.encode_dag_run(dr, 
json_encoder=utils_json.WebEncoder)
+            if error:
+                encoding_errors.append(error)
+            else:
+                encoded_runs.append(encoded_dr)
 
-        encoded_runs = [wwwutils.encode_dag_run(dr, 
json_encoder=utils_json.WebEncoder) for dr in dag_runs]
         data = {
             "groups": dag_to_grid(dag, dag_runs, session),
             "dag_runs": encoded_runs,
             "ordering": dag.timetable.run_ordering,
+            "errors": encoding_errors,
         }
         # avoid spaces to reduce payload size
         return (
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index a90d924699..7d9ce76801 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -34,7 +34,13 @@ from sqlalchemy.orm import Query
 from airflow.models import DagRun
 from airflow.utils import json as utils_json
 from airflow.www import utils
-from airflow.www.utils import CustomSQLAInterface, DagRunCustomSQLAInterface, 
json_f, wrapped_markdown
+from airflow.www.utils import (
+    CustomSQLAInterface,
+    DagRunCustomSQLAInterface,
+    encode_dag_run,
+    json_f,
+    wrapped_markdown,
+)
 from tests.test_utils.config import conf_vars
 
 
@@ -321,7 +327,7 @@ class TestAttrRenderer:
 
     def test_encode_dag_run_none(self):
         no_dag_run_result = utils.encode_dag_run(None)
-        assert no_dag_run_result is None
+        assert no_dag_run_result == (None, None)
 
     def test_json_f_webencoder(self):
         dag_run_conf = {
@@ -537,6 +543,49 @@ class TestWrappedMarkdown:
 
                 assert escape(HTML) in rendered
 
+    @pytest.mark.parametrize(
+        "dag_run,expected_val",
+        [
+            [None, (None, None)],
+            [
+                DagRun(run_id="run_id_1", conf={}),
+                (
+                    {
+                        "conf": None,
+                        "conf_is_json": False,
+                        "data_interval_end": None,
+                        "data_interval_start": None,
+                        "end_date": None,
+                        "execution_date": None,
+                        "external_trigger": None,
+                        "last_scheduling_decision": None,
+                        "note": None,
+                        "queued_at": None,
+                        "run_id": "run_id_1",
+                        "run_type": None,
+                        "start_date": None,
+                        "state": None,
+                    },
+                    None,
+                ),
+            ],
+        ],
+    )
+    def test_encode_dag_run(self, dag_run, expected_val):
+        val = encode_dag_run(dag_run)
+        assert val == expected_val
+
+    def test_encode_dag_run_circular_reference(self):
+        conf = {}
+        conf["a"] = conf
+        dr = DagRun(run_id="run_id_1", conf=conf)
+        encoded_dr, error = encode_dag_run(dr)
+        assert encoded_dr is None
+        assert error == (
+            f"Circular reference detected in the DAG Run config 
(#{dr.run_id}). "
+            f"You should check your webserver logs for more details."
+        )
+
 
 class TestFilter:
     def setup_method(self):
diff --git a/tests/www/views/test_views_grid.py 
b/tests/www/views/test_views_grid.py
index 3d13dea4d1..0b82279880 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -160,6 +160,7 @@ def test_no_runs(admin_client, dag_without_runs):
             "label": None,
         },
         "ordering": ["data_interval_end", "execution_date"],
+        "errors": [],
     }
 
 
@@ -406,6 +407,7 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], 
session):
             "label": None,
         },
         "ordering": ["data_interval_end", "execution_date"],
+        "errors": [],
     }
 
 
@@ -459,6 +461,7 @@ def test_has_outlet_dataset_flag(admin_client, dag_maker, 
session, app, monkeypa
             "label": None,
         },
         "ordering": ["data_interval_end", "execution_date"],
+        "errors": [],
     }
 
 

Reply via email to