This is an automated email from the ASF dual-hosted git repository.
utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 57adf0b2853 Fixed thread local _sentinel.callers defect and added test
cases (#44646) (#46280)
57adf0b2853 is described below
commit 57adf0b285331cfff587c2be77abadc4aed45b9d
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Thu Jan 30 15:12:20 2025 +0530
Fixed thread local _sentinel.callers defect and added test cases (#44646)
(#46280)
* Update base.py
* Update base.py
* Update base.py
* Update baseoperator.py
* Update base.py
* Update base.py
* Update baseoperator.py
* Update baseoperator.py
* Fixed thread local _sentinel.callers defect and added test cases
* Fixed issue
* Fixed issue
* Fixed issue
* Fixed issue
* Fixed issue
* Fixed issue
* Fixed issue
---------
Co-authored-by: Rahul Goyal <[email protected]>
(cherry picked from commit a77fca25518b07d7d75c3301244d21f6a97c3947)
Co-authored-by: rahulgoyal2987 <[email protected]>
---
airflow/models/baseoperator.py | 2 ++
tests/models/test_baseoperatormeta.py | 18 ++++++++++++++++++
2 files changed, 20 insertions(+)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 65900276271..1b1b22c7be4 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -410,6 +410,8 @@ class ExecutorSafeguard:
sentinel = kwargs.pop(sentinel_key, None)
if sentinel:
+ if not getattr(cls._sentinel, "callers", None):
+ cls._sentinel.callers = {}
cls._sentinel.callers[sentinel_key] = sentinel
else:
sentinel =
cls._sentinel.callers.pop(f"{func.__qualname__.split('.')[0]}__sentinel", None)
diff --git a/tests/models/test_baseoperatormeta.py
b/tests/models/test_baseoperatormeta.py
index 5244e86b2c3..52e45dd1cf3 100644
--- a/tests/models/test_baseoperatormeta.py
+++ b/tests/models/test_baseoperatormeta.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import datetime
+import threading
from typing import TYPE_CHECKING, Any
from unittest.mock import patch
@@ -211,3 +212,20 @@ class TestExecutorSafeguard:
mock_log.warning.assert_called_once_with(
"HelloWorldOperator.execute cannot be called outside TaskInstance!"
)
+
+ def test_thread_local_executor_safeguard(self):
+ class TestExecutorSafeguardThread(threading.Thread):
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self.executor_safeguard = ExecutorSafeguard()
+
+ def run(self):
+ class Wrapper:
+ def wrapper_test_func(self, *args, **kwargs):
+ print("test")
+
+ wrap_func =
self.executor_safeguard.decorator(Wrapper.wrapper_test_func)
+ wrap_func(Wrapper(), Wrapper__sentinel="abc")
+
+ # Test thread local caller value is set properly
+ TestExecutorSafeguardThread().start()