This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 14d42eb063f Set static type as `datetime` in models when type is `UtcDateTime` (#56780)
14d42eb063f is described below
commit 14d42eb063f33eec4be3af906eed00e6ce585488
Author: Vincent <[email protected]>
AuthorDate: Mon Oct 20 11:04:49 2025 -0400
Set static type as `datetime` in models when type is `UtcDateTime` (#56780)
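
    For illustration only (not part of the diff below), a minimal sketch of the
    annotation pattern this commit applies: the column keeps a UTC-aware SQL type
    while the static annotation becomes `datetime`, so type checkers see the value
    ORM instances actually carry. `ExampleModel` is a hypothetical name and plain
    SQLAlchemy `DateTime(timezone=True)` stands in for Airflow's `UtcDateTime`.

        from __future__ import annotations

        from datetime import datetime, timezone

        from sqlalchemy import DateTime
        from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


        class Base(DeclarativeBase):
            pass


        class ExampleModel(Base):
            __tablename__ = "example"

            id: Mapped[int] = mapped_column(primary_key=True)
            # Static type is `datetime` (what instances hold at runtime);
            # DateTime(timezone=True) stands in for Airflow's UtcDateTime column type.
            created_at: Mapped[datetime] = mapped_column(
                DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
            )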
---
airflow-core/src/airflow/models/asset.py | 29 +++++++++++-----------
airflow-core/src/airflow/models/backfill.py | 12 ++++-----
airflow-core/src/airflow/models/dag.py | 12 ++++-----
airflow-core/src/airflow/models/dag_version.py | 5 ++--
airflow-core/src/airflow/models/dagbundle.py | 4 ++-
airflow-core/src/airflow/models/dagcode.py | 5 ++--
airflow-core/src/airflow/models/dagrun.py | 22 ++++++++--------
airflow-core/src/airflow/models/dagwarning.py | 3 ++-
.../src/airflow/models/db_callback_request.py | 3 ++-
airflow-core/src/airflow/models/deadline.py | 2 +-
airflow-core/src/airflow/models/errors.py | 4 ++-
airflow-core/src/airflow/models/hitl.py | 5 ++--
airflow-core/src/airflow/models/log.py | 5 ++--
airflow-core/src/airflow/models/serialized_dag.py | 6 ++---
airflow-core/src/airflow/models/taskinstance.py | 18 +++++++-------
.../src/airflow/models/taskinstancehistory.py | 11 ++++----
airflow-core/src/airflow/models/tasklog.py | 4 ++-
airflow-core/src/airflow/models/taskreschedule.py | 6 ++---
airflow-core/src/airflow/models/trigger.py | 2 +-
airflow-core/src/airflow/models/xcom.py | 3 ++-
20 files changed, 88 insertions(+), 73 deletions(-)
diff --git a/airflow-core/src/airflow/models/asset.py b/airflow-core/src/airflow/models/asset.py
index 4d9bd8ae1cf..02513c9a2a3 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
from typing import TYPE_CHECKING
from urllib.parse import urlsplit
@@ -303,8 +304,8 @@ class AssetModel(Base):
)
extra: Mapped[dict] = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
@@ -438,7 +439,7 @@ class DagScheduleAssetNameReference(Base):
nullable=False,
)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
dag = relationship("DagModel", back_populates="schedule_asset_name_references")
@@ -484,7 +485,7 @@ class DagScheduleAssetUriReference(Base):
nullable=False,
)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
dag = relationship("DagModel", back_populates="schedule_asset_uri_references")
@@ -518,8 +519,8 @@ class DagScheduleAssetAliasReference(Base):
alias_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
@@ -562,8 +563,8 @@ class DagScheduleAssetReference(Base):
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
@@ -616,8 +617,8 @@ class TaskOutletAssetReference(Base):
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
@@ -667,8 +668,8 @@ class TaskInletAssetReference(Base):
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
task_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
@@ -712,7 +713,7 @@ class AssetDagRunQueue(Base):
asset_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
target_dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
asset = relationship("AssetModel", viewonly=True)
dag_model = relationship("DagModel", viewonly=True)
@@ -782,7 +783,7 @@ class AssetEvent(Base):
source_dag_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
source_run_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
source_map_index: Mapped[int | None] = mapped_column(Integer, nullable=True, server_default=text("-1"))
- timestamp: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ timestamp: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
__tablename__ = "asset_event"
__table_args__ = (
diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py
index 569224ca8cb..e459aa30fa6 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -117,8 +117,8 @@ class Backfill(Base):
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
- from_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
- to_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
+ from_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
+ to_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
dag_run_conf: Mapped[JSONField] = mapped_column(JSONField(json=json), nullable=False, default={})
is_paused: Mapped[bool | None] = mapped_column(Boolean, default=False, nullable=True)
"""
@@ -130,9 +130,9 @@ class Backfill(Base):
StringID(), nullable=False, default=ReprocessBehavior.NONE
)
max_active_runs: Mapped[int] = mapped_column(Integer, default=10, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
- completed_at: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ completed_at: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
triggering_user_name: Mapped[str | None] = mapped_column(
@@ -173,7 +173,7 @@ class BackfillDagRun(Base):
backfill_id: Mapped[int] = mapped_column(Integer, nullable=False)
dag_run_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
exception_reason: Mapped[str | None] = mapped_column(StringID(), nullable=True)
- logical_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
+ logical_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
sort_ordinal: Mapped[int] = mapped_column(Integer, nullable=False)
backfill = relationship("Backfill", back_populates="backfill_dag_run_associations")
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 34edc73c367..7cceac5e9ee 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -322,12 +322,12 @@ class DagModel(Base):
# Whether that DAG was seen on the last DagBag load
is_stale: Mapped[bool] = mapped_column(Boolean, default=True)
# Last time the scheduler started
- last_parsed_time: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ last_parsed_time: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
# How long it took to parse this file
last_parse_duration: Mapped[float | None] = mapped_column(Float, nullable=True)
# Time when the DAG last received a refresh signal
# (e.g. the DAG's "refresh" button was clicked in the web UI)
- last_expired: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ last_expired: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
# The location of the file containing the DAG object
# Note: Do not depend on fileloc pointing to a file; in the case of a
# packaged DAG, it will point to the subpath of the DAG within the
@@ -375,14 +375,14 @@ class DagModel(Base):
fail_fast: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default="0")
# The logical date of the next dag run.
- next_dagrun: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ next_dagrun: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
# Must be either both NULL or both datetime.
- next_dagrun_data_interval_start: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- next_dagrun_data_interval_end: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ next_dagrun_data_interval_start: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ next_dagrun_data_interval_end: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
# Earliest time at which this ``next_dagrun`` can be created.
- next_dagrun_create_after: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ next_dagrun_create_after: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
__table_args__ = (Index("idx_next_dagrun_create_after", next_dagrun_create_after, unique=False),)
diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py
index 687e1367ece..fbc04f9171e 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import logging
+from datetime import datetime
from typing import TYPE_CHECKING
import uuid6
@@ -65,8 +66,8 @@ class DagVersion(Base):
cascade_backrefs=False,
)
task_instances = relationship("TaskInstance", back_populates="dag_version")
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
- last_updated: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
+ last_updated: Mapped[datetime] = mapped_column(
UtcDateTime, nullable=False, default=timezone.utcnow, onupdate=timezone.utcnow
)
diff --git a/airflow-core/src/airflow/models/dagbundle.py b/airflow-core/src/airflow/models/dagbundle.py
index 1def1ac7189..fa81fc73514 100644
--- a/airflow-core/src/airflow/models/dagbundle.py
+++ b/airflow-core/src/airflow/models/dagbundle.py
@@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
+
from sqlalchemy import Boolean, String
from sqlalchemy.orm import Mapped, relationship
from sqlalchemy_utils import JSONType
@@ -45,7 +47,7 @@ class DagBundleModel(Base, LoggingMixin):
name: Mapped[str] = mapped_column(StringID(length=250), primary_key=True, nullable=False)
active: Mapped[bool | None] = mapped_column(Boolean, default=True, nullable=True)
version: Mapped[str | None] = mapped_column(String(200), nullable=True)
- last_refreshed: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ last_refreshed: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
signed_url_template: Mapped[str | None] = mapped_column(String(200), nullable=True)
template_params: Mapped[dict | None] = mapped_column(JSONType, nullable=True)
teams = relationship("Team", secondary=dag_bundle_team_association_table, back_populates="dag_bundles")
diff --git a/airflow-core/src/airflow/models/dagcode.py b/airflow-core/src/airflow/models/dagcode.py
index 5be3c5395f2..e4c56df6edd 100644
--- a/airflow-core/src/airflow/models/dagcode.py
+++ b/airflow-core/src/airflow/models/dagcode.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import logging
+from datetime import datetime
from typing import TYPE_CHECKING
import uuid6
@@ -58,8 +59,8 @@ class DagCode(Base):
dag_id: Mapped[str] = mapped_column(String(ID_LEN), nullable=False)
fileloc: Mapped[str] = mapped_column(String(2000), nullable=False)
# The max length of fileloc exceeds the limit of indexing.
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
- last_updated: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
+ last_updated: Mapped[datetime] = mapped_column(
UtcDateTime, nullable=False, default=timezone.utcnow, onupdate=timezone.utcnow
)
source_code: Mapped[str] = mapped_column(Text().with_variant(MEDIUMTEXT(), "mysql"), nullable=False)
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 2c9cac3193d..b4ae52f07d0 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -149,10 +149,10 @@ class DagRun(Base, LoggingMixin):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
- queued_at: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- logical_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- start_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- end_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ queued_at: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ start_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ end_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
_state: Mapped[str] = mapped_column("state", String(50), default=DagRunState.QUEUED)
run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
creating_job_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
@@ -168,12 +168,12 @@ class DagRun(Base, LoggingMixin):
JSON().with_variant(postgresql.JSONB, "postgresql"), nullable=True
)
# These two must be either both NULL or both datetime.
- data_interval_start: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- data_interval_end: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ data_interval_start: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ data_interval_end: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
# Earliest time when this DagRun can start running.
- run_after: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=_default_run_after, nullable=False)
+ run_after: Mapped[datetime] = mapped_column(UtcDateTime, default=_default_run_after, nullable=False)
# When a scheduler last attempted to schedule TIs for this DagRun
- last_scheduling_decision: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ last_scheduling_decision: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
# Foreign key to LogTemplate. DagRun rows created prior to this column's
# existence have this set to NULL. Later rows automatically populate this on
# insert to point to the latest LogTemplate entry.
@@ -182,7 +182,7 @@ class DagRun(Base, LoggingMixin):
ForeignKey("log_template.id",
name="task_instance_log_template_id_fkey", ondelete="NO ACTION"),
default=select(func.max(LogTemplate.__table__.c.id)),
)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow
)
# Keeps track of the number of times the dagrun had been cleared.
@@ -2125,8 +2125,8 @@ class DagRunNote(Base):
user_id: Mapped[str | None] = mapped_column(String(128), nullable=True)
dag_run_id: Mapped[int] = mapped_column(Integer, primary_key=True, nullable=False)
content: Mapped[str | None] = mapped_column(String(1000).with_variant(Text(1000), "mysql"))
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
diff --git a/airflow-core/src/airflow/models/dagwarning.py b/airflow-core/src/airflow/models/dagwarning.py
index cec81ce8abe..6f4e4c7d14a 100644
--- a/airflow-core/src/airflow/models/dagwarning.py
+++ b/airflow-core/src/airflow/models/dagwarning.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING
@@ -46,7 +47,7 @@ class DagWarning(Base):
dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True)
warning_type: Mapped[str] = mapped_column(String(50), primary_key=True)
message: Mapped[str] = mapped_column(Text, nullable=False)
- timestamp: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
+ timestamp: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
dag_model = relationship("DagModel", viewonly=True, lazy="selectin")
diff --git a/airflow-core/src/airflow/models/db_callback_request.py b/airflow-core/src/airflow/models/db_callback_request.py
index 2176f3cccfb..c7f8481d306 100644
--- a/airflow-core/src/airflow/models/db_callback_request.py
+++ b/airflow-core/src/airflow/models/db_callback_request.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
from importlib import import_module
from typing import TYPE_CHECKING
@@ -37,7 +38,7 @@ class DbCallbackRequest(Base):
__tablename__ = "callback_request"
id: Mapped[int] = mapped_column(Integer(), nullable=False, primary_key=True)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
priority_weight: Mapped[int] = mapped_column(Integer(), nullable=False)
callback_data: Mapped[dict] = mapped_column(ExtendedJSON, nullable=False)
callback_type: Mapped[str] = mapped_column(String(20), nullable=False)
diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index 052901f3ee2..eeb8ddfc873 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -105,7 +105,7 @@ class Deadline(Base):
)
# The time after which the Deadline has passed and the callback should be triggered.
- deadline_time: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
+ deadline_time: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
# The (serialized) callback to be called when the Deadline has passed.
_callback: Mapped[dict] = mapped_column(
"callback", sqlalchemy_jsonfield.JSONField(json=json), nullable=False
diff --git a/airflow-core/src/airflow/models/errors.py b/airflow-core/src/airflow/models/errors.py
index 16d67206623..e20096aa05d 100644
--- a/airflow-core/src/airflow/models/errors.py
+++ b/airflow-core/src/airflow/models/errors.py
@@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
+
from sqlalchemy import Integer, String, Text
from sqlalchemy.orm import Mapped
@@ -30,7 +32,7 @@ class ParseImportError(Base):
__tablename__ = "import_error"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
- timestamp: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ timestamp: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
filename: Mapped[str | None] = mapped_column(String(1024), nullable=True)
bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
stacktrace: Mapped[str | None] = mapped_column(Text, nullable=True)
diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py
index fbfee77fbcf..7553f2fc097 100644
--- a/airflow-core/src/airflow/models/hitl.py
+++ b/airflow-core/src/airflow/models/hitl.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
from typing import TYPE_CHECKING, Any, TypedDict
import sqlalchemy_jsonfield
@@ -153,10 +154,10 @@ class HITLDetail(Base, HITLDetailPropertyMixin):
sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}
)
assignees: Mapped[dict | None] = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
# Response Content Detail
- responded_at: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ responded_at: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
responded_by: Mapped[dict | None] = mapped_column(
sqlalchemy_jsonfield.JSONField(json=json), nullable=True
)
diff --git a/airflow-core/src/airflow/models/log.py b/airflow-core/src/airflow/models/log.py
index f0849332bd2..f86780295eb 100644
--- a/airflow-core/src/airflow/models/log.py
+++ b/airflow-core/src/airflow/models/log.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import Index, Integer, String, Text
@@ -37,12 +38,12 @@ class Log(Base):
__tablename__ = "log"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
- dttm: Mapped[UtcDateTime] = mapped_column(UtcDateTime)
+ dttm: Mapped[datetime] = mapped_column(UtcDateTime)
dag_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
task_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
map_index: Mapped[int | None] = mapped_column(Integer, nullable=True)
event: Mapped[str] = mapped_column(String(60))
- logical_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
run_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
owner: Mapped[str | None] = mapped_column(String(500), nullable=True)
owner_display_name: Mapped[str | None] = mapped_column(String(500), nullable=True)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index c0bf5be2860..7067a89c787 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -22,7 +22,7 @@ from __future__ import annotations
import logging
import zlib
from collections.abc import Callable, Iterable, Iterator, Sequence
-from datetime import timedelta
+from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Literal
import sqlalchemy_jsonfield
@@ -286,8 +286,8 @@ class SerializedDagModel(Base):
"data", sqlalchemy_jsonfield.JSONField(json=json).with_variant(JSONB,
"postgresql"), nullable=True
)
_data_compressed: Mapped[bytes | None] = mapped_column("data_compressed", LargeBinary, nullable=True)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
- last_updated: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
+ last_updated: Mapped[datetime] = mapped_column(
UtcDateTime, nullable=False, default=timezone.utcnow, onupdate=timezone.utcnow
)
dag_hash: Mapped[str] = mapped_column(String(32), nullable=False)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 3146f6a4acf..65c24ec687b 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -384,8 +384,8 @@ class TaskInstance(Base, LoggingMixin):
run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
map_index: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("-1"))
- start_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- end_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ start_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ end_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
duration: Mapped[float | None] = mapped_column(Float, nullable=True)
state: Mapped[str | None] = mapped_column(String(20), nullable=True)
try_number: Mapped[int] = mapped_column(Integer, default=0)
@@ -398,15 +398,15 @@ class TaskInstance(Base, LoggingMixin):
priority_weight: Mapped[int] = mapped_column(Integer)
operator: Mapped[str | None] = mapped_column(String(1000), nullable=True)
custom_operator_name: Mapped[str] = mapped_column(String(1000))
- queued_dttm: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- scheduled_dttm: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ queued_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ scheduled_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
queued_by_job_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
- last_heartbeat_at: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ last_heartbeat_at: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
pid: Mapped[int | None] = mapped_column(Integer, nullable=True)
executor: Mapped[str | None] = mapped_column(String(1000), nullable=True)
executor_config: Mapped[dict] = mapped_column(ExecutorConfigType(pickler=dill))
- updated_at: Mapped[UtcDateTime | None] = mapped_column(
+ updated_at: Mapped[datetime | None] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=True
)
_rendered_map_index: Mapped[str | None] = mapped_column("rendered_map_index", String(250), nullable=True)
@@ -421,7 +421,7 @@ class TaskInstance(Base, LoggingMixin):
trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
# Optional timeout utcdatetime for the trigger (past this, we'll fail)
- trigger_timeout: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ trigger_timeout: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
# The method to call next, and any extra arguments to pass to it.
# Usually used when resuming from DEFERRED.
@@ -2215,8 +2215,8 @@ class TaskInstanceNote(Base):
)
user_id: Mapped[str | None] = mapped_column(String(128), nullable=True)
content: Mapped[str | None] = mapped_column(String(1000).with_variant(Text(1000), "mysql"))
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
- updated_at: Mapped[UtcDateTime] = mapped_column(
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False
)
diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py b/airflow-core/src/airflow/models/taskinstancehistory.py
index 8b09b001847..3e885aec095 100644
--- a/airflow-core/src/airflow/models/taskinstancehistory.py
+++ b/airflow-core/src/airflow/models/taskinstancehistory.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
from typing import TYPE_CHECKING
import dill
@@ -76,8 +77,8 @@ class TaskInstanceHistory(Base):
run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
map_index: Mapped[int] = mapped_column(Integer, nullable=False, server_default=text("-1"))
try_number: Mapped[int] = mapped_column(Integer, nullable=False)
- start_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- end_date: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ start_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ end_date: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
duration: Mapped[float | None] = mapped_column(Float, nullable=True)
state: Mapped[str | None] = mapped_column(String(20), nullable=True)
max_tries: Mapped[int | None] = mapped_column(Integer, server_default=text("-1"), nullable=True)
@@ -89,13 +90,13 @@ class TaskInstanceHistory(Base):
priority_weight: Mapped[int | None] = mapped_column(Integer, nullable=True)
operator: Mapped[str | None] = mapped_column(String(1000), nullable=True)
custom_operator_name: Mapped[str | None] = mapped_column(String(1000), nullable=True)
- queued_dttm: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
- scheduled_dttm: Mapped[UtcDateTime | None] = mapped_column(UtcDateTime, nullable=True)
+ queued_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
+ scheduled_dttm: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
queued_by_job_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
pid: Mapped[int | None] = mapped_column(Integer, nullable=True)
executor: Mapped[str | None] = mapped_column(String(1000), nullable=True)
executor_config: Mapped[dict | None] = mapped_column(ExecutorConfigType(pickler=dill), nullable=True)
- updated_at: Mapped[UtcDateTime | None] = mapped_column(
+ updated_at: Mapped[datetime | None] = mapped_column(
UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=True
)
rendered_map_index: Mapped[str | None] = mapped_column(String(250), nullable=True)
diff --git a/airflow-core/src/airflow/models/tasklog.py b/airflow-core/src/airflow/models/tasklog.py
index c28005bd11e..7e2c81f4b0c 100644
--- a/airflow-core/src/airflow/models/tasklog.py
+++ b/airflow-core/src/airflow/models/tasklog.py
@@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations
+from datetime import datetime
+
from sqlalchemy import Integer, Text
from sqlalchemy.orm import Mapped
@@ -38,7 +40,7 @@ class LogTemplate(Base):
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
filename: Mapped[str] = mapped_column(Text, nullable=False)
elasticsearch_id: Mapped[str] = mapped_column(Text, nullable=False)
- created_at: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
+ created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
def __repr__(self) -> str:
attrs = ", ".join(f"{k}={getattr(self, k)}" for k in ("filename",
"elasticsearch_id"))
diff --git a/airflow-core/src/airflow/models/taskreschedule.py b/airflow-core/src/airflow/models/taskreschedule.py
index 2dc6bcb18b2..996bce62d0d 100644
--- a/airflow-core/src/airflow/models/taskreschedule.py
+++ b/airflow-core/src/airflow/models/taskreschedule.py
@@ -55,10 +55,10 @@ class TaskReschedule(Base):
ForeignKey("task_instance.id", ondelete="CASCADE",
name="task_reschedule_ti_fkey"),
nullable=False,
)
- start_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
- end_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
+ start_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, nullable=False)
+ end_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, nullable=False)
duration: Mapped[int] = mapped_column(Integer, nullable=False)
- reschedule_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
+ reschedule_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, nullable=False)
task_instance = relationship(
"TaskInstance", primaryjoin="TaskReschedule.ti_id ==
foreign(TaskInstance.id)", uselist=False
diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py
index 94da2acc6aa..419d7b79e48 100644
--- a/airflow-core/src/airflow/models/trigger.py
+++ b/airflow-core/src/airflow/models/trigger.py
@@ -93,7 +93,7 @@ class Trigger(Base):
id: Mapped[int] = mapped_column(Integer, primary_key=True)
classpath: Mapped[str] = mapped_column(String(1000), nullable=False)
encrypted_kwargs: Mapped[str] = mapped_column("kwargs", Text, nullable=False)
- created_date: Mapped[UtcDateTime] = mapped_column(UtcDateTime, nullable=False)
+ created_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, nullable=False)
triggerer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
triggerer_job = relationship(
diff --git a/airflow-core/src/airflow/models/xcom.py b/airflow-core/src/airflow/models/xcom.py
index 5c300f1f620..197e10ebb5e 100644
--- a/airflow-core/src/airflow/models/xcom.py
+++ b/airflow-core/src/airflow/models/xcom.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import json
import logging
from collections.abc import Iterable
+from datetime import datetime
from typing import TYPE_CHECKING, Any, cast
from sqlalchemy import (
@@ -74,7 +75,7 @@ class XComModel(TaskInstanceDependencies):
run_id: Mapped[str] = mapped_column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
value: Mapped[Any] = mapped_column(JSON().with_variant(postgresql.JSONB, "postgresql"), nullable=True)
- timestamp: Mapped[UtcDateTime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
+ timestamp: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
__table_args__ = (
# Ideally we should create a unique index over (key, dag_id, task_id, run_id),