This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f976d00fcc Fix scheduler crash during 3.0 to 3.1 migration when retry_delay is None (#56202)
1f976d00fcc is described below

commit 1f976d00fccb5875f0bf5aa07b51af12feb01995
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Mon Sep 29 21:31:44 2025 -0500

    Fix scheduler crash during 3.0 to 3.1 migration when retry_delay is None (#56202)
    
    * Kaxil's suggestions
    
    * make default a float because some tests are complaining
    
    * Fix test
    
    * fixup! Fix test
    
    * fixup! fixup! Fix test
    
    ---------
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 airflow-core/src/airflow/serialization/schema.json |  2 +-
 .../airflow/serialization/serialized_objects.py    | 15 +++--
 .../unit/serialization/test_dag_serialization.py   | 68 ++++++++++++++++++----
 scripts/in_container/run_schema_defaults_check.py  |  3 +
 4 files changed, 71 insertions(+), 17 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json
index 7707494fce5..d79c7477297 100644
--- a/airflow-core/src/airflow/serialization/schema.json
+++ b/airflow-core/src/airflow/serialization/schema.json
@@ -283,7 +283,7 @@
         "pool": { "type": "string", "default": "default_pool" },
         "pool_slots": { "type": "number", "default": 1 },
         "execution_timeout": { "$ref": "#/definitions/timedelta" },
-        "retry_delay": { "$ref": "#/definitions/timedelta" },
+        "retry_delay": { "$ref": "#/definitions/timedelta", "default": 300.0 },
         "retry_exponential_backoff": { "type": "boolean", "default": false },
         "max_retry_delay": { "$ref": "#/definitions/timedelta" },
         "params": { "$ref": "#/definitions/params" },
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index a9a29a6b4d5..aac63b7390f 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1291,7 +1291,7 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
 
     resources: dict[str, Any] | None = None
     retries: int = 0
-    retry_delay: datetime.timedelta
+    retry_delay: datetime.timedelta = datetime.timedelta(seconds=300)
     retry_exponential_backoff: bool = False
     run_as_user: str | None = None
 
@@ -2057,19 +2057,26 @@ class SerializedBaseOperator(DAGNode, BaseSerialization):
         for k, v in OPERATOR_DEFAULTS.items():
             if k not in cls.get_serialized_fields():
                 continue
-            # Exclude values that are the same as the schema defaults
-            if k in schema_defaults and schema_defaults[k] == v:
-                continue
 
             # Exclude values that are None or empty collections
             if v is None or v in [[], (), set(), {}]:
                 continue
 
+            # Check schema defaults first with raw value comparison (fast path)
+            if k in schema_defaults and schema_defaults[k] == v:
+                continue
+
             # Use the existing serialize method to ensure consistent format
             serialized_value = cls.serialize(v)
             # Extract just the value part, consistent with serialize_to_json behavior
             if isinstance(serialized_value, dict) and Encoding.TYPE in serialized_value:
                 serialized_value = serialized_value[Encoding.VAR]
+
+            # For cases where raw comparison failed but serialized values might match
+            # (e.g., timedelta vs float), check again with serialized value
+            if k in schema_defaults and schema_defaults[k] == serialized_value:
+                continue
+
             client_defaults[k] = serialized_value
 
         return client_defaults
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 7fcc987cd46..52143a08791 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -19,6 +19,7 @@
 
 from __future__ import annotations
 
+import contextlib
 import copy
 import dataclasses
 import importlib
@@ -99,6 +100,43 @@ from tests_common.test_utils.timetables import (
 if TYPE_CHECKING:
     from airflow.sdk.definitions.context import Context
 
+
+@contextlib.contextmanager
+def operator_defaults(overrides):
+    """
+    Temporarily patches OPERATOR_DEFAULTS, restoring original values after context exit.
+
+    Example:
+        with operator_defaults({"retries": 2, "retry_delay": 200.0}):
+            # Test code with modified operator defaults
+    """
+    from airflow.sdk.bases.operator import OPERATOR_DEFAULTS
+
+    original_values = {}
+    try:
+        # Store original values and apply overrides
+        for key, value in overrides.items():
+            original_values[key] = OPERATOR_DEFAULTS.get(key)
+            OPERATOR_DEFAULTS[key] = value
+
+        # Clear the cache to ensure fresh generation
+        SerializedBaseOperator.generate_client_defaults.cache_clear()
+
+        yield
+    finally:
+        # Cleanup: restore original values
+        for key, original_value in original_values.items():
+            if original_value is None and key in OPERATOR_DEFAULTS:
+                # Key didn't exist originally, remove it
+                del OPERATOR_DEFAULTS[key]
+            else:
+                # Restore original value
+                OPERATOR_DEFAULTS[key] = original_value
+
+        # Clear cache again to restore normal behavior
+        SerializedBaseOperator.generate_client_defaults.cache_clear()
+
+
 AIRFLOW_REPO_ROOT_PATH = Path(airflow.__file__).parents[3]
 
 
@@ -117,14 +155,13 @@ TYPE = Encoding.TYPE
 VAR = Encoding.VAR
 serialized_simple_dag_ground_truth = {
     "__version": 3,
-    "client_defaults": {"tasks": {"retry_delay": 300.0}},
     "dag": {
         "default_args": {
             "__type": "dict",
             "__var": {
                 "depends_on_past": False,
                 "retries": 1,
-                "retry_delay": {"__type": "timedelta", "__var": 300.0},
+                "retry_delay": {"__type": "timedelta", "__var": 240.0},
                 "max_retry_delay": {"__type": "timedelta", "__var": 600.0},
             },
         },
@@ -165,7 +202,7 @@ serialized_simple_dag_ground_truth = {
                 "__var": {
                     "task_id": "bash_task",
                     "retries": 1,
-                    "retry_delay": 300.0,
+                    "retry_delay": 240.0,
                     "max_retry_delay": 600.0,
                     "ui_color": "#f0ede4",
                     "template_ext": [".sh", ".bash"],
@@ -224,7 +261,7 @@ serialized_simple_dag_ground_truth = {
                 "__var": {
                     "task_id": "custom_task",
                     "retries": 1,
-                    "retry_delay": 300.0,
+                    "retry_delay": 240.0,
                     "max_retry_delay": 600.0,
                     "_operator_extra_links": {"Google Custom": "_link_CustomOpLink"},
                     "template_fields": ["bash_command"],
@@ -294,7 +331,7 @@ def make_simple_dag():
         schedule=timedelta(days=1),
         default_args={
             "retries": 1,
-            "retry_delay": timedelta(minutes=5),
+            "retry_delay": timedelta(minutes=4),
             "max_retry_delay": timedelta(minutes=10),
             "depends_on_past": False,
         },
@@ -3072,7 +3109,7 @@ def test_handle_v1_serdag():
                 "__var": {
                     "depends_on_past": False,
                     "retries": 1,
-                    "retry_delay": {"__type": "timedelta", "__var": 300.0},
+                    "retry_delay": {"__type": "timedelta", "__var": 240.0},
                     "max_retry_delay": {"__type": "timedelta", "__var": 600.0},
                     "sla": {"__type": "timedelta", "__var": 100.0},
                 },
@@ -3110,7 +3147,7 @@ def test_handle_v1_serdag():
                     "__var": {
                         "task_id": "bash_task",
                         "retries": 1,
-                        "retry_delay": 300.0,
+                        "retry_delay": 240.0,
                         "max_retry_delay": 600.0,
                         "sla": 100.0,
                         "downstream_task_ids": [],
@@ -3173,7 +3210,7 @@ def test_handle_v1_serdag():
                     "__var": {
                         "task_id": "custom_task",
                         "retries": 1,
-                        "retry_delay": 300.0,
+                        "retry_delay": 240.0,
                         "max_retry_delay": 600.0,
                         "sla": 100.0,
                         "downstream_task_ids": [],
@@ -3383,7 +3420,7 @@ def test_handle_v2_serdag():
                 "__var": {
                     "depends_on_past": False,
                     "retries": 1,
-                    "retry_delay": {"__type": "timedelta", "__var": 300.0},
+                    "retry_delay": {"__type": "timedelta", "__var": 240.0},
                     "max_retry_delay": {"__type": "timedelta", "__var": 600.0},
                 },
             },
@@ -3425,7 +3462,7 @@ def test_handle_v2_serdag():
                     "__var": {
                         "task_id": "bash_task",
                         "retries": 1,
-                        "retry_delay": 300.0,
+                        "retry_delay": 240.0,
                         "max_retry_delay": 600.0,
                         "downstream_task_ids": [],
                         "ui_color": "#f0ede4",
@@ -3491,7 +3528,7 @@ def test_handle_v2_serdag():
                     "__var": {
                         "task_id": "custom_task",
                         "retries": 1,
-                        "retry_delay": 300.0,
+                        "retry_delay": 240.0,
                         "max_retry_delay": 600.0,
                         "downstream_task_ids": [],
                         "_operator_extra_links": {"Google Custom": "_link_CustomOpLink"},
@@ -4004,8 +4041,9 @@ class TestDeserializationDefaultsResolution:
         result = SerializedBaseOperator._apply_defaults_to_encoded_op(encoded_op, None)
         assert result == encoded_op
 
+    @operator_defaults({"retries": 2})
     def test_multiple_tasks_share_client_defaults(self):
-        """Test that multiple tasks can share the same client_defaults."""
+        """Test that multiple tasks can share the same client_defaults when there are actually non-default values."""
         with DAG(dag_id="test_dag") as dag:
             BashOperator(task_id="task1", bash_command="echo 1")
             BashOperator(task_id="task2", bash_command="echo 2")
@@ -4024,6 +4062,10 @@ class TestDeserializationDefaultsResolution:
         deserialized_task1 = deserialized_dag.get_task("task1")
         deserialized_task2 = deserialized_dag.get_task("task2")
 
+        # Both tasks should have retries=2 from client_defaults
+        assert deserialized_task1.retries == 2
+        assert deserialized_task2.retries == 2
+
         # Both tasks should have the same default values from client_defaults
         for field in client_defaults:
             if hasattr(deserialized_task1, field) and hasattr(deserialized_task2, field):
@@ -4035,6 +4077,7 @@ class TestDeserializationDefaultsResolution:
 class TestMappedOperatorSerializationAndClientDefaults:
     """Test MappedOperator serialization with client defaults and callback properties."""
 
+    @operator_defaults({"retry_delay": 200.0})
     def test_mapped_operator_client_defaults_application(self):
         """Test that client_defaults are correctly applied to MappedOperator during deserialization."""
         with DAG(dag_id="test_mapped_dag") as dag:
@@ -4099,6 +4142,7 @@ class TestMappedOperatorSerializationAndClientDefaults:
             ),
         ],
     )
+    @operator_defaults({"retry_delay": 200.0})
     def test_mapped_operator_client_defaults_optimization(
         self, task_config, dag_id, task_id, non_default_fields
     ):
diff --git a/scripts/in_container/run_schema_defaults_check.py b/scripts/in_container/run_schema_defaults_check.py
index bc7c1e2844c..e9744134360 100755
--- a/scripts/in_container/run_schema_defaults_check.py
+++ b/scripts/in_container/run_schema_defaults_check.py
@@ -28,6 +28,7 @@ from __future__ import annotations
 
 import json
 import sys
+from datetime import timedelta
 from pathlib import Path
 from typing import Any
 
@@ -80,6 +81,8 @@ def get_server_side_operator_defaults() -> dict[str, Any]:
                     if isinstance(default_value, (set, tuple)):
                         # Convert to list since schema.json is pure JSON
                         default_value = list(default_value)
+                    elif isinstance(default_value, timedelta):
+                        default_value = default_value.total_seconds()
                     server_defaults[field_name] = default_value
 
         return server_defaults

Reply via email to