SKalide commented on issue #41470:
URL: https://github.com/apache/airflow/issues/41470#issuecomment-2302875072
I've looked into this issue more closely and believe I have a solution, but I
would like some guidance on whether I should submit a PR for it.
In my opinion, the problem is this: when `_execute_callable` calls `execute`,
the call enters the `ExecutorSafeguard` wrapper. The wrapper checks whether it
was invoked from inside a `TaskInstance` (that is, via `_execute_callable`,
which passes a sentinel keyword argument) and, if so, calls the wrapped
`execute` function. Because the `execute` method of
`BigQueryTableExistenceSensor` is also decorated, the call chain re-enters the
`ExecutorSafeguard` wrapper a second time. On that inner call no sentinel is
passed, so it looks as if `execute` was called outside a TaskInstance and the
check fails. With `allow_nested_operators` enabled, this only logs a warning
instead of raising an error.
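To make the failure mode concrete, here is a minimal, Airflow-free reproduction. It is a sketch under assumptions: `guarded` is a hypothetical, simplified stand-in for `ExecutorSafeguard.decorator` that always warns (mirroring the `allow_nested_operators` path), `run_like_task_instance` is a hypothetical stand-in for `TaskInstance._execute_callable`, and the subclass is assumed to reach the second decorated `execute` through `super().execute()`.

```python
import logging
from functools import wraps

log = logging.getLogger("safeguard_demo")

# Hypothetical stand-in for the marker object TaskInstance passes to execute().
_SENTINEL = object()
warnings_emitted = []


def guarded(func):
    """Simplified stand-in for ExecutorSafeguard (no re-entrancy flag)."""

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Only the outermost call receives the sentinel; it is popped here.
        sentinel = kwargs.pop(f"{type(self).__name__}__sentinel", None)
        if sentinel is not _SENTINEL:
            message = f"{type(self).__name__}.{func.__name__} cannot be called outside TaskInstance!"
            warnings_emitted.append(message)
            log.warning(message)
        return func(self, *args, **kwargs)

    return wrapper


class BaseSensor:
    @guarded
    def execute(self, context):
        return "poked"


class TableSensor(BaseSensor):
    @guarded
    def execute(self, context):
        # Delegates to the parent's execute(), which is guarded as well.
        return super().execute(context)


def run_like_task_instance(task, context):
    """Hypothetical stand-in for TaskInstance._execute_callable."""
    kwargs = {f"{type(task).__name__}__sentinel": _SENTINEL}
    return task.execute(context, **kwargs)


result = run_like_task_instance(TableSensor(), {})
print(warnings_emitted)  # the inner, sentinel-less call triggers the warning
```

Even though the task was started "properly", the inner `execute` call carries no sentinel, so exactly one spurious warning is recorded.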
My proposed solution is to check whether the `ExecutorSafeguard` wrapper is
already running in the current execution context and, if so, to skip the check
for the nested call. This can be done with a flag stored in thread-local
storage, so that tasks running concurrently in different threads do not see
each other's state.
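```python
from functools import wraps
from threading import local

from airflow.configuration import conf
from airflow.exceptions import AirflowException

# _sentinel is the module-level marker object that TaskInstance passes to
# execute(); it is assumed to be defined in the same module as this class.


class ExecutorSafeguard:
    """
    The ExecutorSafeguard decorator.

    Checks if the execute method of an operator isn't manually called outside
    the TaskInstance as we want to avoid bad mixing between decorated and
    classic operators.
    """

    test_mode = conf.getboolean("core", "unit_test_mode")
    # Per-thread flag: True while a guarded execute() is running in this thread.
    _local = local()

    @classmethod
    def decorator(cls, func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            from airflow.decorators.base import DecoratedOperator

            sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)

            if getattr(cls._local, "in_executor_safeguard", False) and self.allow_nested_operators:
                # Already inside a guarded execute() (nested/recursive call):
                # skip the check and call the wrapped function directly.
                return func(self, *args, **kwargs)

            previous = getattr(cls._local, "in_executor_safeguard", False)
            cls._local.in_executor_safeguard = True
            try:
                if not cls.test_mode and sentinel != _sentinel and not isinstance(self, DecoratedOperator):
                    message = f"{self.__class__.__name__}.{func.__name__} cannot be called outside TaskInstance!"
                    if not self.allow_nested_operators:
                        raise AirflowException(message)
                    self.log.warning(message)
                return func(self, *args, **kwargs)
            finally:
                # Restore the previous value instead of forcing False, so an
                # inner call cannot clear the flag while an outer call runs.
                cls._local.in_executor_safeguard = previous

        return wrapper
```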
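This would silence the warning for legitimate nested calls while still guarding
against unintended calls made outside a TaskInstance. Because the flag is
stored in a `threading.local()` object, each thread sees only its own value, so
concurrent tasks in different threads cannot interfere with each other's check.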
I'm happy to open a pull request with this change if the maintainers think the
approach is appropriate. What do you think of this solution? Is there anything
I should consider or change before opening a PR?