This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 10e4e436dcf Move timeout test to the SDK (#56616)
10e4e436dcf is described below
commit 10e4e436dcfef418887374093fde578c9c526bfe
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Oct 14 15:46:01 2025 +0100
Move timeout test to the SDK (#56616)
* Move timeout test to the SDK
The timeout test doesn't belong in core; it belongs in the SDK.
This PR moves the test to the appropriate section of the SDK.
* fixup! Move timeout test to the SDK
---
airflow-core/tests/unit/core/test_core.py | 19 -------------------
.../task_sdk/execution_time/test_task_runner.py | 21 +++++++++++++++++++++
2 files changed, 21 insertions(+), 19 deletions(-)
diff --git a/airflow-core/tests/unit/core/test_core.py b/airflow-core/tests/unit/core/test_core.py
index f78346616d1..b6058b619bc 100644
--- a/airflow-core/tests/unit/core/test_core.py
+++ b/airflow-core/tests/unit/core/test_core.py
@@ -17,17 +17,13 @@
# under the License.
from __future__ import annotations
-import contextlib
from datetime import timedelta
-from time import sleep
import pytest
from airflow._shared.timezones.timezone import datetime
-from airflow.exceptions import AirflowTaskTimeout
from airflow.models.baseoperator import BaseOperator
from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.providers.standard.operators.python import PythonOperator
from airflow.utils.types import DagRunType
from tests_common.test_utils.db import clear_db_dags, clear_db_runs
@@ -74,21 +70,6 @@ class TestCore:
with pytest.raises(AttributeError, match=error_message):
op.dry_run()
- def test_timeout(self, dag_maker):
- def sleep_and_catch_other_exceptions():
- with contextlib.suppress(Exception):
- # Catching Exception should NOT catch AirflowTaskTimeout
- sleep(5)
-
- with dag_maker(serialized=True):
- op = PythonOperator(
- task_id="test_timeout",
- execution_timeout=timedelta(seconds=1),
- python_callable=sleep_and_catch_other_exceptions,
- )
- with pytest.raises(AirflowTaskTimeout):
- dag_maker.run_ti(op.task_id)
-
def test_dag_params_and_task_params(self, dag_maker):
# This test case guards how params of DAG and Operator work together.
# - If any key exists in either DAG's or Operator's params,
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index fad6e4d66e8..55affcead92 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -22,6 +22,7 @@ import functools
import json
import os
import textwrap
+import time
from collections.abc import Iterable
from datetime import datetime, timedelta
from pathlib import Path
@@ -40,6 +41,7 @@ from airflow.exceptions import (
AirflowSensorTimeout,
AirflowSkipException,
AirflowTaskTerminated,
+ AirflowTaskTimeout,
DownstreamTasksSkipped,
)
from airflow.listeners import hookimpl
@@ -114,6 +116,7 @@ from airflow.sdk.execution_time.context import (
from airflow.sdk.execution_time.task_runner import (
RuntimeTaskInstance,
TaskRunnerMarker,
+ _execute_task,
_push_xcom_if_needed,
_xcom_push,
finalize,
@@ -499,6 +502,24 @@ def test_run_task_timeout(time_machine, create_runtime_ti, mock_supervisor_comms
mock_supervisor_comms.send.assert_called_with(TaskState(state=TaskInstanceState.FAILED, end_date=instant))
+def test_execution_timeout(create_runtime_ti):
+ def sleep_and_catch_other_exceptions():
+ with contextlib.suppress(Exception):
+ # Catching Exception should NOT catch AirflowTaskTimeout
+ time.sleep(5)
+
+ op = PythonOperator(
+ task_id="test_timeout",
+ execution_timeout=timedelta(seconds=1),
+ python_callable=sleep_and_catch_other_exceptions,
+ )
+
+ ti = create_runtime_ti(task=op, dag_id="dag_execution_timeout")
+
+ with pytest.raises(AirflowTaskTimeout):
+ _execute_task(context=ti.get_template_context(), ti=ti, log=mock.MagicMock())
+
+
def test_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comms, spy_agency):
"""Test running a Dag with templated task."""
from airflow.providers.standard.operators.bash import BashOperator