This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new b4885b2 Only allow passing JSON Serializable conf to TriggerDagRunOperator (#13964)
b4885b2 is described below
commit b4885b25871ae7ede2028f81b0d88def3e22f23a
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Jan 29 16:24:46 2021 +0000
Only allow passing JSON Serializable conf to TriggerDagRunOperator (#13964)
closes https://github.com/apache/airflow/issues/13414
---
airflow/operators/trigger_dagrun.py | 6 ++++++
tests/operators/test_trigger_dagrun.py | 11 +++++++++++
2 files changed, 17 insertions(+)
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 63d3361..3cf7a4f 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -17,6 +17,7 @@
# under the License.
import datetime
+import json
import time
from typing import Dict, List, Optional, Union
@@ -108,6 +109,11 @@ class TriggerDagRunOperator(BaseOperator):
self.execution_date: Optional[datetime.datetime] = execution_date  # type: ignore
+ try:
+ json.dumps(self.conf)
+ except TypeError:
+ raise AirflowException("conf parameter should be JSON Serializable")
+
def execute(self, context: Dict):
if isinstance(self.execution_date, datetime.datetime):
execution_date = self.execution_date
diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py
index c17d43c..1bdc59b 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -164,6 +164,17 @@ class TestDagRunOperator(TestCase):
assert len(dagruns) == 1
assert dagruns[0].conf, {"foo": "bar"}
+ def test_trigger_dagrun_operator_templated_invalid_conf(self):
+ """Test passing a conf that is not JSON Serializable raise error."""
+
+ with pytest.raises(AirflowException, match="^conf parameter should be JSON Serializable$"):
+ TriggerDagRunOperator(
+ task_id="test_trigger_dagrun_with_invalid_conf",
+ trigger_dag_id=TRIGGERED_DAG_ID,
+ conf={"foo": "{{ dag.dag_id }}", "datetime": timezone.utcnow()},
+ dag=self.dag,
+ )
+
def test_trigger_dagrun_operator_templated_conf(self):
"""Test passing a templated conf to the triggered DagRun."""
task = TriggerDagRunOperator(