SKalide commented on issue #41470:
URL: https://github.com/apache/airflow/issues/41470#issuecomment-2302875072

   I've looked into this issue more closely and believe I have a solution, but I'd appreciate some guidance on whether I should submit a PR for it.
   
   The problem, in my opinion, is that when `_execute_callable` calls the operator's `execute` method, the call enters the `ExecutorSafeguard` wrapper. The wrapper checks whether it is being called from within a `TaskInstance` (which it is here, via `_execute_callable`) and then calls the wrapped `execute` function. Because the `execute` method of `BigQueryTableExistenceSensor` is also decorated, the nested call re-enters the `ExecutorSafeguard` wrapper. This time, however, it no longer carries the sentinel that `_execute_callable` passed in (the outer wrapper has already popped it from `kwargs`), so it looks as if it were called outside a `TaskInstance` and the check fails. With `allow_nested_operators` enabled, this only emits a warning instead of raising an error.
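To make the nesting concrete, here is a stripped-down, Airflow-free model of it. `safeguard`, `SENTINEL`, `BaseSensor`, `TableSensor` and `run_like_task_instance` are hypothetical names made up for illustration. The decorator only mimics the sentinel check, and it uses `warnings.warn` in place of `self.log.warning` so the output is easy to capture.

```python
import warnings
from functools import wraps

# Hypothetical stand-in for the marker object that TaskInstance passes to execute().
SENTINEL = object()


def safeguard(func):
    """Simplified, hypothetical model of the ExecutorSafeguard check (not Airflow's code)."""

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Only the outermost call receives the sentinel; it is popped here and
        # therefore not forwarded to any nested execute() call.
        sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)
        if sentinel is not SENTINEL:
            # Models allow_nested_operators=True: warn instead of raising.
            warnings.warn(
                f"{self.__class__.__name__}.{func.__name__} cannot be called outside TaskInstance!"
            )
        return func(self, *args, **kwargs)

    return wrapper


class BaseSensor:
    @safeguard
    def execute(self, context):
        return "poked"


class TableSensor(BaseSensor):
    @safeguard
    def execute(self, context):
        # Re-enters the parent's safeguard wrapper, now without a sentinel.
        return super().execute(context)


def run_like_task_instance(task, context):
    # Passes the sentinel to the outermost execute() only, as TaskInstance would.
    return task.execute(context, **{f"{task.__class__.__name__}__sentinel": SENTINEL})


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = run_like_task_instance(TableSensor(), context={})

print(result)  # poked
print([str(w.message) for w in caught])
# ['TableSensor.execute cannot be called outside TaskInstance!']
```

The outer call passes the check, but the inner `super().execute()` call triggers the warning even though everything runs inside a single task execution.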
   
   My proposed solution is to record whether the `ExecutorSafeguard` wrapper is already active in the current execution context and, if so, skip the check for nested calls. The flag can be kept in thread-local storage (`threading.local()`), so each thread sees only its own value and operators executing concurrently in different threads don't interfere with each other.
   
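   ```python
   from functools import wraps
   from threading import local

   from airflow.configuration import conf
   from airflow.exceptions import AirflowException

   # `_sentinel` is the module-level marker that TaskInstance passes into
   # execute(); it is assumed to be available in this module (not shown here).


   class ExecutorSafeguard:
       """
       The ExecutorSafeguard decorator.

       Checks if the execute method of an operator isn't manually called outside
       the TaskInstance as we want to avoid bad mixing between decorated and
       classic operators.
       """

       test_mode = conf.getboolean("core", "unit_test_mode")
       # Per-thread flag: True while an outer safeguarded call is running.
       _local = local()

       @classmethod
       def decorator(cls, func):
           @wraps(func)
           def wrapper(self, *args, **kwargs):
               already_inside = getattr(cls._local, "in_executor_safeguard", False)
               if already_inside and self.allow_nested_operators:
                   # Nested call (e.g. super().execute()) in the same thread:
                   # the outer call has already been checked, so skip the check.
                   return func(self, *args, **kwargs)

               cls._local.in_executor_safeguard = True
               try:
                   from airflow.decorators.base import DecoratedOperator

                   sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)

                   if (
                       not cls.test_mode
                       and sentinel != _sentinel
                       and not isinstance(self, DecoratedOperator)
                   ):
                       message = (
                           f"{self.__class__.__name__}.{func.__name__} "
                           "cannot be called outside TaskInstance!"
                       )
                       # Keep the existing behaviour: fail only when nesting is
                       # disallowed, otherwise just log a warning.
                       if not self.allow_nested_operators:
                           raise AirflowException(message)
                       self.log.warning(message)
                   return func(self, *args, **kwargs)
               finally:
                   # Restore the previous state so that a nested call which went
                   # through the full check doesn't clear the outer call's flag.
                   cls._local.in_executor_safeguard = already_inside

           return wrapper
   ```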
   
   This would suppress the warning for legitimate nested calls while still guarding against unintended external calls. Because the flag lives in a `local()` object, each thread has its own copy, so one thread's nested calls can't affect the check in another thread.
   
   I'm willing to create a Pull Request with this solution if the maintainers 
think this approach is appropriate. What do you think about this solution? Are 
there any aspects I should consider or modify before proceeding with a PR?

