ashb commented on code in PR #37937:
URL: https://github.com/apache/airflow/pull/37937#discussion_r1531908083
##########
airflow/models/baseoperator.py:
##########
@@ -365,6 +373,62 @@ def partial(
)
+class ExecutorSafeguard:
+ """
+ The ExecutorSafeguard decorator.
+
+ Checks if the execute method of an operator isn't manually called outside
+ the TaskInstance as we want to avoid bad mixing between decorated and
+ classic operators.
+ """
+
+ test_mode = conf.getboolean("core", "unit_test_mode")
+
+ @classmethod
+ def decorator(cls, func):
+ def find_task_instance(frame: FrameSummary):
+ return (
+ frame.name == _execute_task.__name__
+ and _execute_task.__module__.replace(".", os.path.sep) in frame.filename
+ )
+
+ def find_operator(frame: FrameSummary):
+ from airflow.decorators.base import DecoratedOperator
+ from airflow.operators.python import PythonOperator
+
+ return (
+ frame.name == PythonOperator.execute.__name__
+ and PythonOperator.__module__.replace(".", os.path.sep) in frame.filename
+ ) or (
+ frame.name == DecoratedOperator.execute.__name__
+ and DecoratedOperator.__module__.replace(".", os.path.sep) in frame.filename
+ )
+
+ def raise_or_warn(operator: BaseOperator, message: str):
+ if not operator.allow_mixin:
+ raise AirflowException(message)
+ operator.log.warning(message)
+
+ @wraps(func)
+ def wrapper(self, *args, **kwargs):
+ if not cls.test_mode:
Review Comment:
We should probably look at `self.test_mode` here too; see:
https://github.com/apache/airflow/blob/83f8a2ec6fc0643bf31d165c013ddf72a120220d/airflow/models/taskinstance.py#L2411
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]