This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 20c70028289 Make start_date in Context nullable (#58175)
20c70028289 is described below
commit 20c70028289fffcf7aa9bcbbfe92224f90693156
Author: Ramit Kataria <[email protected]>
AuthorDate: Tue Feb 24 12:15:39 2026 -0800
Make start_date in Context nullable (#58175)
* Make start_date in Context nullable
If a deadline is missed before a DagRun is started (could be queued for
a very long time), there would be an exception when creating the Context
because start_date is currently non-nullable.
* Add cadwyn migration for nullable DagRun.start_date
* more explicit downgrade
Co-authored-by: Amogh Desai <[email protected]>"
* merge conflict static checks
---------
Co-authored-by: ferruzzi <[email protected]>
---
.../execution_api/datamodels/taskinstance.py | 2 +-
.../api_fastapi/execution_api/versions/__init__.py | 9 +-
.../execution_api/versions/v2026_03_31.py | 27 +++++
.../execution_api/versions/v2026_03_31/__init__.py | 16 +++
.../versions/v2026_03_31/test_task_instances.py | 127 +++++++++++++++++++++
.../src/airflow/sdk/api/datamodels/_generated.py | 2 +-
task-sdk/src/airflow/sdk/definitions/context.py | 2 +-
task-sdk/src/airflow/sdk/types.py | 2 +-
.../tests/task_sdk/execution_time/test_context.py | 30 ++++-
9 files changed, 211 insertions(+), 6 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index 1d1c067d4b5..890314171b8 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -296,7 +296,7 @@ class DagRun(StrictBaseModel):
data_interval_start: UtcDateTime | None
data_interval_end: UtcDateTime | None
run_after: UtcDateTime
- start_date: UtcDateTime
+ start_date: UtcDateTime | None
end_date: UtcDateTime | None
clear_number: int = 0
run_type: DagRunType
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index c9f4a2f9b66..f4f2d967e02 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -35,13 +35,20 @@ from airflow.api_fastapi.execution_api.versions.v2025_12_08
import (
)
from airflow.api_fastapi.execution_api.versions.v2026_03_31 import (
AddNoteField,
+ MakeDagRunStartDateNullable,
ModifyDeferredTaskKwargsToJsonValue,
RemoveUpstreamMapIndexesField,
)
bundle = VersionBundle(
HeadVersion(),
- Version("2026-03-31", ModifyDeferredTaskKwargsToJsonValue,
RemoveUpstreamMapIndexesField, AddNoteField),
+ Version(
+ "2026-03-31",
+ MakeDagRunStartDateNullable,
+ ModifyDeferredTaskKwargsToJsonValue,
+ RemoveUpstreamMapIndexesField,
+ AddNoteField,
+ ),
Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint),
Version("2025-11-07", AddPartitionKeyField),
Version("2025-11-05", AddTriggeringUserNameField),
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
index e592296cf31..2d14493e81f 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_03_31.py
@@ -21,6 +21,7 @@ from typing import Any
from cadwyn import ResponseInfo, VersionChange,
convert_response_to_previous_version_for, schema
+from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
DagRun,
TIDeferredStatePayload,
@@ -68,3 +69,29 @@ class AddNoteField(VersionChange):
"""Remove note field for older API versions."""
if "dag_run" in response.body and isinstance(response.body["dag_run"],
dict):
response.body["dag_run"].pop("note", None)
+
+
+class MakeDagRunStartDateNullable(VersionChange):
+ """Make DagRun.start_date field nullable for runs that haven't started
yet."""
+
+ description = __doc__
+
+ instructions_to_migrate_to_previous_version =
(schema(DagRun).field("start_date").had(type=UtcDateTime),)
+
+ @convert_response_to_previous_version_for(TIRunContext) # type:
ignore[arg-type]
+ def ensure_start_date_in_ti_run_context(response: ResponseInfo) -> None:
# type: ignore[misc]
+ """
+ Ensure start_date is never None in DagRun for previous API versions.
+
+ Older Task SDK clients expect start_date to be non-nullable. When the
+ DagRun hasn't started yet (e.g. queued), fall back to run_after.
+ """
+ dag_run = response.body.get("dag_run")
+ if isinstance(dag_run, dict) and dag_run.get("start_date") is None:
+ dag_run["start_date"] = dag_run.get("run_after")
+
+ @convert_response_to_previous_version_for(DagRun) # type: ignore[arg-type]
+ def ensure_start_date_in_dag_run(response: ResponseInfo) -> None: # type:
ignore[misc]
+ """Ensure start_date is never None in direct DagRun responses for
previous API versions."""
+ if response.body.get("start_date") is None:
+ response.body["start_date"] = response.body.get("run_after")
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py
new file mode 100644
index 00000000000..7fb44ce7ebe
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_03_31/test_task_instances.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow._shared.timezones import timezone
+from airflow.utils.state import DagRunState, State
+
+from tests_common.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+TIMESTAMP_STR = "2024-09-30T12:00:00Z"
+TIMESTAMP = timezone.parse(TIMESTAMP_STR)
+
+RUN_PATCH_BODY = {
+ "state": "running",
+ "hostname": "test-hostname",
+ "unixname": "test-user",
+ "pid": 12345,
+ "start_date": TIMESTAMP_STR,
+}
+
+
[email protected]
+def old_ver_client(client):
+ """Client configured to use API version before start_date nullable
change."""
+ client.headers["Airflow-API-Version"] = "2025-12-08"
+ return client
+
+
+class TestDagRunStartDateNullableBackwardCompat:
+ """Test that older API versions get a non-null start_date fallback."""
+
+ @pytest.fixture(autouse=True)
+ def _freeze_time(self, time_machine):
+ time_machine.move_to(TIMESTAMP_STR, tick=False)
+
+ def setup_method(self):
+ clear_db_runs()
+
+ def teardown_method(self):
+ clear_db_runs()
+
+ def test_old_version_gets_run_after_when_start_date_is_null(
+ self,
+ old_ver_client,
+ session,
+ create_task_instance,
+ ):
+ ti = create_task_instance(
+ task_id="test_start_date_nullable",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.QUEUED,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ ti.dag_run.start_date = None # DagRun has not started yet
+ session.commit()
+
+ response =
old_ver_client.patch(f"/execution/task-instances/{ti.id}/run",
json=RUN_PATCH_BODY)
+ dag_run = response.json()["dag_run"]
+
+ assert response.status_code == 200
+ assert dag_run["start_date"] is not None
+ assert dag_run["start_date"] == dag_run["run_after"]
+
+ def test_head_version_allows_null_start_date(
+ self,
+ client,
+ session,
+ create_task_instance,
+ ):
+ ti = create_task_instance(
+ task_id="test_start_date_null_head",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.QUEUED,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ ti.dag_run.start_date = None # DagRun has not started yet
+ session.commit()
+
+ response = client.patch(f"/execution/task-instances/{ti.id}/run",
json=RUN_PATCH_BODY)
+ dag_run = response.json()["dag_run"]
+
+ assert response.status_code == 200
+ assert dag_run["start_date"] is None
+
+ def test_old_version_preserves_real_start_date(
+ self,
+ old_ver_client,
+ session,
+ create_task_instance,
+ ):
+ ti = create_task_instance(
+ task_id="test_start_date_preserved",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ assert ti.dag_run.start_date == TIMESTAMP # DagRun has already started
+ session.commit()
+
+ response =
old_ver_client.patch(f"/execution/task-instances/{ti.id}/run",
json=RUN_PATCH_BODY)
+ dag_run = response.json()["dag_run"]
+
+ assert response.status_code == 200
+ assert dag_run["start_date"] is not None, "start_date should not be
None when DagRun has started"
+ assert dag_run["start_date"] ==
TIMESTAMP.isoformat().replace("+00:00", "Z")
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 63a30585320..617e2a23934 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -621,7 +621,7 @@ class DagRun(BaseModel):
data_interval_start: Annotated[AwareDatetime | None, Field(title="Data
Interval Start")] = None
data_interval_end: Annotated[AwareDatetime | None, Field(title="Data
Interval End")] = None
run_after: Annotated[AwareDatetime, Field(title="Run After")]
- start_date: Annotated[AwareDatetime, Field(title="Start Date")]
+ start_date: Annotated[AwareDatetime | None, Field(title="Start Date")] =
None
end_date: Annotated[AwareDatetime | None, Field(title="End Date")] = None
clear_number: Annotated[int | None, Field(title="Clear Number")] = 0
run_type: DagRunType
diff --git a/task-sdk/src/airflow/sdk/definitions/context.py
b/task-sdk/src/airflow/sdk/definitions/context.py
index ba4d9b65905..7ddb00df14d 100644
--- a/task-sdk/src/airflow/sdk/definitions/context.py
+++ b/task-sdk/src/airflow/sdk/definitions/context.py
@@ -65,7 +65,7 @@ class Context(TypedDict, total=False):
prev_end_date_success: NotRequired[DateTime | None]
reason: NotRequired[str | None]
run_id: str
- start_date: DateTime
+ start_date: DateTime | None
# TODO: Remove Operator from below once we have MappedOperator to the Task
SDK
# and once we can remove context related code from the
Scheduler/models.TaskInstance
task: BaseOperator | Operator
diff --git a/task-sdk/src/airflow/sdk/types.py
b/task-sdk/src/airflow/sdk/types.py
index 237c36d36cd..2f191a6e080 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -80,7 +80,7 @@ class DagRunProtocol(Protocol):
logical_date: AwareDatetime | None
data_interval_start: AwareDatetime | None
data_interval_end: AwareDatetime | None
- start_date: AwareDatetime
+ start_date: AwareDatetime | None
end_date: AwareDatetime | None
run_type: Any
run_after: AwareDatetime
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index a512062ff05..a2c3310822b 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -23,7 +23,7 @@ from unittest.mock import MagicMock, patch
import pytest
from airflow.sdk import BaseOperator, get_current_context, timezone
-from airflow.sdk.api.datamodels._generated import AssetEventResponse,
AssetResponse
+from airflow.sdk.api.datamodels._generated import AssetEventResponse,
AssetResponse, DagRun
from airflow.sdk.bases.xcom import BaseXCom
from airflow.sdk.definitions.asset import (
Asset,
@@ -862,6 +862,34 @@ class TestInletEventAccessor:
]
+class TestDagRunStartDateNullable:
+ """Test that DagRun and TIRunContext accept start_date=None (queued runs
that haven't started)."""
+
+ def test_dag_run_model_accepts_null_start_date(self):
+ """DagRun datamodel should accept start_date=None for runs that
haven't started yet."""
+ dag_run = DagRun(
+ dag_id="test_dag",
+ run_id="test_run",
+ logical_date="2024-12-01T01:00:00Z",
+ data_interval_start="2024-12-01T00:00:00Z",
+ data_interval_end="2024-12-01T01:00:00Z",
+ start_date=None,
+ run_after="2024-12-01T01:00:00Z",
+ run_type="manual",
+ state="queued",
+ conf=None,
+ consumed_asset_events=[],
+ )
+
+ assert dag_run.start_date is None
+
+ def test_ti_run_context_with_null_start_date(self, make_ti_context):
+ """TIRunContext should be constructable when the DagRun has
start_date=None."""
+ ti_context = make_ti_context(start_date=None)
+
+ assert ti_context.dag_run.start_date is None
+
+
class TestAsyncGetConnection:
"""Test async connection retrieval with secrets backends."""