This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a10b98e8ea Add error handling for encoding the dag runs (#40222)
a10b98e8ea is described below
commit a10b98e8eaf068922b705fba28db79ff00508b39
Author: M. Olcay Tercanlı <[email protected]>
AuthorDate: Tue Jul 16 16:51:33 2024 +0200
Add error handling for encoding the dag runs (#40222)
---
.editorconfig | 3 ++
airflow/www/static/js/api/useGridData.ts | 13 +++++++
airflow/www/static/js/dag/grid/index.test.tsx | 1 +
airflow/www/utils.py | 54 +++++++++++++++++----------
airflow/www/views.py | 11 +++++-
tests/www/test_utils.py | 53 +++++++++++++++++++++++++-
tests/www/views/test_views_grid.py | 3 ++
7 files changed, 115 insertions(+), 23 deletions(-)
diff --git a/.editorconfig b/.editorconfig
index c69a40f427..bdc9b52ce6 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -37,6 +37,9 @@ indent_size = 4
[*.js]
indent_size = 2
+[*.ts]
+indent_size = 2
+
[*.css]
indent_size = 2
diff --git a/airflow/www/static/js/api/useGridData.ts b/airflow/www/static/js/api/useGridData.ts
index 1a633acea3..6bed9dfd71 100644
--- a/airflow/www/static/js/api/useGridData.ts
+++ b/airflow/www/static/js/api/useGridData.ts
@@ -47,6 +47,7 @@ export interface GridData {
dagRuns: DagRun[];
groups: Task;
ordering: RunOrdering;
+ errors: string[];
}
export const emptyGridData: GridData = {
@@ -57,6 +58,7 @@ export const emptyGridData: GridData = {
instances: [],
},
ordering: [],
+ errors: [],
};
const formatOrdering = (data: GridData) => ({
@@ -132,6 +134,17 @@ const useGridData = () => {
}
// turn off auto refresh if there are no active runs
if (!areActiveRuns(response.dagRuns)) stopRefresh();
+ // if any errors returned then show as toast message
+ if (response.errors.length > 0) {
+ response.errors.forEach((errorMsg) => {
+ const error = Error(errorMsg);
+ errorToast({
+ title: "Error",
+ error,
+ });
+ });
+ }
+
return response;
},
{
diff --git a/airflow/www/static/js/dag/grid/index.test.tsx b/airflow/www/static/js/dag/grid/index.test.tsx
index ece090f7c8..b6eb000b76 100644
--- a/airflow/www/static/js/dag/grid/index.test.tsx
+++ b/airflow/www/static/js/dag/grid/index.test.tsx
@@ -156,6 +156,7 @@ const mockGridData = {
},
],
ordering: ["dataIntervalStart"],
+ errors: [],
} as useGridDataModule.GridData;
const EXPAND = "Expand all task groups";
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index f361381fcc..413d9fe2b6 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import json
+import logging
import textwrap
import time
from typing import TYPE_CHECKING, Any, Callable, Sequence
@@ -67,6 +68,8 @@ if TYPE_CHECKING:
TI = TaskInstance
+logger = logging.getLogger(__name__)
+
def datetime_to_string(value: DateTime | None) -> str | None:
if value is None:
@@ -163,28 +166,39 @@ def get_dag_run_conf(
def encode_dag_run(
dag_run: DagRun | None, *, json_encoder: type[json.JSONEncoder] =
json.JSONEncoder
-) -> dict[str, Any] | None:
+) -> tuple[dict[str, Any] | None, None | str]:
if not dag_run:
- return None
-
- dag_run_conf, conf_is_json = get_dag_run_conf(dag_run.conf,
json_encoder=json_encoder)
+ return None, None
+
+ try:
+ dag_run_conf, conf_is_json = get_dag_run_conf(dag_run.conf,
json_encoder=json_encoder)
+ encoded_dag_run = {
+ "run_id": dag_run.run_id,
+ "queued_at": datetime_to_string(dag_run.queued_at),
+ "start_date": datetime_to_string(dag_run.start_date),
+ "end_date": datetime_to_string(dag_run.end_date),
+ "state": dag_run.state,
+ "execution_date": datetime_to_string(dag_run.execution_date),
+ "data_interval_start":
datetime_to_string(dag_run.data_interval_start),
+ "data_interval_end": datetime_to_string(dag_run.data_interval_end),
+ "run_type": dag_run.run_type,
+ "last_scheduling_decision":
datetime_to_string(dag_run.last_scheduling_decision),
+ "external_trigger": dag_run.external_trigger,
+ "conf": dag_run_conf,
+ "conf_is_json": conf_is_json,
+ "note": dag_run.note,
+ }
+ except ValueError as e:
+ logger.error("Error while encoding the DAG Run!", exc_info=e)
+ if str(e) == "Circular reference detected":
+ return None, (
+ f"Circular reference detected in the DAG Run config
(#{dag_run.run_id}). "
+ f"You should check your webserver logs for more details."
+ )
+ else:
+ raise e
- return {
- "run_id": dag_run.run_id,
- "queued_at": datetime_to_string(dag_run.queued_at),
- "start_date": datetime_to_string(dag_run.start_date),
- "end_date": datetime_to_string(dag_run.end_date),
- "state": dag_run.state,
- "execution_date": datetime_to_string(dag_run.execution_date),
- "data_interval_start": datetime_to_string(dag_run.data_interval_start),
- "data_interval_end": datetime_to_string(dag_run.data_interval_end),
- "run_type": dag_run.run_type,
- "last_scheduling_decision":
datetime_to_string(dag_run.last_scheduling_decision),
- "external_trigger": dag_run.external_trigger,
- "conf": dag_run_conf,
- "conf_is_json": conf_is_json,
- "note": dag_run.note,
- }
+ return encoded_dag_run, None
def check_import_errors(fileloc, session):
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 7b16975101..6d54cf93c0 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3350,15 +3350,24 @@ class Airflow(AirflowBaseView):
if run_states:
query = query.where(DagRun.state.in_(run_states))
+ # Retrieve, sort and encode the previous DAG Runs
dag_runs = wwwutils.sorted_dag_runs(
query, ordering=dag.timetable.run_ordering, limit=num_runs,
session=session
)
+ encoded_runs = []
+ encoding_errors = []
+ for dr in dag_runs:
+ encoded_dr, error = wwwutils.encode_dag_run(dr,
json_encoder=utils_json.WebEncoder)
+ if error:
+ encoding_errors.append(error)
+ else:
+ encoded_runs.append(encoded_dr)
- encoded_runs = [wwwutils.encode_dag_run(dr,
json_encoder=utils_json.WebEncoder) for dr in dag_runs]
data = {
"groups": dag_to_grid(dag, dag_runs, session),
"dag_runs": encoded_runs,
"ordering": dag.timetable.run_ordering,
+ "errors": encoding_errors,
}
# avoid spaces to reduce payload size
return (
diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py
index a90d924699..7d9ce76801 100644
--- a/tests/www/test_utils.py
+++ b/tests/www/test_utils.py
@@ -34,7 +34,13 @@ from sqlalchemy.orm import Query
from airflow.models import DagRun
from airflow.utils import json as utils_json
from airflow.www import utils
-from airflow.www.utils import CustomSQLAInterface, DagRunCustomSQLAInterface,
json_f, wrapped_markdown
+from airflow.www.utils import (
+ CustomSQLAInterface,
+ DagRunCustomSQLAInterface,
+ encode_dag_run,
+ json_f,
+ wrapped_markdown,
+)
from tests.test_utils.config import conf_vars
@@ -321,7 +327,7 @@ class TestAttrRenderer:
def test_encode_dag_run_none(self):
no_dag_run_result = utils.encode_dag_run(None)
- assert no_dag_run_result is None
+ assert no_dag_run_result == (None, None)
def test_json_f_webencoder(self):
dag_run_conf = {
@@ -537,6 +543,49 @@ class TestWrappedMarkdown:
assert escape(HTML) in rendered
+ @pytest.mark.parametrize(
+ "dag_run,expected_val",
+ [
+ [None, (None, None)],
+ [
+ DagRun(run_id="run_id_1", conf={}),
+ (
+ {
+ "conf": None,
+ "conf_is_json": False,
+ "data_interval_end": None,
+ "data_interval_start": None,
+ "end_date": None,
+ "execution_date": None,
+ "external_trigger": None,
+ "last_scheduling_decision": None,
+ "note": None,
+ "queued_at": None,
+ "run_id": "run_id_1",
+ "run_type": None,
+ "start_date": None,
+ "state": None,
+ },
+ None,
+ ),
+ ],
+ ],
+ )
+ def test_encode_dag_run(self, dag_run, expected_val):
+ val = encode_dag_run(dag_run)
+ assert val == expected_val
+
+ def test_encode_dag_run_circular_reference(self):
+ conf = {}
+ conf["a"] = conf
+ dr = DagRun(run_id="run_id_1", conf=conf)
+ encoded_dr, error = encode_dag_run(dr)
+ assert encoded_dr is None
+ assert error == (
+ f"Circular reference detected in the DAG Run config
(#{dr.run_id}). "
+ f"You should check your webserver logs for more details."
+ )
+
class TestFilter:
def setup_method(self):
diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py
index 3d13dea4d1..0b82279880 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -160,6 +160,7 @@ def test_no_runs(admin_client, dag_without_runs):
"label": None,
},
"ordering": ["data_interval_end", "execution_date"],
+ "errors": [],
}
@@ -406,6 +407,7 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session):
"label": None,
},
"ordering": ["data_interval_end", "execution_date"],
+ "errors": [],
}
@@ -459,6 +461,7 @@ def test_has_outlet_dataset_flag(admin_client, dag_maker, session, app, monkeypa
"label": None,
},
"ordering": ["data_interval_end", "execution_date"],
+ "errors": [],
}