This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new d22360203d6 Fix `TypeError` when deserializing task with 
`execution_timeout` set to `None` (#46766) (#46822)
d22360203d6 is described below

commit d22360203d68b231d0a92ce51f7c5a814b495a8e
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Feb 17 21:41:57 2025 +0100

    Fix `TypeError` when deserializing task with `execution_timeout` set to 
`None` (#46766) (#46822)
    
    * Fix TypeError when deserializing task with None execution_timeout
    
    * Fix pytest-extraneous-scope-function (PT003)
    
    (cherry picked from commit 97d62ee64e997acfa84791d6aa730cf6ed596c0b)
    
    Co-authored-by: Wojciech Szlachta <[email protected]>
---
 airflow/serialization/serialized_objects.py   |  5 ++++-
 tests/deprecations_ignore.yml                 |  2 +-
 tests/serialization/test_dag_serialization.py | 31 +++++++++++++++++++++++++++
 3 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index 9eb3332a6b6..b1fbb05c0f2 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1297,7 +1297,10 @@ class SerializedBaseOperator(BaseOperator, 
BaseSerialization):
             elif k == "subdag":
                 v = SerializedDAG.deserialize_dag(v)
             elif k in {"retry_delay", "execution_timeout", "sla", 
"max_retry_delay"}:
-                v = cls._deserialize_timedelta(v)
+                # If operator's execution_timeout is None and 
core.default_task_execution_timeout is not None,
+                # v will be None so do not deserialize into timedelta
+                if v is not None:
+                    v = cls._deserialize_timedelta(v)
             elif k in encoded_op["template_fields"]:
                 pass
             elif k == "resources":
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 8be939227d8..29a3566d314 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -25,7 +25,7 @@
 - tests/models/test_dagbag.py::TestDagBag::test_load_subdags
 - 
tests/models/test_mappedoperator.py::test_expand_mapped_task_instance_with_named_index
 - tests/models/test_xcom.py::TestXCom::test_set_serialize_call_old_signature
-
+- 
tests/serialization/test_dag_serialization.py::TestStringifiedDAGs::test_serialize_mapped_sensor_has_reschedule_dep
 
 # WWW
 - 
tests/www/api/experimental/test_dag_runs_endpoint.py::TestDagRunsEndpoint::test_get_dag_runs_success
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index 58f16d80f8c..103b396ba53 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2709,6 +2709,37 @@ def test_task_resources_serde():
     }
 
 
[email protected](params=[None, timedelta(hours=1)])
+def default_task_execution_timeout(request):
+    """
+    Mock setting core.default_task_execution_timeout in airflow.cfg.
+    """
+    from airflow.serialization.serialized_objects import SerializedBaseOperator
+
+    DEFAULT_TASK_EXECUTION_TIMEOUT = request.param
+    with mock.patch.dict(
+        SerializedBaseOperator._CONSTRUCTOR_PARAMS, {"execution_timeout": 
DEFAULT_TASK_EXECUTION_TIMEOUT}
+    ):
+        yield DEFAULT_TASK_EXECUTION_TIMEOUT
+
+
[email protected]("execution_timeout", [None, timedelta(hours=1)])
+def test_task_execution_timeout_serde(execution_timeout, 
default_task_execution_timeout):
+    """
+    Test task execution_timeout serialization/deserialization.
+    """
+
+    with DAG("test_task_execution_timeout", schedule=None, 
start_date=datetime(2020, 1, 1)) as _:
+        task = EmptyOperator(task_id="task1", 
execution_timeout=execution_timeout)
+
+    serialized = BaseSerialization.serialize(task)
+    if execution_timeout != default_task_execution_timeout:
+        assert "execution_timeout" in serialized["__var"]
+
+    deserialized = BaseSerialization.deserialize(serialized)
+    assert deserialized.execution_timeout == task.execution_timeout
+
+
 def test_taskflow_expand_serde():
     from airflow.decorators import task
     from airflow.models.xcom_arg import XComArg

Reply via email to