This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 14d42eb063f Set static type as `datetime` in models when type is 
`UtcDateTime` (#56780)
14d42eb063f is described below

commit 14d42eb063f33eec4be3af906eed00e6ce585488
Author: Vincent <[email protected]>
AuthorDate: Mon Oct 20 11:04:49 2025 -0400

    Set static type as `datetime` in models when type is `UtcDateTime` (#56780)
---
 airflow-core/src/airflow/models/asset.py           | 29 +++++++++++-----------
 airflow-core/src/airflow/models/backfill.py        | 12 ++++-----
 airflow-core/src/airflow/models/dag.py             | 12 ++++-----
 airflow-core/src/airflow/models/dag_version.py     |  5 ++--
 airflow-core/src/airflow/models/dagbundle.py       |  4 ++-
 airflow-core/src/airflow/models/dagcode.py         |  5 ++--
 airflow-core/src/airflow/models/dagrun.py          | 22 ++++++++--------
 airflow-core/src/airflow/models/dagwarning.py      |  3 ++-
 .../src/airflow/models/db_callback_request.py      |  3 ++-
 airflow-core/src/airflow/models/deadline.py        |  2 +-
 airflow-core/src/airflow/models/errors.py          |  4 ++-
 airflow-core/src/airflow/models/hitl.py            |  5 ++--
 airflow-core/src/airflow/models/log.py             |  5 ++--
 airflow-core/src/airflow/models/serialized_dag.py  |  6 ++---
 airflow-core/src/airflow/models/taskinstance.py    | 18 +++++++-------
 .../src/airflow/models/taskinstancehistory.py      | 11 ++++----
 airflow-core/src/airflow/models/tasklog.py         |  4 ++-
 airflow-core/src/airflow/models/taskreschedule.py  |  6 ++---
 airflow-core/src/airflow/models/trigger.py         |  2 +-
 airflow-core/src/airflow/models/xcom.py            |  3 ++-
 20 files changed, 88 insertions(+), 73 deletions(-)

diff --git a/airflow-core/src/airflow/models/asset.py 
b/airflow-core/src/airflow/models/asset.py
index 4d9bd8ae1cf..02513c9a2a3 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
 from typing import TYPE_CHECKING
 from urllib.parse import urlsplit
 
@@ -303,8 +304,8 @@ class AssetModel(Base):
     )
     extra: Mapped[dict] = 
mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, 
default={})
 
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
     )
 
@@ -438,7 +439,7 @@ class DagScheduleAssetNameReference(Base):
         nullable=False,
     )
     dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
 
     dag = relationship("DagModel", 
back_populates="schedule_asset_name_references")
 
@@ -484,7 +485,7 @@ class DagScheduleAssetUriReference(Base):
         nullable=False,
     )
     dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
 
     dag = relationship("DagModel", 
back_populates="schedule_asset_uri_references")
 
@@ -518,8 +519,8 @@ class DagScheduleAssetAliasReference(Base):
 
     alias_id: Mapped[int] = mapped_column(Integer, primary_key=True, 
nullable=False)
     dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
     )
 
@@ -562,8 +563,8 @@ class DagScheduleAssetReference(Base):
 
     asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, 
nullable=False)
     dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
     )
 
@@ -616,8 +617,8 @@ class TaskOutletAssetReference(Base):
     asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, 
nullable=False)
     dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
     task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
     )
 
@@ -667,8 +668,8 @@ class TaskInletAssetReference(Base):
     asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, 
nullable=False)
     dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
     task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
     )
 
@@ -712,7 +713,7 @@ class AssetDagRunQueue(Base):
 
     asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, 
nullable=False)
     target_dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, 
nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
     asset = relationship("AssetModel", viewonly=True)
     dag_model = relationship("DagModel", viewonly=True)
 
@@ -782,7 +783,7 @@ class AssetEvent(Base):
     source_dag_id: Mapped[str | None] = mapped_column(StringID(), 
nullable=True)
     source_run_id: Mapped[str | None] = mapped_column(StringID(), 
nullable=True)
     source_map_index: Mapped[int | None] = mapped_column(Integer, 
nullable=True, server_default=text("-1"))
-    timestamp: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    timestamp: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
 
     __tablename__ = "asset_event"
     __table_args__ = (
diff --git a/airflow-core/src/airflow/models/backfill.py 
b/airflow-core/src/airflow/models/backfill.py
index 569224ca8cb..e459aa30fa6 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -117,8 +117,8 @@ class Backfill(Base):
 
     id: Mapped[int] = mapped_column(Integer, primary_key=True, 
autoincrement=True)
     dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
-    from_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
-    to_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
+    from_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
+    to_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
     dag_run_conf: Mapped[JSONField] = mapped_column(JSONField(json=json), 
nullable=False, default={})
     is_paused: Mapped[bool | None] = mapped_column(Boolean, default=False, 
nullable=True)
     """
@@ -130,9 +130,9 @@ class Backfill(Base):
         StringID(), nullable=False, default=ReprocessBehavior.NONE
     )
     max_active_runs: Mapped[int] = mapped_column(Integer, default=10, 
nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
-    completed_at: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    completed_at: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
     )
     triggering_user_name: Mapped[str | None] = mapped_column(
@@ -173,7 +173,7 @@ class BackfillDagRun(Base):
     backfill_id: Mapped[int] = mapped_column(Integer, nullable=False)
     dag_run_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
     exception_reason: Mapped[str | None] = mapped_column(StringID(), 
nullable=True)
-    logical_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False)
+    logical_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
     sort_ordinal: Mapped[int] = mapped_column(Integer, nullable=False)
 
     backfill = relationship("Backfill", 
back_populates="backfill_dag_run_associations")
diff --git a/airflow-core/src/airflow/models/dag.py 
b/airflow-core/src/airflow/models/dag.py
index 34edc73c367..7cceac5e9ee 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -322,12 +322,12 @@ class DagModel(Base):
     # Whether that DAG was seen on the last DagBag load
     is_stale: Mapped[bool] = mapped_column(Boolean, default=True)
     # Last time the scheduler started
-    last_parsed_time: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    last_parsed_time: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     # How long it took to parse this file
     last_parse_duration: Mapped[float | None] = mapped_column(Float, 
nullable=True)
     # Time when the DAG last received a refresh signal
     # (e.g. the DAG's "refresh" button was clicked in the web UI)
-    last_expired: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    last_expired: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     # The location of the file containing the DAG object
     # Note: Do not depend on fileloc pointing to a file; in the case of a
     # packaged DAG, it will point to the subpath of the DAG within the
@@ -375,14 +375,14 @@ class DagModel(Base):
     fail_fast: Mapped[bool] = mapped_column(Boolean, nullable=False, 
default=False, server_default="0")
 
     # The logical date of the next dag run.
-    next_dagrun: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    next_dagrun: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
 
     # Must be either both NULL or both datetime.
-    next_dagrun_data_interval_start: Mapped[UtcDateTime | None] = 
mapped_column(UtcDateTime, nullable=True)
-    next_dagrun_data_interval_end: Mapped[UtcDateTime | None] = 
mapped_column(UtcDateTime, nullable=True)
+    next_dagrun_data_interval_start: Mapped[datetime | None] = 
mapped_column(UtcDateTime, nullable=True)
+    next_dagrun_data_interval_end: Mapped[datetime | None] = 
mapped_column(UtcDateTime, nullable=True)
 
     # Earliest time at which this ``next_dagrun`` can be created.
-    next_dagrun_create_after: Mapped[UtcDateTime | None] = 
mapped_column(UtcDateTime, nullable=True)
+    next_dagrun_create_after: Mapped[datetime | None] = 
mapped_column(UtcDateTime, nullable=True)
 
     __table_args__ = (Index("idx_next_dagrun_create_after", 
next_dagrun_create_after, unique=False),)
 
diff --git a/airflow-core/src/airflow/models/dag_version.py 
b/airflow-core/src/airflow/models/dag_version.py
index 687e1367ece..fbc04f9171e 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import logging
+from datetime import datetime
 from typing import TYPE_CHECKING
 
 import uuid6
@@ -65,8 +66,8 @@ class DagVersion(Base):
         cascade_backrefs=False,
     )
     task_instances = relationship("TaskInstance", back_populates="dag_version")
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False, default=timezone.utcnow)
-    last_updated: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, 
default=timezone.utcnow)
+    last_updated: Mapped[datetime] = mapped_column(
         UtcDateTime, nullable=False, default=timezone.utcnow, 
onupdate=timezone.utcnow
     )
 
diff --git a/airflow-core/src/airflow/models/dagbundle.py 
b/airflow-core/src/airflow/models/dagbundle.py
index 1def1ac7189..fa81fc73514 100644
--- a/airflow-core/src/airflow/models/dagbundle.py
+++ b/airflow-core/src/airflow/models/dagbundle.py
@@ -16,6 +16,8 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
+
 from sqlalchemy import Boolean, String
 from sqlalchemy.orm import Mapped, relationship
 from sqlalchemy_utils import JSONType
@@ -45,7 +47,7 @@ class DagBundleModel(Base, LoggingMixin):
     name: Mapped[str] = mapped_column(StringID(length=250), primary_key=True, 
nullable=False)
     active: Mapped[bool | None] = mapped_column(Boolean, default=True, 
nullable=True)
     version: Mapped[str | None] = mapped_column(String(200), nullable=True)
-    last_refreshed: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    last_refreshed: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     signed_url_template: Mapped[str | None] = mapped_column(String(200), 
nullable=True)
     template_params: Mapped[dict | None] = mapped_column(JSONType, 
nullable=True)
     teams = relationship("Team", secondary=dag_bundle_team_association_table, 
back_populates="dag_bundles")
diff --git a/airflow-core/src/airflow/models/dagcode.py 
b/airflow-core/src/airflow/models/dagcode.py
index 5be3c5395f2..e4c56df6edd 100644
--- a/airflow-core/src/airflow/models/dagcode.py
+++ b/airflow-core/src/airflow/models/dagcode.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import logging
+from datetime import datetime
 from typing import TYPE_CHECKING
 
 import uuid6
@@ -58,8 +59,8 @@ class DagCode(Base):
     dag_id: Mapped[str] = mapped_column(String(ID_LEN), nullable=False)
     fileloc: Mapped[str] = mapped_column(String(2000), nullable=False)
     # The max length of fileloc exceeds the limit of indexing.
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False, default=timezone.utcnow)
-    last_updated: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, 
default=timezone.utcnow)
+    last_updated: Mapped[datetime] = mapped_column(
         UtcDateTime, nullable=False, default=timezone.utcnow, 
onupdate=timezone.utcnow
     )
     source_code: Mapped[str] = mapped_column(Text().with_variant(MEDIUMTEXT(), 
"mysql"), nullable=False)
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index 2c9cac3193d..b4ae52f07d0 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -149,10 +149,10 @@ class DagRun(Base, LoggingMixin):
 
     id: Mapped[int] = mapped_column(Integer, primary_key=True)
     dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
-    queued_at: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
-    logical_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
-    start_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
-    end_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    queued_at: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    start_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    end_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     _state: Mapped[str] = mapped_column("state", String(50), 
default=DagRunState.QUEUED)
     run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
     creating_job_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
@@ -168,12 +168,12 @@ class DagRun(Base, LoggingMixin):
         JSON().with_variant(postgresql.JSONB, "postgresql"), nullable=True
     )
     # These two must be either both NULL or both datetime.
-    data_interval_start: Mapped[UtcDateTime | None] = 
mapped_column(UtcDateTime, nullable=True)
-    data_interval_end: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    data_interval_start: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    data_interval_end: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     # Earliest time when this DagRun can start running.
-    run_after: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=_default_run_after, nullable=False)
+    run_after: Mapped[datetime] = mapped_column(UtcDateTime, 
default=_default_run_after, nullable=False)
     # When a scheduler last attempted to schedule TIs for this DagRun
-    last_scheduling_decision: Mapped[UtcDateTime | None] = 
mapped_column(UtcDateTime, nullable=True)
+    last_scheduling_decision: Mapped[datetime | None] = 
mapped_column(UtcDateTime, nullable=True)
     # Foreign key to LogTemplate. DagRun rows created prior to this column's
     # existence have this set to NULL. Later rows automatically populate this 
on
     # insert to point to the latest LogTemplate entry.
@@ -182,7 +182,7 @@ class DagRun(Base, LoggingMixin):
         ForeignKey("log_template.id", 
name="task_instance_log_template_id_fkey", ondelete="NO ACTION"),
         default=select(func.max(LogTemplate.__table__.c.id)),
     )
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow
     )
     # Keeps track of the number of times the dagrun had been cleared.
@@ -2125,8 +2125,8 @@ class DagRunNote(Base):
     user_id: Mapped[str | None] = mapped_column(String(128), nullable=True)
     dag_run_id: Mapped[int] = mapped_column(Integer, primary_key=True, 
nullable=False)
     content: Mapped[str | None] = 
mapped_column(String(1000).with_variant(Text(1000), "mysql"))
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
     )
 
diff --git a/airflow-core/src/airflow/models/dagwarning.py 
b/airflow-core/src/airflow/models/dagwarning.py
index cec81ce8abe..6f4e4c7d14a 100644
--- a/airflow-core/src/airflow/models/dagwarning.py
+++ b/airflow-core/src/airflow/models/dagwarning.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
 from enum import Enum
 from typing import TYPE_CHECKING
 
@@ -46,7 +47,7 @@ class DagWarning(Base):
     dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True)
     warning_type: Mapped[str] = mapped_column(String(50), primary_key=True)
     message: Mapped[str] = mapped_column(Text, nullable=False)
-    timestamp: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False, default=timezone.utcnow)
+    timestamp: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, 
default=timezone.utcnow)
 
     dag_model = relationship("DagModel", viewonly=True, lazy="selectin")
 
diff --git a/airflow-core/src/airflow/models/db_callback_request.py 
b/airflow-core/src/airflow/models/db_callback_request.py
index 2176f3cccfb..c7f8481d306 100644
--- a/airflow-core/src/airflow/models/db_callback_request.py
+++ b/airflow-core/src/airflow/models/db_callback_request.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
 from importlib import import_module
 from typing import TYPE_CHECKING
 
@@ -37,7 +38,7 @@ class DbCallbackRequest(Base):
     __tablename__ = "callback_request"
 
     id: Mapped[int] = mapped_column(Integer(), nullable=False, 
primary_key=True)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
     priority_weight: Mapped[int] = mapped_column(Integer(), nullable=False)
     callback_data: Mapped[dict] = mapped_column(ExtendedJSON, nullable=False)
     callback_type: Mapped[str] = mapped_column(String(20), nullable=False)
diff --git a/airflow-core/src/airflow/models/deadline.py 
b/airflow-core/src/airflow/models/deadline.py
index 052901f3ee2..eeb8ddfc873 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -105,7 +105,7 @@ class Deadline(Base):
     )
 
     # The time after which the Deadline has passed and the callback should be 
triggered.
-    deadline_time: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False)
+    deadline_time: Mapped[datetime] = mapped_column(UtcDateTime, 
nullable=False)
     # The (serialized) callback to be called when the Deadline has passed.
     _callback: Mapped[dict] = mapped_column(
         "callback", sqlalchemy_jsonfield.JSONField(json=json), nullable=False
diff --git a/airflow-core/src/airflow/models/errors.py 
b/airflow-core/src/airflow/models/errors.py
index 16d67206623..e20096aa05d 100644
--- a/airflow-core/src/airflow/models/errors.py
+++ b/airflow-core/src/airflow/models/errors.py
@@ -17,6 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
+
 from sqlalchemy import Integer, String, Text
 from sqlalchemy.orm import Mapped
 
@@ -30,7 +32,7 @@ class ParseImportError(Base):
 
     __tablename__ = "import_error"
     id: Mapped[int] = mapped_column(Integer, primary_key=True)
-    timestamp: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    timestamp: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     filename: Mapped[str | None] = mapped_column(String(1024), nullable=True)
     bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
     stacktrace: Mapped[str | None] = mapped_column(Text, nullable=True)
diff --git a/airflow-core/src/airflow/models/hitl.py 
b/airflow-core/src/airflow/models/hitl.py
index fbfee77fbcf..7553f2fc097 100644
--- a/airflow-core/src/airflow/models/hitl.py
+++ b/airflow-core/src/airflow/models/hitl.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
 from typing import TYPE_CHECKING, Any, TypedDict
 
 import sqlalchemy_jsonfield
@@ -153,10 +154,10 @@ class HITLDetail(Base, HITLDetailPropertyMixin):
         sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}
     )
     assignees: Mapped[dict | None] = 
mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
 
     # Response Content Detail
-    responded_at: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    responded_at: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     responded_by: Mapped[dict | None] = mapped_column(
         sqlalchemy_jsonfield.JSONField(json=json), nullable=True
     )
diff --git a/airflow-core/src/airflow/models/log.py 
b/airflow-core/src/airflow/models/log.py
index f0849332bd2..f86780295eb 100644
--- a/airflow-core/src/airflow/models/log.py
+++ b/airflow-core/src/airflow/models/log.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
 from typing import TYPE_CHECKING
 
 from sqlalchemy import Index, Integer, String, Text
@@ -37,12 +38,12 @@ class Log(Base):
     __tablename__ = "log"
 
     id: Mapped[int] = mapped_column(Integer, primary_key=True)
-    dttm: Mapped[UtcDateTime] = mapped_column(UtcDateTime)
+    dttm: Mapped[datetime] = mapped_column(UtcDateTime)
     dag_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
     task_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
     map_index: Mapped[int | None] = mapped_column(Integer, nullable=True)
     event: Mapped[str] = mapped_column(String(60))
-    logical_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     run_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
     owner: Mapped[str | None] = mapped_column(String(500), nullable=True)
     owner_display_name: Mapped[str | None] = mapped_column(String(500), 
nullable=True)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py 
b/airflow-core/src/airflow/models/serialized_dag.py
index c0bf5be2860..7067a89c787 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -22,7 +22,7 @@ from __future__ import annotations
 import logging
 import zlib
 from collections.abc import Callable, Iterable, Iterator, Sequence
-from datetime import timedelta
+from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Any, Literal
 
 import sqlalchemy_jsonfield
@@ -286,8 +286,8 @@ class SerializedDagModel(Base):
         "data", sqlalchemy_jsonfield.JSONField(json=json).with_variant(JSONB, 
"postgresql"), nullable=True
     )
     _data_compressed: Mapped[bytes | None] = mapped_column("data_compressed", 
LargeBinary, nullable=True)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False, default=timezone.utcnow)
-    last_updated: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, 
default=timezone.utcnow)
+    last_updated: Mapped[datetime] = mapped_column(
         UtcDateTime, nullable=False, default=timezone.utcnow, 
onupdate=timezone.utcnow
     )
     dag_hash: Mapped[str] = mapped_column(String(32), nullable=False)
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 3146f6a4acf..65c24ec687b 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -384,8 +384,8 @@ class TaskInstance(Base, LoggingMixin):
     run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
     map_index: Mapped[int] = mapped_column(Integer, nullable=False, 
server_default=text("-1"))
 
-    start_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
-    end_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    start_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    end_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     duration: Mapped[float | None] = mapped_column(Float, nullable=True)
     state: Mapped[str | None] = mapped_column(String(20), nullable=True)
     try_number: Mapped[int] = mapped_column(Integer, default=0)
@@ -398,15 +398,15 @@ class TaskInstance(Base, LoggingMixin):
     priority_weight: Mapped[int] = mapped_column(Integer)
     operator: Mapped[str | None] = mapped_column(String(1000), nullable=True)
     custom_operator_name: Mapped[str] = mapped_column(String(1000))
-    queued_dttm: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
-    scheduled_dttm: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    queued_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    scheduled_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     queued_by_job_id: Mapped[int | None] = mapped_column(Integer, 
nullable=True)
 
-    last_heartbeat_at: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    last_heartbeat_at: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     pid: Mapped[int | None] = mapped_column(Integer, nullable=True)
     executor: Mapped[str | None] = mapped_column(String(1000), nullable=True)
     executor_config: Mapped[dict] = 
mapped_column(ExecutorConfigType(pickler=dill))
-    updated_at: Mapped[UtcDateTime | None] = mapped_column(
+    updated_at: Mapped[datetime | None] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=True
     )
     _rendered_map_index: Mapped[str | None] = 
mapped_column("rendered_map_index", String(250), nullable=True)
@@ -421,7 +421,7 @@ class TaskInstance(Base, LoggingMixin):
     trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
 
     # Optional timeout utcdatetime for the trigger (past this, we'll fail)
-    trigger_timeout: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    trigger_timeout: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
 
     # The method to call next, and any extra arguments to pass to it.
     # Usually used when resuming from DEFERRED.
@@ -2215,8 +2215,8 @@ class TaskInstanceNote(Base):
     )
     user_id: Mapped[str | None] = mapped_column(String(128), nullable=True)
     content: Mapped[str | None] = 
mapped_column(String(1000).with_variant(Text(1000), "mysql"))
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
-    updated_at: Mapped[UtcDateTime] = mapped_column(
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    updated_at: Mapped[datetime] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=False
     )
 
diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py 
b/airflow-core/src/airflow/models/taskinstancehistory.py
index 8b09b001847..3e885aec095 100644
--- a/airflow-core/src/airflow/models/taskinstancehistory.py
+++ b/airflow-core/src/airflow/models/taskinstancehistory.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
 from typing import TYPE_CHECKING
 
 import dill
@@ -76,8 +77,8 @@ class TaskInstanceHistory(Base):
     run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
     map_index: Mapped[int] = mapped_column(Integer, nullable=False, 
server_default=text("-1"))
     try_number: Mapped[int] = mapped_column(Integer, nullable=False)
-    start_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
-    end_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    start_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    end_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     duration: Mapped[float | None] = mapped_column(Float, nullable=True)
     state: Mapped[str | None] = mapped_column(String(20), nullable=True)
     max_tries: Mapped[int | None] = mapped_column(Integer, 
server_default=text("-1"), nullable=True)
@@ -89,13 +90,13 @@ class TaskInstanceHistory(Base):
     priority_weight: Mapped[int | None] = mapped_column(Integer, nullable=True)
     operator: Mapped[str | None] = mapped_column(String(1000), nullable=True)
     custom_operator_name: Mapped[str | None] = mapped_column(String(1000), 
nullable=True)
-    queued_dttm: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
-    scheduled_dttm: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    queued_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    scheduled_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     queued_by_job_id: Mapped[int | None] = mapped_column(Integer, 
nullable=True)
     pid: Mapped[int | None] = mapped_column(Integer, nullable=True)
     executor: Mapped[str | None] = mapped_column(String(1000), nullable=True)
     executor_config: Mapped[dict | None] = 
mapped_column(ExecutorConfigType(pickler=dill), nullable=True)
-    updated_at: Mapped[UtcDateTime | None] = mapped_column(
+    updated_at: Mapped[datetime | None] = mapped_column(
         UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, 
nullable=True
     )
     rendered_map_index: Mapped[str | None] = mapped_column(String(250), 
nullable=True)
diff --git a/airflow-core/src/airflow/models/tasklog.py 
b/airflow-core/src/airflow/models/tasklog.py
index c28005bd11e..7e2c81f4b0c 100644
--- a/airflow-core/src/airflow/models/tasklog.py
+++ b/airflow-core/src/airflow/models/tasklog.py
@@ -17,6 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime
+
 from sqlalchemy import Integer, Text
 from sqlalchemy.orm import Mapped
 
@@ -38,7 +40,7 @@ class LogTemplate(Base):
     id: Mapped[int] = mapped_column(Integer, primary_key=True, 
autoincrement=True)
     filename: Mapped[str] = mapped_column(Text, nullable=False)
     elasticsearch_id: Mapped[str] = mapped_column(Text, nullable=False)
-    created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False, default=timezone.utcnow)
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, 
default=timezone.utcnow)
 
     def __repr__(self) -> str:
         attrs = ", ".join(f"{k}={getattr(self, k)}" for k in ("filename", 
"elasticsearch_id"))
diff --git a/airflow-core/src/airflow/models/taskreschedule.py 
b/airflow-core/src/airflow/models/taskreschedule.py
index 2dc6bcb18b2..996bce62d0d 100644
--- a/airflow-core/src/airflow/models/taskreschedule.py
+++ b/airflow-core/src/airflow/models/taskreschedule.py
@@ -55,10 +55,10 @@ class TaskReschedule(Base):
         ForeignKey("task_instance.id", ondelete="CASCADE", 
name="task_reschedule_ti_fkey"),
         nullable=False,
     )
-    start_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False)
-    end_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
+    start_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, 
nullable=False)
+    end_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, 
nullable=False)
     duration: Mapped[int] = mapped_column(Integer, nullable=False)
-    reschedule_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False)
+    reschedule_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, 
nullable=False)
 
     task_instance = relationship(
         "TaskInstance", primaryjoin="TaskReschedule.ti_id == 
foreign(TaskInstance.id)", uselist=False
diff --git a/airflow-core/src/airflow/models/trigger.py 
b/airflow-core/src/airflow/models/trigger.py
index 94da2acc6aa..419d7b79e48 100644
--- a/airflow-core/src/airflow/models/trigger.py
+++ b/airflow-core/src/airflow/models/trigger.py
@@ -93,7 +93,7 @@ class Trigger(Base):
     id: Mapped[int] = mapped_column(Integer, primary_key=True)
     classpath: Mapped[str] = mapped_column(String(1000), nullable=False)
     encrypted_kwargs: Mapped[str] = mapped_column("kwargs", Text, 
nullable=False)
-    created_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
nullable=False)
+    created_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, 
nullable=False)
     triggerer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
 
     triggerer_job = relationship(
diff --git a/airflow-core/src/airflow/models/xcom.py 
b/airflow-core/src/airflow/models/xcom.py
index 5c300f1f620..197e10ebb5e 100644
--- a/airflow-core/src/airflow/models/xcom.py
+++ b/airflow-core/src/airflow/models/xcom.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import json
 import logging
 from collections.abc import Iterable
+from datetime import datetime
 from typing import TYPE_CHECKING, Any, cast
 
 from sqlalchemy import (
@@ -74,7 +75,7 @@ class XComModel(TaskInstanceDependencies):
     run_id: Mapped[str] = mapped_column(String(ID_LEN, **COLLATION_ARGS), 
nullable=False)
 
     value: Mapped[Any] = mapped_column(JSON().with_variant(postgresql.JSONB, 
"postgresql"), nullable=True)
-    timestamp: Mapped[UtcDateTime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    timestamp: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
 
     __table_args__ = (
         # Ideally we should create a unique index over (key, dag_id, task_id, 
run_id),


Reply via email to