This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1949f5d Change the storage of frame to use threadLocal rather than
Dict (#21993)
1949f5d is described below
commit 1949f5d76b5842d56db91c868ae4655bb7a7689f
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Mar 4 18:46:37 2022 +0100
Change the storage of frame to use threadLocal rather than Dict (#21993)
There is very probably a WeakKeyDictionary bug in the Python standard
library (to be confirmed and investigated further) that
manifests itself as a very rare failure of the
test_stacktrace_on_failure_starts_with_task_execute_method test.
This turned out to be related to unexpected behaviour
(most likely a bug - to be confirmed) of WeakKeyDictionary
when you potentially have two different objects that compare
equal and have the same hash added as keys to the same
WeakKeyDictionary.
More information on a similar Python bug report (raised for
a slightly different reason) can be found here:
https://bugs.python.org/issue44140
We submitted a PR to fix the problem we found:
https://github.com/python/cpython/pull/31685
---
airflow/models/taskinstance.py | 47 +++++++++++++++++++++++++++++++-----------
1 file changed, 35 insertions(+), 12 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 67ee56a..bc09900 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -23,13 +23,14 @@ import math
import os
import pickle
import signal
+import threading
import warnings
from collections import defaultdict
from datetime import datetime, timedelta
from functools import partial
from inspect import currentframe
from tempfile import NamedTemporaryFile
-from types import FrameType
+from types import TracebackType
from typing import (
IO,
TYPE_CHECKING,
@@ -45,7 +46,6 @@ from typing import (
Union,
)
from urllib.parse import quote
-from weakref import WeakKeyDictionary
import dill
import jinja2
@@ -129,7 +129,7 @@ if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
from airflow.models.operator import Operator
-_EXECUTION_FRAME_MAPPING: "WeakKeyDictionary[Operator, FrameType]" =
WeakKeyDictionary()
+_TASK_EXECUTION_FRAME_LOCAL_STORAGE = threading.local()
@contextlib.contextmanager
@@ -1537,7 +1537,7 @@ class TaskInstance(Base, LoggingMixin):
else:
result = execute_callable(context=context)
except: # noqa: E722
- _EXECUTION_FRAME_MAPPING[task_copy] = currentframe()
+ _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = currentframe()
raise
# If the task returns a result, push an XCom containing it
if task_copy.do_xcom_push and result is not None:
@@ -1731,6 +1731,36 @@ class TaskInstance(Base, LoggingMixin):
session.commit()
self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
+ def get_truncated_error_traceback(self, error: BaseException) ->
Optional[TracebackType]:
+ """
+ Returns truncated error traceback.
+
+ This method returns traceback of the error truncated to the
+ frame saved by earlier try/except along the way. If the frame
+ is found, the traceback will be truncated to below the frame.
+
+ :param error: exception to get traceback from
+ :return: traceback to print
+ """
+ tb = error.__traceback__
+ try:
+ execution_frame = _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame
+ except AttributeError:
+ self.log.warning(
+ "We expected to get frame set in local storage but it was not."
+ " Please report this as an issue with full logs"
+ " at https://github.com/apache/airflow/issues/new",
+ exc_info=True,
+ )
+ return tb
+ _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = None
+ while tb is not None:
+ if tb.tb_frame is execution_frame:
+ tb = tb.tb_next
+ break
+ tb = tb.tb_next
+ return tb or error.__traceback__
+
@provide_session
def handle_failure(
self,
@@ -1746,14 +1776,7 @@ class TaskInstance(Base, LoggingMixin):
if error:
if isinstance(error, BaseException):
- execution_frame = _EXECUTION_FRAME_MAPPING.get(self.task)
- tb = error.__traceback__
- while tb is not None:
- if tb.tb_frame is execution_frame:
- tb = tb.tb_next
- break
- tb = tb.tb_next
- tb = tb or error.__traceback__
+ tb = self.get_truncated_error_traceback(error)
self.log.error("Task failed with exception",
exc_info=(type(error), error, tb))
else:
self.log.error("%s", error)