This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new d22360203d6 Fix `TypeError` when deserializing task with
`execution_timeout` set to `None` (#46766) (#46822)
d22360203d6 is described below
commit d22360203d68b231d0a92ce51f7c5a814b495a8e
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Feb 17 21:41:57 2025 +0100
Fix `TypeError` when deserializing task with `execution_timeout` set to
`None` (#46766) (#46822)
* Fix TypeError when deserializing task with None execution_timeout
* Fix pytest-extraneous-scope-function (PT003)
(cherry picked from commit 97d62ee64e997acfa84791d6aa730cf6ed596c0b)
Co-authored-by: Wojciech Szlachta <[email protected]>
---
airflow/serialization/serialized_objects.py | 5 ++++-
tests/deprecations_ignore.yml | 2 +-
tests/serialization/test_dag_serialization.py | 31 +++++++++++++++++++++++++++
3 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 9eb3332a6b6..b1fbb05c0f2 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1297,7 +1297,10 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
elif k == "subdag":
v = SerializedDAG.deserialize_dag(v)
elif k in {"retry_delay", "execution_timeout", "sla", "max_retry_delay"}:
- v = cls._deserialize_timedelta(v)
+ # If operator's execution_timeout is None and core.default_task_execution_timeout is not None,
+ # v will be None so do not deserialize into timedelta
+ if v is not None:
+ v = cls._deserialize_timedelta(v)
elif k in encoded_op["template_fields"]:
pass
elif k == "resources":
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 8be939227d8..29a3566d314 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -25,7 +25,7 @@
- tests/models/test_dagbag.py::TestDagBag::test_load_subdags
- tests/models/test_mappedoperator.py::test_expand_mapped_task_instance_with_named_index
- tests/models/test_xcom.py::TestXCom::test_set_serialize_call_old_signature
-
+- tests/serialization/test_dag_serialization.py::TestStringifiedDAGs::test_serialize_mapped_sensor_has_reschedule_dep
# WWW
- tests/www/api/experimental/test_dag_runs_endpoint.py::TestDagRunsEndpoint::test_get_dag_runs_success
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 58f16d80f8c..103b396ba53 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2709,6 +2709,37 @@ def test_task_resources_serde():
}
+@pytest.fixture(params=[None, timedelta(hours=1)])
+def default_task_execution_timeout(request):
+ """
+ Mock setting core.default_task_execution_timeout in airflow.cfg.
+ """
+ from airflow.serialization.serialized_objects import SerializedBaseOperator
+
+ DEFAULT_TASK_EXECUTION_TIMEOUT = request.param
+ with mock.patch.dict(
+ SerializedBaseOperator._CONSTRUCTOR_PARAMS, {"execution_timeout": DEFAULT_TASK_EXECUTION_TIMEOUT}
+ ):
+ yield DEFAULT_TASK_EXECUTION_TIMEOUT
+
+
+@pytest.mark.parametrize("execution_timeout", [None, timedelta(hours=1)])
+def test_task_execution_timeout_serde(execution_timeout, default_task_execution_timeout):
+ """
+ Test task execution_timeout serialization/deserialization.
+ """
+
+ with DAG("test_task_execution_timeout", schedule=None, start_date=datetime(2020, 1, 1)) as _:
+ task = EmptyOperator(task_id="task1", execution_timeout=execution_timeout)
+
+ serialized = BaseSerialization.serialize(task)
+ if execution_timeout != default_task_execution_timeout:
+ assert "execution_timeout" in serialized["__var"]
+
+ deserialized = BaseSerialization.deserialize(serialized)
+ assert deserialized.execution_timeout == task.execution_timeout
+
+
def test_taskflow_expand_serde():
from airflow.decorators import task
from airflow.models.xcom_arg import XComArg