cansjt opened a new issue, #25431: URL: https://github.com/apache/airflow/issues/25431
### Apache Airflow version

2.3.3 (latest released)

### What happened

When implementing a custom operator I stumbled on the following issue:

- There is a way to indicate that an attribute of an operator should be shallow-copied instead of deep-copied (by adding its name to the `<OperatorClass>.shallow_copy_attrs` class attribute);
- Most operator attributes have their value set in the operator's `__init__()` method;
- The arguments given to an operator's `__init__()` method are captured in the instance's [`_BaseOperator__init_kwargs`](https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L680) attribute;
- When an operator instance is deep-copied, `_BaseOperator__init_kwargs` is also copied;
- If the value of an attribute was passed to `__init__()`, it is deep-copied anyway when the content of the `_BaseOperator__init_kwargs` dictionary is copied, despite being listed as an attribute that should not be deep-copied.

Note that there is an additional difficulty to this problem: the names of the `kwargs` do not necessarily match the names of the instance attributes. It can be easily worked around by adding both names to the class's `shallow_copy_attrs` list. That makes things a bit redundant, though.

Not sure what `_BaseOperator__init_kwargs` is used for, but if one can deepcopy an operator, I cannot help but wonder why it is needed?

### What you think should happen instead

_No response_

### How to reproduce

I discovered the issue because, in a custom operator, I was passing an object (from a third-party package) that has a mis-implemented `__getattr__()`, and it was getting copied anyway, making my DAG fall into an infinite recursion when attempting to copy it.

Note that for brevity I also took a little shortcut: in the real case, the faulty instance is not directly attached to the operator instance but is an attribute of another object that is itself attached to the operator.
That could make `shallow_copy_attrs` effective in the real case, whereas it is ineffective in the first example here. One can consider that the adapter class, in the examples below, takes the role of the intermediate object. Still, the intermediate object being deep-copied when it shouldn't be makes setting `shallow_copy_attrs` ineffective.

This first example shows the initial situation:

```python
import copy

from airflow.operators.python import PythonOperator


class ThirdPartyClassWithMisbehavedGetAttr:
    def __getattr__(self, item):
        if attr := getattr(self, f'_{item}'):
            return attr
        raise AttributeError


class CustomOperator(PythonOperator):
    shallow_copy_attrs = ('_misbehavedparam', 'misbehavedparam')

    def __init__(self, *, misbehavedparam: None | ThirdPartyClassWithMisbehavedGetAttr = None, **kwargs):
        super().__init__(**kwargs)
        self._misbehavedparam = misbehavedparam or ThirdPartyClassWithMisbehavedGetAttr()

    def __deepcopy__(self, *args):
        # Only here to intercept the call and allow debugging
        breakpoint()
        return super().__deepcopy__(*args)


def f(**kwargs):
    print('here')


operator1 = CustomOperator(python_callable=f, task_id='doit')
print(operator1._BaseOperator__init_kwargs)
result = copy.deepcopy(operator1)  # Infinite recursion loop.
```

Running the code above fails with the exception:

```
Traceback (most recent call last):
  File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
    if attr := getattr(self, f'_{item}'):
  File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
    if attr := getattr(self, f'_{item}'):
  File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
    if attr := getattr(self, f'_{item}'):
  [Previous line repeated 997 more times]
RecursionError: maximum recursion depth exceeded
```

Here is a way to work around the problem, assuming the adapter class below can somehow reconstruct the faulty instance.
In my case it is possible (not shown here for brevity):

```python
class CustomOperatorWithWorkAround(PythonOperator):
    def __init__(self, *, misbehavedparam: None | ThirdPartyClassWithMisbehavedGetAttr = None, **kwargs):
        super().__init__(**kwargs)
        self._misbehavedparam = MisbehavedAdapter(misbehavedparam or ThirdPartyClassWithMisbehavedGetAttr())

    def __deepcopy__(self, *args):
        breakpoint()
        return super().__deepcopy__(*args)


class MisbehavedAdapter:
    def __init__(self, adaptee):
        self._adaptee = adaptee

    @classmethod
    def copy(cls) -> 'MisbehavedAdapter':
        # Reconstruct the faulty instance somehow
        return MisbehavedAdapter(ThirdPartyClassWithMisbehavedGetAttr())

    def __getattr__(self, item):
        return getattr(self._adaptee, item)

    def __reduce_ex__(self, version):
        return (MisbehavedAdapter.copy, tuple())


operator2 = CustomOperatorWithWorkAround(
    # misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
    python_callable=f,
    task_id='doit',
)
print(operator2._BaseOperator__init_kwargs)
result = copy.deepcopy(operator2)
```

It's a bit of work, but until the third-party library is fixed, that works for me.
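For context, the adapter workaround relies on `copy.deepcopy` falling back to `__reduce_ex__` when a class defines no `__deepcopy__`. The block below is a minimal sketch of that mechanism outside Airflow. `Fragile`, `Adapter` and `rebuild` are hypothetical names used only for illustration, and it assumes a fresh adaptee can be built from scratch.

```python
import copy


class Fragile:
    """Hypothetical stand-in for a third-party object that breaks when copied."""

    def __deepcopy__(self, memo):
        raise RuntimeError("Fragile cannot be deep-copied")


class Adapter:
    """Wraps a Fragile instance and tells the copy module how to rebuild it."""

    def __init__(self, adaptee):
        self._adaptee = adaptee

    @classmethod
    def rebuild(cls):
        # Assumption: a fresh adaptee can be constructed without any state.
        return cls(Fragile())

    def __reduce_ex__(self, protocol):
        # Returning (callable, args) makes copy.deepcopy call rebuild()
        # instead of walking the instance __dict__ and copying the adaptee.
        return (Adapter.rebuild, ())


wrapped = Adapter(Fragile())
clone = copy.deepcopy(wrapped)  # does not touch Fragile.__deepcopy__
```

The copy is a new `Adapter` holding a new `Fragile`; the original adaptee is never traversed. This only helps as long as no other reference to the raw adaptee is reachable from the object being copied.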
Now if I uncomment the `misbehavedparam` kwarg in the above example:

```python
operator3 = CustomOperatorWithWorkAround(
    misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
    python_callable=f,
    task_id='doit',
)
print(operator3._BaseOperator__init_kwargs)
result = copy.deepcopy(operator3)
```

The infinite recursion is back again, for the reasons explained above (the copy of the `_BaseOperator__init_kwargs` attribute, which has captured a reference to the faulty instance).

So for now, to work around the problem, I have to set the instance attribute outside of the constructor. I added a setter (:cry:) to wrap it with the adapter:

```python
class CustomOperatorWithWorkAround(PythonOperator):
    @property
    def misbehavedparam(self):
        return self._misbehavedparam

    @misbehavedparam.setter
    def misbehavedparam(self, value):
        self._misbehavedparam = MisbehavedAdapter(value)

    def __deepcopy__(self, *args):
        # breakpoint()
        return super().__deepcopy__(*args)


operator4 = CustomOperatorWithWorkAround(python_callable=f, task_id='doit')
operator4.misbehavedparam = ThirdPartyClassWithMisbehavedGetAttr()
print(operator4._BaseOperator__init_kwargs)
result = copy.deepcopy(operator4)
print(result)
operator2 = CustomOperatorWithWorkAround(
    misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
    python_callable=f,
    task_id='doit',
)
```

### Operating System

Debian

### Versions of Apache Airflow Providers

not relevant.

### Deployment

Other

### Deployment details

not relevant

### Anything else

We could special-case the copy of that attribute. There is already a special case for the copy of the [`_BaseOperator__instantiated`](https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L1168) one. The remaining question is how we want to handle that special case:

- Have users list kwargs as part of the `shallow_copy_attrs` list? I find that this exposes implementation details to users and burdens them with things they should not care about.
- Do something a bit more complex, but also probably slower? e.g.
I was thinking of doing something like:

```diff
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 2795a0f53..c312beb1d 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1164,14 +1164,26 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
+        shallow_map = {}
         for k, v in self.__dict__.items():
-            if k == "_BaseOperator__instantiated":
+            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
                 # Don't set this until the _end_, as it changes behaviour of __setattr__
                 continue
             if k not in shallow_copy:
                 setattr(result, k, copy.deepcopy(v, memo))
             else:
                 setattr(result, k, copy.copy(v))
+                shallow_map[id(v)] = getattr(result, k)
+        result.__init_kwargs = init_kwargs = {}
+        for k, v in self.__dict__["_BaseOperator__init_kwargs"].items():
+            id_ = id(v)
+            if id_ in shallow_map:
+                init_kwargs[k] = shallow_map[id_]
+            elif id_ in memo:
+                init_kwargs[k] = memo[id_]
+            else:
+                init_kwargs[k] = copy.deepcopy(v, memo)
+
         result.__instantiated = self.__instantiated
         return result
```

But then, not copying `_BaseOperator__init_kwargs` first breaks `__setattr__()`. I find that weird, because it kind of assumes that the name of the formal parameter is also the name of an attribute and, if so, that they are necessarily related (which is likely, but nothing actually ensures it, so to me it seems more like a broken assumption than anything).
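The id-matching idea in the diff above can be tried outside Airflow. What follows is a minimal sketch and not Airflow's code: `MiniOperator`, `Heavy` and the `_init_kwargs` attribute are hypothetical stand-ins that only mimic the captured constructor arguments and the `shallow_copy_attrs` handling described in this issue.

```python
import copy


class Heavy:
    """Hypothetical object that must never be deep-copied."""

    def __deepcopy__(self, memo):
        raise RuntimeError("Heavy must not be deep-copied")


class MiniOperator:
    """Simplified stand-in for an operator that captures its init kwargs."""

    shallow_copy_attrs = ("_resource",)

    def __init__(self, *, resource, name):
        # Mimics the capture of constructor arguments described above.
        self._init_kwargs = {"resource": resource, "name": name}
        self._resource = resource
        self.name = name

    def __deepcopy__(self, memo):
        cls = self.__class__
        result = cls.__new__(cls)
        memo[id(self)] = result
        shallow_map = {}  # id(original value) -> its shallow copy
        for key, value in self.__dict__.items():
            if key == "_init_kwargs":
                continue  # handled last, once the other attributes are copied
            if key in self.shallow_copy_attrs:
                copied = copy.copy(value)
                shallow_map[id(value)] = copied
            else:
                copied = copy.deepcopy(value, memo)
            setattr(result, key, copied)
        # Captured kwargs that are the same object as a shallow-copied
        # attribute reuse that copy; everything else is deep-copied.
        result._init_kwargs = {
            key: shallow_map[id(value)] if id(value) in shallow_map
            else copy.deepcopy(value, memo)
            for key, value in self._init_kwargs.items()
        }
        return result


original = MiniOperator(resource=Heavy(), name="doit")
clone = copy.deepcopy(original)  # Heavy.__deepcopy__ is never called
```

In this sketch the matching is done by object identity rather than by name, so it does not depend on the kwarg name and the attribute name being the same.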
We can get a working version by moving up the initialization of the `__init_kwargs` attribute:

```diff
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 2795a0f53..6507dd6c1 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1164,14 +1164,26 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
+        shallow_map = {}
+        result.__init_kwargs = init_kwargs = {}
         for k, v in self.__dict__.items():
-            if k == "_BaseOperator__instantiated":
+            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
                 # Don't set this until the _end_, as it changes behaviour of __setattr__
                 continue
             if k not in shallow_copy:
                 setattr(result, k, copy.deepcopy(v, memo))
             else:
                 setattr(result, k, copy.copy(v))
+                shallow_map[id(v)] = getattr(result, k)
+        for k, v in self.__dict__["_BaseOperator__init_kwargs"].items():
+            id_ = id(v)
+            if id_ in shallow_map:
+                init_kwargs[k] = shallow_map[id_]
+            elif id_ in memo:
+                init_kwargs[k] = memo[id_]
+            else:
+                init_kwargs[k] = copy.deepcopy(v, memo)
+
         result.__instantiated = self.__instantiated
         return result
```

Shall we use the `memo` dict instead of the additional `shallow_map` dict? That could prevent the same issue from happening further down the line if the same value is somehow referenced elsewhere.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
