This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b7df4639b5b Update trigger_timeout column in task_instance table to 
use UtcDateTime (#44507)
b7df4639b5b is described below

commit b7df4639b5b61cca0b5bac7687f1d6e6e4c6c40a
Author: vatsrahul1001 <[email protected]>
AuthorDate: Thu Dec 12 17:04:27 2024 +0530

    Update trigger_timeout column in task_instance table to use UtcDateTime 
(#44507)
    
    * update task instance trigger timeout to utcdatetime
    
    * update task instance trigger timeout to utcdatetime
    
    * update task instance trigger timeout to utcdatetime
    
    * updatng task_instance file
    
    * update migration desc
    
    * fixing execution api test
    
    * adding migration condition to run only for postgresql
    
    * fix static check
    
    * remove postgresql condition
---
 ...task_instance_trigger_timeout_to_utcdatetime.py | 62 ++++++++++++++++++++++
 airflow/models/taskinstance.py                     |  7 +--
 airflow/utils/db.py                                |  2 +-
 docs/apache-airflow/img/airflow_erd.sha256         |  2 +-
 docs/apache-airflow/migrations-ref.rst             |  4 +-
 .../execution_api/routes/test_task_instances.py    |  4 +-
 6 files changed, 71 insertions(+), 10 deletions(-)

diff --git 
a/airflow/migrations/versions/0051_3_0_0_update_task_instance_trigger_timeout_to_utcdatetime.py
 
b/airflow/migrations/versions/0051_3_0_0_update_task_instance_trigger_timeout_to_utcdatetime.py
new file mode 100644
index 00000000000..5a381577366
--- /dev/null
+++ 
b/airflow/migrations/versions/0051_3_0_0_update_task_instance_trigger_timeout_to_utcdatetime.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+update trigger_timeout column in task_instance table to UTC.
+
+Revision ID: 038dc8bc6284
+Revises: e229247a6cb1
+Create Date: 2024-11-30 10:47:17.542690
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.migrations.db_types import TIMESTAMP
+
+# revision identifiers, used by Alembic.
+revision = "038dc8bc6284"
+down_revision = "e229247a6cb1"
+branch_labels = None
+depends_on = None
+airflow_version = "3.0.0"
+
+
+def upgrade():
+    """Apply update task instance trigger timeout to utcdatetime."""
+    with op.batch_alter_table("task_instance", schema=None) as batch_op:
+        batch_op.alter_column(
+            "trigger_timeout",
+            existing_type=sa.DateTime(),
+            type_=TIMESTAMP(timezone=True),
+            existing_nullable=True,
+        )
+
+
+def downgrade():
+    """Unapply update task instance trigger timeout to utcdatetime."""
+    with op.batch_alter_table("task_instance", schema=None) as batch_op:
+        batch_op.alter_column(
+            "trigger_timeout",
+            existing_type=TIMESTAMP(timezone=True),
+            type_=sa.DateTime(),
+            existing_nullable=True,
+        )
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6ff78617c5f..dbdba3658ad 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1715,11 +1715,8 @@ class TaskInstance(Base, LoggingMixin):
     # The trigger to resume on if we are in state DEFERRED
     trigger_id = Column(Integer)
 
-    # Optional timeout datetime for the trigger (past this, we'll fail)
-    trigger_timeout = Column(DateTime)
-    # The trigger_timeout should be TIMESTAMP(using UtcDateTime) but for ease 
of
-    # migration, we are keeping it as DateTime pending a change where expensive
-    # migration is inevitable.
+    # Optional timeout utcdatetime for the trigger (past this, we'll fail)
+    trigger_timeout = Column(UtcDateTime)
 
     # The method to call next, and any extra arguments to pass to it.
     # Usually used when resuming from DEFERRED.
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 3bb79be0369..fe5c4c0a791 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -94,7 +94,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "2.9.2": "686269002441",
     "2.10.0": "22ed7efa9da2",
     "2.10.3": "5f2621c13b39",
-    "3.0.0": "e229247a6cb1",
+    "3.0.0": "038dc8bc6284",
 }
 
 
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 
b/docs/apache-airflow/img/airflow_erd.sha256
index e1bb8bb1931..10b180a41ae 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-95b59583f58fa1dad1ae07cac699b0a72143abae9beaa33f79c3ff0f24e52c7d
\ No newline at end of file
+8f2fd91375c546b297490e701dc3853d7ba53c7cd1422ed7f7e57b9ac86f6eca
\ No newline at end of file
diff --git a/docs/apache-airflow/migrations-ref.rst 
b/docs/apache-airflow/migrations-ref.rst
index ba29c11ac62..edd166e0bf4 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``e229247a6cb1`` (head) | ``eed27faa34e3`` | ``3.0.0``         | Add 
DagBundleModel.                                          |
+| ``038dc8bc6284`` (head) | ``e229247a6cb1`` | ``3.0.0``         | update 
trigger_timeout column in task_instance table to UTC. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``e229247a6cb1``        | ``eed27faa34e3`` | ``3.0.0``         | Add 
DagBundleModel.                                          |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``eed27faa34e3``        | ``9fc3fc5de720`` | ``3.0.0``         | Remove 
pickled data from xcom table.                         |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/tests/api_fastapi/execution_api/routes/test_task_instances.py 
b/tests/api_fastapi/execution_api/routes/test_task_instances.py
index a0011e3ad89..c13effee0bb 100644
--- a/tests/api_fastapi/execution_api/routes/test_task_instances.py
+++ b/tests/api_fastapi/execution_api/routes/test_task_instances.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+from datetime import datetime
 from unittest import mock
 
 import pytest
@@ -234,8 +235,7 @@ class TestTIUpdateState:
         assert tis[0].state == TaskInstanceState.DEFERRED
         assert tis[0].next_method == "execute_callback"
         assert tis[0].next_kwargs == {"key": "value"}
-        # TODO: Make TI.trigger_timeout a UtcDateTime instead of DateTime
-        assert tis[0].trigger_timeout == timezone.datetime(2024, 11, 
23).replace(tzinfo=None)
+        assert tis[0].trigger_timeout == timezone.make_aware(datetime(2024, 
11, 23), timezone=timezone.utc)
 
         t = session.query(Trigger).all()
         assert len(t) == 1

Reply via email to