Dev-iL commented on code in PR #55954:
URL: https://github.com/apache/airflow/pull/55954#discussion_r2382807429


##########
airflow-core/src/airflow/models/connection.py:
##########
@@ -126,19 +137,21 @@ class Connection(Base, LoggingMixin):
 
     __tablename__ = "connection"
 
-    id = Column(Integer(), primary_key=True)
-    conn_id = Column(String(ID_LEN), unique=True, nullable=False)
-    conn_type = Column(String(500), nullable=False)
-    description = Column(Text().with_variant(Text(5000), 
"mysql").with_variant(String(5000), "sqlite"))
-    host = Column(String(500))
-    schema = Column(String(500))
-    login = Column(Text())
-    _password = Column("password", Text())
-    port = Column(Integer())
-    is_encrypted = Column(Boolean, unique=False, default=False)
-    is_extra_encrypted = Column(Boolean, unique=False, default=False)
-    team_id = Column(UUIDType(binary=False), ForeignKey("team.id"), 
nullable=True)
-    _extra = Column("extra", Text())
+    id: Mapped[int] = mapped_column(Integer(), primary_key=True)
+    conn_id: Mapped[str] = mapped_column(String(ID_LEN), unique=True, 
nullable=False)

Review Comment:
   Why is it not the below?
   
   ```suggestion
       conn_id: Mapped[str] = mapped_column(StringID(), unique=True, 
nullable=False)
   ```



##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -117,23 +127,27 @@ class Backfill(Base):
 
     __tablename__ = "backfill"
 
-    id = Column(Integer, primary_key=True, autoincrement=True)
-    dag_id = Column(StringID(), nullable=False)
-    from_date = Column(UtcDateTime, nullable=False)
-    to_date = Column(UtcDateTime, nullable=False)
-    dag_run_conf = Column(JSONField(json=json), nullable=False, default={})
-    is_paused = Column(Boolean, default=False)
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, 
autoincrement=True)
+    dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+    from_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
+    to_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
+    dag_run_conf = mapped_column(JSONField(json=json), nullable=False, 
default={})
+    is_paused: Mapped[bool] = mapped_column(Boolean, default=False)
     """
     Controls whether new dag runs will be created for this backfill.
 
     Does not pause existing dag runs.
     """
-    reprocess_behavior = Column(StringID(), nullable=False, 
default=ReprocessBehavior.NONE)
-    max_active_runs = Column(Integer, default=10, nullable=False)
-    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
-    completed_at = Column(UtcDateTime, nullable=True)
-    updated_at = Column(UtcDateTime, default=timezone.utcnow, 
onupdate=timezone.utcnow, nullable=False)
-    triggering_user_name = Column(
+    reprocess_behavior: Mapped[str] = mapped_column(
+        StringID(), nullable=False, default=ReprocessBehavior.NONE
+    )
+    max_active_runs: Mapped[int] = mapped_column(Integer, default=10, 
nullable=False)
+    created_at: Mapped[datetime] = mapped_column(UtcDateTime, 
default=timezone.utcnow, nullable=False)
+    completed_at: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)

Review Comment:
   The `nullable` kwarg is unnecessary if `None` is part of the type hint. I'm 
not sure it can be removed while we are still supporting 1.4, but should 
definitely be an action item when moving to 2.0 exclusively.
   
   ```python
   # Testing code for 2.0:
   from sqlalchemy import String, inspect
   from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
   
   
   class Base(DeclarativeBase):
       pass
   
   
   class User(Base):
       __tablename__ = "user"
   
       id: Mapped[int] = mapped_column(primary_key=True)
       nickname: Mapped[str | None]
   
   class User2(Base):
       __tablename__ = "user2"
   
       id: Mapped[int] = mapped_column(primary_key=True)
       nickname: Mapped[str | None] = mapped_column(String(30), nullable=True)
   
   class User3(Base):
       __tablename__ = "user3"
   
       id: Mapped[int] = mapped_column(primary_key=True)
       nickname: Mapped[str]
   
   print(inspect(User).columns.nickname.nullable)
   # True
   print(inspect(User2).columns.nickname.nullable)
   # True
   print(inspect(User3).columns.nickname.nullable)
   # False
   
   ```



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -2202,15 +2201,15 @@ class TaskInstanceNote(Base):
     """For storage of arbitrary notes concerning the task instance."""
 
     __tablename__ = "task_instance_note"
-    ti_id = Column(
+    ti_id = mapped_column(
         String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
         primary_key=True,
         nullable=False,
     )
-    user_id = Column(String(128), nullable=True)
-    content = Column(String(1000).with_variant(Text(1000), "mysql"))
-    created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
-    updated_at = Column(UtcDateTime, default=timezone.utcnow, 
onupdate=timezone.utcnow, nullable=False)
+    user_id = mapped_column(String(128), nullable=True)
+    content = mapped_column(String(1000).with_variant(Text(1000), "mysql"))
+    created_at = mapped_column(UtcDateTime, default=timezone.utcnow, 
nullable=False)
+    updated_at = mapped_column(UtcDateTime, default=timezone.utcnow, 
onupdate=timezone.utcnow, nullable=False)

Review Comment:
   Missing `Mapped[...]`



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -148,57 +147,61 @@ class DagRun(Base, LoggingMixin):
 
     __tablename__ = "dag_run"
 
-    id = Column(Integer, primary_key=True)
-    dag_id = Column(StringID(), nullable=False)
-    queued_at = Column(UtcDateTime)
-    logical_date = Column(UtcDateTime, nullable=True)
-    start_date = Column(UtcDateTime)
-    end_date = Column(UtcDateTime)
-    _state = Column("state", String(50), default=DagRunState.QUEUED)
-    run_id = Column(StringID(), nullable=False)
-    creating_job_id = Column(Integer)
-    run_type = Column(String(50), nullable=False)
-    triggered_by = Column(
+    id: Mapped[int] = mapped_column(Integer, primary_key=True)
+    dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+    queued_at: Mapped[datetime | None] = mapped_column(UtcDateTime)
+    logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    start_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+    end_date: Mapped[datetime | None] = mapped_column(UtcDateTime)

Review Comment:
   Note that every date, and not just `logical_date`, is now nullable, judging 
by the type hint. Was that intentional?



##########
airflow-core/src/airflow/models/taskreschedule.py:
##########
@@ -49,16 +48,16 @@ class TaskReschedule(Base):
     """TaskReschedule tracks rescheduled task instances."""
 
     __tablename__ = "task_reschedule"
-    id = Column(Integer, primary_key=True)
-    ti_id = Column(
+    id = mapped_column(Integer, primary_key=True)
+    ti_id = mapped_column(
         String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
         ForeignKey("task_instance.id", ondelete="CASCADE", 
name="task_reschedule_ti_fkey"),
         nullable=False,
     )
-    start_date = Column(UtcDateTime, nullable=False)
-    end_date = Column(UtcDateTime, nullable=False)
-    duration = Column(Integer, nullable=False)
-    reschedule_date = Column(UtcDateTime, nullable=False)
+    start_date = mapped_column(UtcDateTime, nullable=False)
+    end_date = mapped_column(UtcDateTime, nullable=False)
+    duration = mapped_column(Integer, nullable=False)
+    reschedule_date = mapped_column(UtcDateTime, nullable=False)

Review Comment:
   `Mapped[...]`



##########
airflow-core/src/airflow/models/taskmap.py:
##########
@@ -63,13 +63,13 @@ class TaskMap(TaskInstanceDependencies):
     __tablename__ = "task_map"
 
     # Link to upstream TaskInstance creating this dynamic mapping information.
-    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
-    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
-    run_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
-    map_index = Column(Integer, primary_key=True)
+    dag_id = mapped_column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    task_id = mapped_column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    run_id = mapped_column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    map_index = mapped_column(Integer, primary_key=True)
 
-    length = Column(Integer, nullable=False)
-    keys = Column(ExtendedJSON, nullable=True)
+    length = mapped_column(Integer, nullable=False)
+    keys = mapped_column(ExtendedJSON, nullable=True)

Review Comment:
   `Mapped[...]`



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -148,57 +147,61 @@ class DagRun(Base, LoggingMixin):
 
     __tablename__ = "dag_run"
 
-    id = Column(Integer, primary_key=True)
-    dag_id = Column(StringID(), nullable=False)
-    queued_at = Column(UtcDateTime)
-    logical_date = Column(UtcDateTime, nullable=True)
-    start_date = Column(UtcDateTime)
-    end_date = Column(UtcDateTime)
-    _state = Column("state", String(50), default=DagRunState.QUEUED)
-    run_id = Column(StringID(), nullable=False)
-    creating_job_id = Column(Integer)
-    run_type = Column(String(50), nullable=False)
-    triggered_by = Column(
+    id: Mapped[int] = mapped_column(Integer, primary_key=True)
+    dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+    queued_at: Mapped[datetime | None] = mapped_column(UtcDateTime)
+    logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    start_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+    end_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+    _state: Mapped[str] = mapped_column("state", String(50), 
default=DagRunState.QUEUED)
+    run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+    creating_job_id: Mapped[int | None] = mapped_column(Integer)

Review Comment:
   The field is now nullable. Intentional?



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -148,57 +147,61 @@ class DagRun(Base, LoggingMixin):
 
     __tablename__ = "dag_run"
 
-    id = Column(Integer, primary_key=True)
-    dag_id = Column(StringID(), nullable=False)
-    queued_at = Column(UtcDateTime)
-    logical_date = Column(UtcDateTime, nullable=True)
-    start_date = Column(UtcDateTime)
-    end_date = Column(UtcDateTime)
-    _state = Column("state", String(50), default=DagRunState.QUEUED)
-    run_id = Column(StringID(), nullable=False)
-    creating_job_id = Column(Integer)
-    run_type = Column(String(50), nullable=False)
-    triggered_by = Column(
+    id: Mapped[int] = mapped_column(Integer, primary_key=True)
+    dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+    queued_at: Mapped[datetime | None] = mapped_column(UtcDateTime)
+    logical_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
+    start_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+    end_date: Mapped[datetime | None] = mapped_column(UtcDateTime)
+    _state: Mapped[str] = mapped_column("state", String(50), 
default=DagRunState.QUEUED)
+    run_id: Mapped[str] = mapped_column(StringID(), nullable=False)
+    creating_job_id: Mapped[int | None] = mapped_column(Integer)
+    run_type: Mapped[str] = mapped_column(String(50), nullable=False)
+    triggered_by: Mapped[DagRunTriggeredByType | None] = mapped_column(

Review Comment:
   Nullability changed



##########
airflow-core/src/airflow/models/renderedtifields.py:
##########
@@ -69,12 +69,12 @@ class RenderedTaskInstanceFields(TaskInstanceDependencies):
 
     __tablename__ = "rendered_task_instance_fields"
 
-    dag_id = Column(StringID(), primary_key=True)
-    task_id = Column(StringID(), primary_key=True)
-    run_id = Column(StringID(), primary_key=True)
-    map_index = Column(Integer, primary_key=True, server_default=text("-1"))
-    rendered_fields = Column(sqlalchemy_jsonfield.JSONField(json=json), 
nullable=False)
-    k8s_pod_yaml = Column(sqlalchemy_jsonfield.JSONField(json=json), 
nullable=True)
+    dag_id = mapped_column(StringID(), primary_key=True)
+    task_id = mapped_column(StringID(), primary_key=True)
+    run_id = mapped_column(StringID(), primary_key=True)
+    map_index = mapped_column(Integer, primary_key=True, 
server_default=text("-1"))
+    rendered_fields = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), 
nullable=False)
+    k8s_pod_yaml = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), 
nullable=True)

Review Comment:
   Why no `Mapped[...]`?



##########
airflow-core/src/airflow/models/taskinstancehistory.py:
##########
@@ -64,48 +64,48 @@ class TaskInstanceHistory(Base):
     """
 
     __tablename__ = "task_instance_history"
-    task_instance_id = Column(
+    task_instance_id = mapped_column(
         String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
         nullable=False,
         primary_key=True,
     )
-    task_id = Column(StringID(), nullable=False)
-    dag_id = Column(StringID(), nullable=False)
-    run_id = Column(StringID(), nullable=False)
-    map_index = Column(Integer, nullable=False, server_default=text("-1"))
-    try_number = Column(Integer, nullable=False)
-    start_date = Column(UtcDateTime)
-    end_date = Column(UtcDateTime)
-    duration = Column(Float)
-    state = Column(String(20))
-    max_tries = Column(Integer, server_default=text("-1"))
-    hostname = Column(String(1000))
-    unixname = Column(String(1000))
-    pool = Column(String(256), nullable=False)
-    pool_slots = Column(Integer, default=1, nullable=False)
-    queue = Column(String(256))
-    priority_weight = Column(Integer)
-    operator = Column(String(1000))
-    custom_operator_name = Column(String(1000))
-    queued_dttm = Column(UtcDateTime)
-    scheduled_dttm = Column(UtcDateTime)
-    queued_by_job_id = Column(Integer)
-    pid = Column(Integer)
-    executor = Column(String(1000))
-    executor_config = Column(ExecutorConfigType(pickler=dill))
-    updated_at = Column(UtcDateTime, default=timezone.utcnow, 
onupdate=timezone.utcnow)
-    rendered_map_index = Column(String(250))
-    context_carrier = Column(MutableDict.as_mutable(ExtendedJSON))
-    span_status = Column(String(250), server_default=SpanStatus.NOT_STARTED, 
nullable=False)
-
-    external_executor_id = Column(StringID())
-    trigger_id = Column(Integer)
-    trigger_timeout = Column(DateTime)
-    next_method = Column(String(1000))
-    next_kwargs = Column(MutableDict.as_mutable(ExtendedJSON))
-
-    task_display_name = Column(String(2000), nullable=True)
-    dag_version_id = Column(UUIDType(binary=False))
+    task_id = mapped_column(StringID(), nullable=False)
+    dag_id = mapped_column(StringID(), nullable=False)
+    run_id = mapped_column(StringID(), nullable=False)
+    map_index = mapped_column(Integer, nullable=False, 
server_default=text("-1"))
+    try_number = mapped_column(Integer, nullable=False)
+    start_date = mapped_column(UtcDateTime)
+    end_date = mapped_column(UtcDateTime)
+    duration = mapped_column(Float)
+    state = mapped_column(String(20))
+    max_tries = mapped_column(Integer, server_default=text("-1"))
+    hostname = mapped_column(String(1000))
+    unixname = mapped_column(String(1000))
+    pool = mapped_column(String(256), nullable=False)
+    pool_slots = mapped_column(Integer, default=1, nullable=False)
+    queue = mapped_column(String(256))
+    priority_weight = mapped_column(Integer)
+    operator = mapped_column(String(1000))
+    custom_operator_name = mapped_column(String(1000))
+    queued_dttm = mapped_column(UtcDateTime)
+    scheduled_dttm = mapped_column(UtcDateTime)
+    queued_by_job_id = mapped_column(Integer)
+    pid = mapped_column(Integer)
+    executor = mapped_column(String(1000))
+    executor_config = mapped_column(ExecutorConfigType(pickler=dill))
+    updated_at = mapped_column(UtcDateTime, default=timezone.utcnow, 
onupdate=timezone.utcnow)
+    rendered_map_index = mapped_column(String(250))
+    context_carrier = mapped_column(MutableDict.as_mutable(ExtendedJSON))
+    span_status = mapped_column(String(250), 
server_default=SpanStatus.NOT_STARTED, nullable=False)
+
+    external_executor_id = mapped_column(StringID())
+    trigger_id = mapped_column(Integer)
+    trigger_timeout = mapped_column(DateTime)
+    next_method = mapped_column(String(1000))
+    next_kwargs = mapped_column(MutableDict.as_mutable(ExtendedJSON))
+
+    task_display_name = mapped_column(String(2000), nullable=True)
+    dag_version_id = mapped_column(UUIDType(binary=False))

Review Comment:
   `Mapped[...]`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to