This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a60429eadf feat(jobs/triggerer_job_runner): add triggerer canceled log (#31757)
a60429eadf is described below
commit a60429eadfffb5fb0f867c220a6cecf628692dcf
Author: Wei Lee <[email protected]>
AuthorDate: Fri Jun 16 16:31:51 2023 +0800
feat(jobs/triggerer_job_runner): add triggerer canceled log (#31757)
Emit log message when trigger is cancelled
Co-authored-by: Daniel Standish <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
---
airflow/jobs/triggerer_job_runner.py | 8 ++++++++
tests/jobs/test_triggerer_job.py | 29 ++++++++++++++++++++++++++++-
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 7c6b2b2044..5f4b77cefa 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -40,6 +40,7 @@ from airflow.serialization.pydantic.job import JobPydantic
from airflow.stats import Stats
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.typing_compat import TypedDict
+from airflow.utils import timezone
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.trigger_handler import (
@@ -608,6 +609,13 @@ class TriggerRunner(threading.Thread, LoggingMixin):
self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
+ except asyncio.CancelledError as err:
+ if timeout := trigger.task_instance.trigger_timeout:
+ timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
+ if timeout < timezone.utcnow():
+ self.log.error("Trigger cancelled due to timeout")
+ self.log.error("Trigger cancelled; message=%s", err)
+ raise
finally:
# CancelledError will get injected when we're stopped - which is
# fine, the cleanup process will understand that, but we want to
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index 0354465ea4..73b8d878a2 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -22,7 +22,7 @@ import datetime
import importlib
import time
from threading import Thread
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
import pendulum
import pytest
@@ -259,6 +259,33 @@ def test_trigger_lifecycle(session):
job_runner.trigger_runner.stop = True
+class TestTriggerRunner:
+ @pytest.mark.asyncio
+ @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging")
+ async def test_run_trigger_canceled(self, session) -> None:
+ trigger_runner = TriggerRunner()
+ trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}}
+ mock_trigger = MagicMock()
+ mock_trigger.task_instance.trigger_timeout = None
+ mock_trigger.run.side_effect = asyncio.CancelledError()
+
+ with pytest.raises(asyncio.CancelledError):
+ await trigger_runner.run_trigger(1, mock_trigger)
+
+ @pytest.mark.asyncio
+ @patch("airflow.jobs.triggerer_job_runner.TriggerRunner.set_individual_trigger_logging")
+ async def test_run_trigger_timeout(self, session, caplog) -> None:
+ trigger_runner = TriggerRunner()
+ trigger_runner.triggers = {1: {"task": MagicMock(), "name": "mock_name", "events": 0}}
+ mock_trigger = MagicMock()
+ mock_trigger.task_instance.trigger_timeout = timezone.utcnow() - datetime.timedelta(hours=1)
+ mock_trigger.run.side_effect = asyncio.CancelledError()
+
+ with pytest.raises(asyncio.CancelledError):
+ await trigger_runner.run_trigger(1, mock_trigger)
+ assert "Trigger cancelled due to timeout" in caplog.text
+
+
def test_trigger_create_race_condition_18392(session, tmp_path):
"""
This verifies the resolution of race condition documented in github issue
#18392.