This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 10e4e436dcf Move timeout test to the SDK (#56616)
10e4e436dcf is described below

commit 10e4e436dcfef418887374093fde578c9c526bfe
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Oct 14 15:46:01 2025 +0100

    Move timeout test to the SDK (#56616)
    
    * Move timeout test to the SDK
    
    The timeout test doesn't belong to the core, it belongs to the SDK.
    This PR moves the test to the appropriate section in SDK.
    
    * fixup! Move timeout test to the SDK
---
 airflow-core/tests/unit/core/test_core.py           | 19 -------------------
 .../task_sdk/execution_time/test_task_runner.py     | 21 +++++++++++++++++++++
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/airflow-core/tests/unit/core/test_core.py b/airflow-core/tests/unit/core/test_core.py
index f78346616d1..b6058b619bc 100644
--- a/airflow-core/tests/unit/core/test_core.py
+++ b/airflow-core/tests/unit/core/test_core.py
@@ -17,17 +17,13 @@
 # under the License.
 from __future__ import annotations
 
-import contextlib
 from datetime import timedelta
-from time import sleep
 
 import pytest
 
 from airflow._shared.timezones.timezone import datetime
-from airflow.exceptions import AirflowTaskTimeout
 from airflow.models.baseoperator import BaseOperator
 from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.providers.standard.operators.python import PythonOperator
 from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.db import clear_db_dags, clear_db_runs
@@ -74,21 +70,6 @@ class TestCore:
         with pytest.raises(AttributeError, match=error_message):
             op.dry_run()
 
-    def test_timeout(self, dag_maker):
-        def sleep_and_catch_other_exceptions():
-            with contextlib.suppress(Exception):
-                # Catching Exception should NOT catch AirflowTaskTimeout
-                sleep(5)
-
-        with dag_maker(serialized=True):
-            op = PythonOperator(
-                task_id="test_timeout",
-                execution_timeout=timedelta(seconds=1),
-                python_callable=sleep_and_catch_other_exceptions,
-            )
-        with pytest.raises(AirflowTaskTimeout):
-            dag_maker.run_ti(op.task_id)
-
     def test_dag_params_and_task_params(self, dag_maker):
         # This test case guards how params of DAG and Operator work together.
         # - If any key exists in either DAG's or Operator's params,
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index fad6e4d66e8..55affcead92 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -22,6 +22,7 @@ import functools
 import json
 import os
 import textwrap
+import time
 from collections.abc import Iterable
 from datetime import datetime, timedelta
 from pathlib import Path
@@ -40,6 +41,7 @@ from airflow.exceptions import (
     AirflowSensorTimeout,
     AirflowSkipException,
     AirflowTaskTerminated,
+    AirflowTaskTimeout,
     DownstreamTasksSkipped,
 )
 from airflow.listeners import hookimpl
@@ -114,6 +116,7 @@ from airflow.sdk.execution_time.context import (
 from airflow.sdk.execution_time.task_runner import (
     RuntimeTaskInstance,
     TaskRunnerMarker,
+    _execute_task,
     _push_xcom_if_needed,
     _xcom_push,
     finalize,
@@ -499,6 +502,24 @@ def test_run_task_timeout(time_machine, create_runtime_ti, mock_supervisor_comms
     mock_supervisor_comms.send.assert_called_with(TaskState(state=TaskInstanceState.FAILED, end_date=instant))
 
 
+def test_execution_timeout(create_runtime_ti):
+    def sleep_and_catch_other_exceptions():
+        with contextlib.suppress(Exception):
+            # Catching Exception should NOT catch AirflowTaskTimeout
+            time.sleep(5)
+
+    op = PythonOperator(
+        task_id="test_timeout",
+        execution_timeout=timedelta(seconds=1),
+        python_callable=sleep_and_catch_other_exceptions,
+    )
+
+    ti = create_runtime_ti(task=op, dag_id="dag_execution_timeout")
+
+    with pytest.raises(AirflowTaskTimeout):
+        _execute_task(context=ti.get_template_context(), ti=ti, log=mock.MagicMock())
+
+
 def test_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comms, spy_agency):
     """Test running a Dag with templated task."""
     from airflow.providers.standard.operators.bash import BashOperator

Reply via email to