This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 20c70028289 Make start_date in Context nullable (#58175)
20c70028289 is described below

commit 20c70028289fffcf7aa9bcbbfe92224f90693156
Author: Ramit Kataria <[email protected]>
AuthorDate: Tue Feb 24 12:15:39 2026 -0800

    Make start_date in Context nullable (#58175)
    
    * Make start_date in Context nullable
    
    If a deadline is missed before a DagRun has started (the run may sit in
    the queue for a very long time), creating the Context raises an exception
    because start_date is currently non-nullable.
    
    * Add cadwyn migration for nullable DagRun.start_date
    
    * more explicit downgrade
    
    Co-authored-by: Amogh Desai <[email protected]>
    
    * merge conflict static checks
    
    ---------
    
    Co-authored-by: ferruzzi <[email protected]>
---
 .../execution_api/datamodels/taskinstance.py       |   2 +-
 .../api_fastapi/execution_api/versions/__init__.py |   9 +-
 .../execution_api/versions/v2026_03_31.py          |  27 +++++
 .../execution_api/versions/v2026_03_31/__init__.py |  16 +++
 .../versions/v2026_03_31/test_task_instances.py    | 127 +++++++++++++++++++++
 .../src/airflow/sdk/api/datamodels/_generated.py   |   2 +-
 task-sdk/src/airflow/sdk/definitions/context.py    |   2 +-
 task-sdk/src/airflow/sdk/types.py                  |   2 +-
 .../tests/task_sdk/execution_time/test_context.py  |  30 ++++-
 9 files changed, 211 insertions(+), 6 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index 1d1c067d4b5..890314171b8 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -296,7 +296,7 @@ class DagRun(StrictBaseModel):
     data_interval_start: UtcDateTime | None
     data_interval_end: UtcDateTime | None
     run_after: UtcDateTime
-    start_date: UtcDateTime
+    start_date: UtcDateTime | None
     end_date: UtcDateTime | None
     clear_number: int = 0
     run_type: DagRunType
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index c9f4a2f9b66..f4f2d967e02 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -35,13 +35,20 @@ from airflow.api_fastapi.execution_api.versions.v2025_12_08 
import (
 )
 from airflow.api_fastapi.execution_api.versions.v2026_03_31 import (
     AddNoteField,
+    MakeDagRunStartDateNullable,
     ModifyDeferredTaskKwargsToJsonValue,
     RemoveUpstreamMapIndexesField,
 )
 
 bundle = VersionBundle(
     HeadVersion(),
-    Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue, 
RemoveUpstreamMapIndexesField, AddNoteField),
+    Version(
+        "2026-03-31",
+        MakeDagRunStartDateNullable,
+        ModifyDeferredTaskKwargsToJsonValue,
+        RemoveUpstreamMapIndexesField,
+        AddNoteField,
+    ),
     Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint),
     Version("2025-11-07", AddPartitionKeyField),
     Version("2025-11-05", AddTriggeringUserNameField),
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
index e592296cf31..2d14493e81f 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
@@ -21,6 +21,7 @@ from typing import Any
 
 from cadwyn import ResponseInfo, VersionChange, 
convert_response_to_previous_version_for, schema
 
+from airflow.api_fastapi.common.types import UtcDateTime
 from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
     DagRun,
     TIDeferredStatePayload,
@@ -68,3 +69,29 @@ class AddNoteField(VersionChange):
         """Remove note field for older API versions."""
         if "dag_run" in response.body and isinstance(response.body["dag_run"], 
dict):
             response.body["dag_run"].pop("note", None)
+
+
+class MakeDagRunStartDateNullable(VersionChange):
+    """Make DagRun.start_date field nullable for runs that haven't started 
yet."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = 
(schema(DagRun).field("start_date").had(type=UtcDateTime),)
+
+    @convert_response_to_previous_version_for(TIRunContext)  # type: 
ignore[arg-type]
+    def ensure_start_date_in_ti_run_context(response: ResponseInfo) -> None:  
# type: ignore[misc]
+        """
+        Ensure start_date is never None in DagRun for previous API versions.
+
+        Older Task SDK clients expect start_date to be non-nullable. When the
+        DagRun hasn't started yet (e.g. queued), fall back to run_after.
+        """
+        dag_run = response.body.get("dag_run")
+        if isinstance(dag_run, dict) and dag_run.get("start_date") is None:
+            dag_run["start_date"] = dag_run.get("run_after")
+
+    @convert_response_to_previous_version_for(DagRun)  # type: ignore[arg-type]
+    def ensure_start_date_in_dag_run(response: ResponseInfo) -> None:  # type: 
ignore[misc]
+        """Ensure start_date is never None in direct DagRun responses for 
previous API versions."""
+        if response.body.get("start_date") is None:
+            response.body["start_date"] = response.body.get("run_after")
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py
new file mode 100644
index 00000000000..7fb44ce7ebe
--- /dev/null
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow._shared.timezones import timezone
+from airflow.utils.state import DagRunState, State
+
+from tests_common.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+TIMESTAMP_STR = "2024-09-30T12:00:00Z"
+TIMESTAMP = timezone.parse(TIMESTAMP_STR)
+
+RUN_PATCH_BODY = {
+    "state": "running",
+    "hostname": "test-hostname",
+    "unixname": "test-user",
+    "pid": 12345,
+    "start_date": TIMESTAMP_STR,
+}
+
+
[email protected]
+def old_ver_client(client):
+    """Client configured to use API version before start_date nullable 
change."""
+    client.headers["Airflow-API-Version"] = "2025-12-08"
+    return client
+
+
+class TestDagRunStartDateNullableBackwardCompat:
+    """Test that older API versions get a non-null start_date fallback."""
+
+    @pytest.fixture(autouse=True)
+    def _freeze_time(self, time_machine):
+        time_machine.move_to(TIMESTAMP_STR, tick=False)
+
+    def setup_method(self):
+        clear_db_runs()
+
+    def teardown_method(self):
+        clear_db_runs()
+
+    def test_old_version_gets_run_after_when_start_date_is_null(
+        self,
+        old_ver_client,
+        session,
+        create_task_instance,
+    ):
+        ti = create_task_instance(
+            task_id="test_start_date_nullable",
+            state=State.QUEUED,
+            dagrun_state=DagRunState.QUEUED,
+            session=session,
+            start_date=TIMESTAMP,
+        )
+        ti.dag_run.start_date = None  # DagRun has not started yet
+        session.commit()
+
+        response = 
old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", 
json=RUN_PATCH_BODY)
+        dag_run = response.json()["dag_run"]
+
+        assert response.status_code == 200
+        assert dag_run["start_date"] is not None
+        assert dag_run["start_date"] == dag_run["run_after"]
+
+    def test_head_version_allows_null_start_date(
+        self,
+        client,
+        session,
+        create_task_instance,
+    ):
+        ti = create_task_instance(
+            task_id="test_start_date_null_head",
+            state=State.QUEUED,
+            dagrun_state=DagRunState.QUEUED,
+            session=session,
+            start_date=TIMESTAMP,
+        )
+        ti.dag_run.start_date = None  # DagRun has not started yet
+        session.commit()
+
+        response = client.patch(f"/execution/task-instances/{ti.id}/run", 
json=RUN_PATCH_BODY)
+        dag_run = response.json()["dag_run"]
+
+        assert response.status_code == 200
+        assert dag_run["start_date"] is None
+
+    def test_old_version_preserves_real_start_date(
+        self,
+        old_ver_client,
+        session,
+        create_task_instance,
+    ):
+        ti = create_task_instance(
+            task_id="test_start_date_preserved",
+            state=State.QUEUED,
+            dagrun_state=DagRunState.RUNNING,
+            session=session,
+            start_date=TIMESTAMP,
+        )
+        assert ti.dag_run.start_date == TIMESTAMP  # DagRun has already started
+        session.commit()
+
+        response = 
old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", 
json=RUN_PATCH_BODY)
+        dag_run = response.json()["dag_run"]
+
+        assert response.status_code == 200
+        assert dag_run["start_date"] is not None, "start_date should not be 
None when DagRun has started"
+        assert dag_run["start_date"] == 
TIMESTAMP.isoformat().replace("+00:00", "Z")
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 63a30585320..617e2a23934 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -621,7 +621,7 @@ class DagRun(BaseModel):
     data_interval_start: Annotated[AwareDatetime | None, Field(title="Data 
Interval Start")] = None
     data_interval_end: Annotated[AwareDatetime | None, Field(title="Data 
Interval End")] = None
     run_after: Annotated[AwareDatetime, Field(title="Run After")]
-    start_date: Annotated[AwareDatetime, Field(title="Start Date")]
+    start_date: Annotated[AwareDatetime | None, Field(title="Start Date")] = 
None
     end_date: Annotated[AwareDatetime | None, Field(title="End Date")] = None
     clear_number: Annotated[int | None, Field(title="Clear Number")] = 0
     run_type: DagRunType
diff --git a/task-sdk/src/airflow/sdk/definitions/context.py 
b/task-sdk/src/airflow/sdk/definitions/context.py
index ba4d9b65905..7ddb00df14d 100644
--- a/task-sdk/src/airflow/sdk/definitions/context.py
+++ b/task-sdk/src/airflow/sdk/definitions/context.py
@@ -65,7 +65,7 @@ class Context(TypedDict, total=False):
     prev_end_date_success: NotRequired[DateTime | None]
     reason: NotRequired[str | None]
     run_id: str
-    start_date: DateTime
+    start_date: DateTime | None
     # TODO: Remove Operator from below once we have MappedOperator to the Task 
SDK
     #   and once we can remove context related code from the 
Scheduler/models.TaskInstance
     task: BaseOperator | Operator
diff --git a/task-sdk/src/airflow/sdk/types.py 
b/task-sdk/src/airflow/sdk/types.py
index 237c36d36cd..2f191a6e080 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -80,7 +80,7 @@ class DagRunProtocol(Protocol):
     logical_date: AwareDatetime | None
     data_interval_start: AwareDatetime | None
     data_interval_end: AwareDatetime | None
-    start_date: AwareDatetime
+    start_date: AwareDatetime | None
     end_date: AwareDatetime | None
     run_type: Any
     run_after: AwareDatetime
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py 
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index a512062ff05..a2c3310822b 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -23,7 +23,7 @@ from unittest.mock import MagicMock, patch
 import pytest
 
 from airflow.sdk import BaseOperator, get_current_context, timezone
-from airflow.sdk.api.datamodels._generated import AssetEventResponse, 
AssetResponse
+from airflow.sdk.api.datamodels._generated import AssetEventResponse, 
AssetResponse, DagRun
 from airflow.sdk.bases.xcom import BaseXCom
 from airflow.sdk.definitions.asset import (
     Asset,
@@ -862,6 +862,34 @@ class TestInletEventAccessor:
         ]
 
 
+class TestDagRunStartDateNullable:
+    """Test that DagRun and TIRunContext accept start_date=None (queued runs 
that haven't started)."""
+
+    def test_dag_run_model_accepts_null_start_date(self):
+        """DagRun datamodel should accept start_date=None for runs that 
haven't started yet."""
+        dag_run = DagRun(
+            dag_id="test_dag",
+            run_id="test_run",
+            logical_date="2024-12-01T01:00:00Z",
+            data_interval_start="2024-12-01T00:00:00Z",
+            data_interval_end="2024-12-01T01:00:00Z",
+            start_date=None,
+            run_after="2024-12-01T01:00:00Z",
+            run_type="manual",
+            state="queued",
+            conf=None,
+            consumed_asset_events=[],
+        )
+
+        assert dag_run.start_date is None
+
+    def test_ti_run_context_with_null_start_date(self, make_ti_context):
+        """TIRunContext should be constructable when the DagRun has 
start_date=None."""
+        ti_context = make_ti_context(start_date=None)
+
+        assert ti_context.dag_run.start_date is None
+
+
 class TestAsyncGetConnection:
     """Test async connection retrieval with secrets backends."""
 

Reply via email to