This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1949f5d  Change the storage of frame to use threadLocal rather than Dict (#21993)
1949f5d is described below

commit 1949f5d76b5842d56db91c868ae4655bb7a7689f
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Mar 4 18:46:37 2022 +0100

    Change the storage of frame to use threadLocal rather than Dict (#21993)
    
    There is very probably a WeakKeyDictionary bug in the Python
    standard library (to be confirmed and investigated further) that
    manifests itself in a very rare failure of the
    test_stacktrace_on_failure_starts_with_task_execute_method test.
    
    This turned out to be related to unexpected behaviour
    (most likely a bug - to be confirmed) of WeakKeyDictionary
    when two different objects that compare equal and have the
    same hash are added to the same WeakKeyDictionary
    as keys.
    
    More information on a similar Python bug report (raised for
    a slightly different reason) can be found here:
    
    https://bugs.python.org/issue44140
    
    We submitted a PR to fix the problem found
    https://github.com/python/cpython/pull/31685
---
 airflow/models/taskinstance.py | 47 +++++++++++++++++++++++++++++++-----------
 1 file changed, 35 insertions(+), 12 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 67ee56a..bc09900 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -23,13 +23,14 @@ import math
 import os
 import pickle
 import signal
+import threading
 import warnings
 from collections import defaultdict
 from datetime import datetime, timedelta
 from functools import partial
 from inspect import currentframe
 from tempfile import NamedTemporaryFile
-from types import FrameType
+from types import TracebackType
 from typing import (
     IO,
     TYPE_CHECKING,
@@ -45,7 +46,6 @@ from typing import (
     Union,
 )
 from urllib.parse import quote
-from weakref import WeakKeyDictionary
 
 import dill
 import jinja2
@@ -129,7 +129,7 @@ if TYPE_CHECKING:
     from airflow.models.dagrun import DagRun
     from airflow.models.operator import Operator
 
-_EXECUTION_FRAME_MAPPING: "WeakKeyDictionary[Operator, FrameType]" = WeakKeyDictionary()
+_TASK_EXECUTION_FRAME_LOCAL_STORAGE = threading.local()
 
 
 @contextlib.contextmanager
@@ -1537,7 +1537,7 @@ class TaskInstance(Base, LoggingMixin):
             else:
                 result = execute_callable(context=context)
         except:  # noqa: E722
-            _EXECUTION_FRAME_MAPPING[task_copy] = currentframe()
+            _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = currentframe()
             raise
         # If the task returns a result, push an XCom containing it
         if task_copy.do_xcom_push and result is not None:
@@ -1731,6 +1731,36 @@ class TaskInstance(Base, LoggingMixin):
         session.commit()
         self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
 
+    def get_truncated_error_traceback(self, error: BaseException) -> Optional[TracebackType]:
+        """
+        Returns truncated error traceback.
+
+        This method returns traceback of the error truncated to the
+        frame saved by earlier try/except along the way. If the frame
+        is found, the traceback will be truncated to below the frame.
+
+        :param error: exception to get traceback from
+        :return: traceback to print
+        """
+        tb = error.__traceback__
+        try:
+            execution_frame = _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame
+        except AttributeError:
+            self.log.warning(
+                "We expected to get frame set in local storage but it was not."
+                " Please report this as an issue with full logs"
+                " at https://github.com/apache/airflow/issues/new",
+                exc_info=True,
+            )
+            return tb
+        _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = None
+        while tb is not None:
+            if tb.tb_frame is execution_frame:
+                tb = tb.tb_next
+                break
+            tb = tb.tb_next
+        return tb or error.__traceback__
+
     @provide_session
     def handle_failure(
         self,
@@ -1746,14 +1776,7 @@ class TaskInstance(Base, LoggingMixin):
 
         if error:
             if isinstance(error, BaseException):
-                execution_frame = _EXECUTION_FRAME_MAPPING.get(self.task)
-                tb = error.__traceback__
-                while tb is not None:
-                    if tb.tb_frame is execution_frame:
-                        tb = tb.tb_next
-                        break
-                    tb = tb.tb_next
-                tb = tb or error.__traceback__
+                tb = self.get_truncated_error_traceback(error)
                 self.log.error("Task failed with exception", exc_info=(type(error), error, tb))
             else:
                 self.log.error("%s", error)

Reply via email to