This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ed5a5998c9 Remove timeout from airflow.utils and update core usages 
accordingly (#54212)
9ed5a5998c9 is described below

commit 9ed5a5998c9f2944514bfb82f84dec79584dbf8d
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Aug 13 18:01:34 2025 +0530

    Remove timeout from airflow.utils and update core usages accordingly 
(#54212)
---
 airflow-core/src/airflow/models/dagbag.py     | 26 +++++++-
 airflow-core/src/airflow/utils/__init__.py    |  3 +
 airflow-core/src/airflow/utils/timeout.py     | 88 ---------------------------
 airflow-core/tests/unit/models/test_dagbag.py | 11 ++++
 4 files changed, 39 insertions(+), 89 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagbag.py 
b/airflow-core/src/airflow/models/dagbag.py
index 36001ef8be9..c84c1ae31ff 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -51,6 +51,7 @@ from airflow.exceptions import (
     AirflowClusterPolicyViolation,
     AirflowDagDuplicatedIdException,
     AirflowException,
+    AirflowTaskTimeout,
 )
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.base import Base, StringID
@@ -65,7 +66,6 @@ from airflow.utils.file import (
 )
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.timeout import timeout
 from airflow.utils.types import NOTSET
 
 try:
@@ -123,6 +123,30 @@ class FileLoadStat(NamedTuple):
     warning_num: int
 
 
[email protected]
+def timeout(seconds=1, error_message="Timeout"):
+    import logging
+
+    log = logging.getLogger(__name__)
+    error_message = error_message + ", PID: " + str(os.getpid())
+
+    def handle_timeout(signum, frame):
+        """Log information and raises AirflowTaskTimeout."""
+        log.error("Process timed out, PID: %s", str(os.getpid()))
+        raise AirflowTaskTimeout(error_message)
+
+    try:
+        try:
+            signal.signal(signal.SIGALRM, handle_timeout)
+            signal.setitimer(signal.ITIMER_REAL, seconds)
+        except ValueError:
+            log.warning("timeout can't be used in the current context", 
exc_info=True)
+        yield
+    finally:
+        with contextlib.suppress(ValueError):
+            signal.setitimer(signal.ITIMER_REAL, 0)
+
+
 class DagBag(LoggingMixin):
     """
     A dagbag is a collection of dags, parsed out of a folder tree and has high 
level configuration settings.
diff --git a/airflow-core/src/airflow/utils/__init__.py 
b/airflow-core/src/airflow/utils/__init__.py
index 3a72f96fff6..9ed8b48407c 100644
--- a/airflow-core/src/airflow/utils/__init__.py
+++ b/airflow-core/src/airflow/utils/__init__.py
@@ -42,6 +42,9 @@ __deprecated_classes = {
         "remove_task_decorator": 
"airflow.sdk.definitions._internal.decorators.remove_task_decorator",
         "fixup_decorator_warning_stack": 
"airflow.sdk.definitions._internal.decorators.fixup_decorator_warning_stack",
     },
+    "timeout": {
+        "timeout": "airflow.sdk.execution_time.timeout.timeout",
+    },
 }
 
 add_deprecated_classes(__deprecated_classes, __name__)
diff --git a/airflow-core/src/airflow/utils/timeout.py 
b/airflow-core/src/airflow/utils/timeout.py
deleted file mode 100644
index 11a5e1bfa1e..00000000000
--- a/airflow-core/src/airflow/utils/timeout.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import os
-import signal
-from contextlib import AbstractContextManager
-from threading import Timer
-
-from airflow.exceptions import AirflowTaskTimeout
-from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.platform import IS_WINDOWS
-
-_timeout = AbstractContextManager[None]
-
-
-class TimeoutWindows(_timeout, LoggingMixin):
-    """Windows timeout version: To be used in a ``with`` block and timeout its 
content."""
-
-    def __init__(self, seconds=1, error_message="Timeout"):
-        super().__init__()
-        self._timer: Timer | None = None
-        self.seconds = seconds
-        self.error_message = error_message + ", PID: " + str(os.getpid())
-
-    def handle_timeout(self, *args):
-        """Log information and raises AirflowTaskTimeout."""
-        self.log.error("Process timed out, PID: %s", str(os.getpid()))
-        raise AirflowTaskTimeout(self.error_message)
-
-    def __enter__(self):
-        if self._timer:
-            self._timer.cancel()
-        self._timer = Timer(self.seconds, self.handle_timeout)
-        self._timer.start()
-
-    def __exit__(self, type_, value, traceback):
-        if self._timer:
-            self._timer.cancel()
-            self._timer = None
-
-
-class TimeoutPosix(_timeout, LoggingMixin):
-    """POSIX Timeout version: To be used in a ``with`` block and timeout its 
content."""
-
-    def __init__(self, seconds=1, error_message="Timeout"):
-        super().__init__()
-        self.seconds = seconds
-        self.error_message = error_message + ", PID: " + str(os.getpid())
-
-    def handle_timeout(self, signum, frame):
-        """Log information and raises AirflowTaskTimeout."""
-        self.log.error("Process timed out, PID: %s", str(os.getpid()))
-        raise AirflowTaskTimeout(self.error_message)
-
-    def __enter__(self):
-        try:
-            signal.signal(signal.SIGALRM, self.handle_timeout)
-            signal.setitimer(signal.ITIMER_REAL, self.seconds)
-        except ValueError:
-            self.log.warning("timeout can't be used in the current context", 
exc_info=True)
-
-    def __exit__(self, type_, value, traceback):
-        try:
-            signal.setitimer(signal.ITIMER_REAL, 0)
-        except ValueError:
-            self.log.warning("timeout can't be used in the current context", 
exc_info=True)
-
-
-if IS_WINDOWS:
-    timeout: type[TimeoutWindows | TimeoutPosix] = TimeoutWindows
-else:
-    timeout = TimeoutPosix
diff --git a/airflow-core/tests/unit/models/test_dagbag.py 
b/airflow-core/tests/unit/models/test_dagbag.py
index 230ddca9f89..e0de8c76751 100644
--- a/airflow-core/tests/unit/models/test_dagbag.py
+++ b/airflow-core/tests/unit/models/test_dagbag.py
@@ -77,6 +77,17 @@ class TestDagBag:
     def teardown_class(self):
         db_clean_up()
 
+    def test_timeout_context_manager_raises_exception(self):
+        """Test that the timeout context manager raises AirflowTaskTimeout 
when time limit is exceeded."""
+        import time
+
+        from airflow.exceptions import AirflowTaskTimeout
+        from airflow.models.dagbag import timeout
+
+        with pytest.raises(AirflowTaskTimeout):
+            with timeout(1, "Test timeout"):
+                time.sleep(2)
+
     def test_get_existing_dag(self, tmp_path):
         """
         Test that we're able to parse some example DAGs and retrieve them

Reply via email to