This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 45740b19cf Set end_date and duration for triggers completed with 
end_from_trigger as True. (#41754)
45740b19cf is described below

commit 45740b19cfc5afcd4a3239504384357d7994c1c4
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Wed Aug 28 18:38:13 2024 +0530

    Set end_date and duration for triggers completed with end_from_trigger as 
True. (#41754)
---
 airflow/triggers/base.py     |  2 +-
 tests/models/test_trigger.py | 10 +++++++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py
index 7b5338ad2f..bc1da861f3 100644
--- a/airflow/triggers/base.py
+++ b/airflow/triggers/base.py
@@ -203,7 +203,7 @@ class BaseTaskEndEvent(TriggerEvent):
         """
         # Mark the task with terminal state and prevent it from resuming on 
worker
         task_instance.trigger_id = None
-        task_instance.state = self.task_instance_state
+        task_instance.set_state(self.task_instance_state, session=session)
         self._submit_callback_if_necessary(task_instance=task_instance, 
session=session)
         self._push_xcoms_if_necessary(task_instance=task_instance)
 
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 4aa5b8b581..407d6edd75 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -19,7 +19,9 @@ from __future__ import annotations
 import datetime
 import json
 from typing import Any, AsyncIterator
+from unittest.mock import patch
 
+import pendulum
 import pytest
 import pytz
 from cryptography.fernet import Fernet
@@ -161,11 +163,15 @@ def test_submit_failure(session, create_task_instance):
         (TaskSkippedEvent, "skipped"),
     ],
 )
-def test_submit_event_task_end(session, create_task_instance, event_cls, 
expected):
+@patch("airflow.utils.timezone.utcnow")
+def test_submit_event_task_end(mock_utcnow, session, create_task_instance, 
event_cls, expected):
     """
     Tests that events inheriting BaseTaskEndEvent *don't* re-wake their 
dependent
     but mark them in the appropriate terminal state and send xcom
     """
+    now = pendulum.now("UTC")
+    mock_utcnow.return_value = now
+
     # Make a trigger
     trigger = Trigger(classpath="does.not.matter", kwargs={})
     trigger.id = 1
@@ -199,6 +205,8 @@ def test_submit_event_task_end(session, 
create_task_instance, event_cls, expecte
     ti = session.query(TaskInstance).one()
     assert ti.state == expected
     assert ti.next_kwargs is None
+    assert ti.end_date == now
+    assert ti.duration is not None
     actual_xcoms = {x.key: x.value for x in get_xcoms(ti)}
     assert actual_xcoms == {"return_value": "xcomret", "a": "b", "c": "d"}
 

Reply via email to