This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-3-test by this push:
new 10675af30b Add 'reschedule' to the serialized fields for the BaseSensorOperator (#23674)
10675af30b is described below
commit 10675af30bda2137af42197c61fd89d665bec0a3
Author: David Caron <[email protected]>
AuthorDate: Tue May 17 08:18:29 2022 -0400
Add 'reschedule' to the serialized fields for the BaseSensorOperator (#23674)
fix #23411
(cherry picked from commit f9e2a3051cd3a5b6fcf33bca4c929d220cf5661e)
---
airflow/sensors/base.py | 4 ++++
tests/serialization/test_dag_serialization.py | 1 +
tests/ti_deps/deps/test_ready_to_reschedule_dep.py | 7 ++++++-
3 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 7f1cd87c3d..f00b3a6761 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -339,6 +339,10 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
"""Define mode rescheduled sensors."""
return self.mode == 'reschedule'
+ @classmethod
+ def get_serialized_fields(cls):
+ return super().get_serialized_fields() | {"reschedule"}
+
def poke_mode_only(cls):
"""
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 0144501f1a..fe9fc7c7e5 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1462,6 +1462,7 @@ class TestStringifiedDAGs:
assert "deps" in blob
serialized_op = SerializedBaseOperator.deserialize_operator(blob)
+ assert serialized_op.reschedule == (mode == "reschedule")
assert op.deps == serialized_op.deps
@pytest.mark.parametrize(
diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
index 470166db21..99416bbbc8 100644
--- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -31,7 +31,7 @@ from airflow.utils.timezone import utcnow
class TestNotInReschedulePeriodDep(unittest.TestCase):
def _get_task_instance(self, state):
dag = DAG('test_dag')
- task = Mock(dag=dag)
+ task = Mock(dag=dag, reschedule=True)
ti = TaskInstance(task=task, state=state, run_id=None)
return ti
@@ -52,6 +52,11 @@ class TestNotInReschedulePeriodDep(unittest.TestCase):
dep_context = DepContext(ignore_in_reschedule_period=True)
assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
+ def test_should_pass_if_not_reschedule_mode(self):
+ ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
+ del ti.task.reschedule
+ assert ReadyToRescheduleDep().is_met(ti=ti)
+
def test_should_pass_if_not_in_none_state(self):
ti = self._get_task_instance(State.UP_FOR_RETRY)
assert ReadyToRescheduleDep().is_met(ti=ti)