This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 84c40a7877 Prevent assignment of non JSON serializable values to
DagRun.conf dict (#35096)
84c40a7877 is described below
commit 84c40a7877e5ea9dbee03b707065cb590f872111
Author: Pavel Nikerov <[email protected]>
AuthorDate: Tue Nov 14 23:46:00 2023 +0300
Prevent assignment of non JSON serializable values to DagRun.conf dict
(#35096)
* Update dagrun.py
ConfDict class added, conf attribute updated
Error description fixed
back to previous annotation
conf fixed
Exception description fixed
Error fixed
Test added
hybrid_property for column conf has been added
declared_attr added
Docstring Content Issue fixed
Codestyle fixed
trailing whitespace removed
line break added
---
airflow/models/dagrun.py | 51 +++++++++++++++++++++++++++++++++++++++++++--
tests/models/test_dagrun.py | 14 +++++++++++++
2 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 50f0c9760c..898d266390 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import itertools
+import json
import os
import warnings
from collections import defaultdict
@@ -97,6 +98,37 @@ class TISchedulingDecision(NamedTuple):
finished_tis: list[TI]
+class ConfDict(dict):
+ """Custom dictionary for storing only JSON serializable values."""
+
+ def __init__(self, val=None):
+ super().__init__(self.is_jsonable(val))
+
+ def __setitem__(self, key, value):
+ self.is_jsonable({key: value})
+ super().__setitem__(key, value)
+
+ @staticmethod
+ def is_jsonable(conf: dict) -> dict | None:
+ """Prevent setting non-json attributes."""
+ try:
+ json.dumps(conf)
+ except TypeError:
+ raise AirflowException("Cannot assign non JSON Serializable value")
+ if isinstance(conf, dict):
+ return conf
+ else:
+ raise AirflowException(f"Object of type {type(conf)} must be a
dict")
+
+ @staticmethod
+ def dump_check(conf: str) -> str:
+ val = json.loads(conf)
+ if isinstance(val, dict):
+ return conf
+ else:
+ raise TypeError(f"Object of type {type(val)} must be a dict")
+
+
def _creator_note(val):
"""Creator the ``note`` association proxy."""
if isinstance(val, str):
@@ -127,7 +159,7 @@ class DagRun(Base, LoggingMixin):
creating_job_id = Column(Integer)
external_trigger = Column(Boolean, default=True)
run_type = Column(String(50), nullable=False)
- conf = Column(PickleType)
+ _conf = Column("conf", PickleType)
# These two must be either both NULL or both datetime.
data_interval_start = Column(UtcDateTime)
data_interval_end = Column(UtcDateTime)
@@ -229,7 +261,12 @@ class DagRun(Base, LoggingMixin):
self.execution_date = execution_date
self.start_date = start_date
self.external_trigger = external_trigger
- self.conf = conf or {}
+
+ if isinstance(conf, str):
+ self._conf = ConfDict.dump_check(conf)
+ else:
+ self._conf = ConfDict(conf or {})
+
if state is not None:
self.state = state
if queued_at is NOTSET:
@@ -259,6 +296,16 @@ class DagRun(Base, LoggingMixin):
)
return run_id
+ def get_conf(self):
+ return self._conf
+
+ def set_conf(self, value):
+ self._conf = ConfDict(value)
+
+ @declared_attr
+ def conf(self):
+ return synonym("_conf", descriptor=property(self.get_conf,
self.set_conf))
+
@property
def stats_tags(self) -> dict[str, str]:
return prune_dict({"dag_id": self.dag_id, "run_type": self.run_type})
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 5732e0d565..cb873b0bc3 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -2618,3 +2618,17 @@ def test_dag_run_id_config(session, dag_maker, pattern,
run_id, result):
else:
with pytest.raises(AirflowException):
dag_maker.create_dagrun(run_id=run_id)
+
+
+def test_dagrun_conf():
+ dag_run = DagRun(conf={"test": 1234})
+ assert dag_run.conf == {"test": 1234}
+
+ with pytest.raises(AirflowException) as err:
+ dag_run.conf["non_json"] = timezone.utcnow()
+ assert str(err.value) == "Cannot assign non JSON Serializable value"
+
+ with pytest.raises(AirflowException) as err:
+ value = 1
+ dag_run.conf = value
+ assert str(err.value) == f"Object of type {type(value)} must be a dict"