cansjt opened a new issue, #25431:
URL: https://github.com/apache/airflow/issues/25431

   ### Apache Airflow version
   
   2.3.3 (latest released)
   
   ### What happened
   
   When implementing a custom operator, I stumbled on the following issue:
   - There is a way to indicate that an attribute of an operator should be shallow-copied instead of deepcopied (by adding its name to the `<OperatorClass>.shallow_copy_attrs` class attribute);
   - Most operator attributes have their value set in the operator's `__init__()` method;
   - The arguments given to an operator's `__init__()` method are captured in the instance's [`_BaseOperator__init_kwargs`](https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L680) attribute;
   - When an operator instance is deepcopied, `_BaseOperator__init_kwargs` is also deepcopied;
   - If the value of an attribute was given to `__init__()`, it is deepcopied anyway when the content of the `_BaseOperator__init_kwargs` dictionary is copied, even though the attribute is listed as one that should not be deepcopied.
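
The interaction above can be reproduced without Airflow. The sketch below is a simplified, hypothetical model (`ToyOperator`, `_init_kwargs` and `Handle` are illustrative names, not Airflow's API): like the `BaseOperator.__deepcopy__()` loop over `self.__dict__`, it only honours `shallow_copy_attrs` for top-level attributes, so the captured kwargs dict is deepcopied as a whole, value included.

```python
import copy


class Handle:
    """Hypothetical stand-in for a value that must not be deepcopied."""

    deepcopies = 0  # counts how many times a Handle gets deepcopied

    def __deepcopy__(self, memo):
        Handle.deepcopies += 1
        return Handle()


class ToyOperator:
    """Simplified, hypothetical model of the BaseOperator copy logic."""

    shallow_copy_attrs = ("handle",)

    def __init__(self, **kwargs):
        self._init_kwargs = dict(kwargs)  # captured constructor arguments
        self.handle = kwargs.get("handle")

    def __deepcopy__(self, memo):
        result = type(self).__new__(type(self))
        memo[id(self)] = result
        for k, v in self.__dict__.items():
            if k in self.shallow_copy_attrs:
                setattr(result, k, copy.copy(v))
            else:
                # `_init_kwargs` takes this branch, so the Handle it holds
                # is deepcopied despite `shallow_copy_attrs`.
                setattr(result, k, copy.deepcopy(v, memo))
        return result


op = ToyOperator(handle=Handle())
clone = copy.deepcopy(op)
print(Handle.deepcopies)  # 1
```

Note that, in the copy, the attribute and its captured kwarg also stop being the same object.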
   
   Note that there is an additional difficulty to this problem: the names of the `kwargs` do not necessarily match the names of the instance attributes. This can easily be worked around by adding both names to the class's `shallow_copy_attrs` list, though that is a bit redundant.
   
   I am not sure what `_BaseOperator__init_kwargs` is used for, but if one can deepcopy an operator, I cannot help but wonder why it is needed.
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   I discovered the issue because, in a custom operator, I was passing an object (from a third-party package) that has a broken `__getattr__()` implementation. That object was getting deepcopied anyway, sending my DAG into an infinite recursion when attempting to copy it.
   
   Note that, for brevity, I also took a little shortcut: in the real case, the faulty instance is not attached directly to the operator instance; it is an attribute of another object that is itself attached to the operator. In that setup, `shallow_copy_attrs` could be effective, whereas in the first example below it is ineffective. One can consider that the adapter class in the examples below plays the role of the intermediate object. Still, the intermediate object being deepcopied when it should not be makes setting `shallow_copy_attrs` ineffective.
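
A standard-library sketch of why the intermediate object matters: `copy.copy()` of a holder only copies references to its attributes, so the faulty object is never inspected, whereas `copy.deepcopy()` recurses into it. `Faulty` reproduces the pattern of the issue's `ThirdPartyClassWithMisbehavedGetAttr`; `Holder` is a hypothetical stand-in for the intermediate object.

```python
import copy


class Faulty:
    # Same broken pattern as ThirdPartyClassWithMisbehavedGetAttr: a missing
    # attribute triggers a lookup of another missing attribute, forever.
    def __getattr__(self, item):
        if attr := getattr(self, f"_{item}"):
            return attr
        raise AttributeError(item)


class Holder:
    """Hypothetical intermediate object that owns the faulty instance."""

    def __init__(self):
        self.inner = Faulty()


holder = Holder()

shallow = copy.copy(holder)  # copies the reference, never probes `inner`
print(shallow.inner is holder.inner)  # True

try:
    copy.deepcopy(holder)  # recurses into `inner` and probes it
except RecursionError:
    print("deepcopy: RecursionError")
```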
   
   This first example shows the initial situation:
   ```python
   from __future__ import annotations  # `None | X` annotations need Python 3.10+ otherwise

   import copy

   from airflow.operators.python import PythonOperator


   class ThirdPartyClassWithMisbehavedGetAttr:

       def __getattr__(self, item):
           # Reproduces the third-party bug: `getattr()` without a default
           # re-enters `__getattr__()` for `_item`, `__item`, ... forever.
           if attr := getattr(self, f'_{item}'):
               return attr
           raise AttributeError


   class CustomOperator(PythonOperator):

       shallow_copy_attrs = ('_misbehavedparam', 'misbehavedparam')

       def __init__(self,
                    *,
                    misbehavedparam: None | ThirdPartyClassWithMisbehavedGetAttr = None,
                    **kwargs):
           super().__init__(**kwargs)
           self._misbehavedparam = (misbehavedparam
                                    or ThirdPartyClassWithMisbehavedGetAttr())

       def __deepcopy__(self, memo):
           # Only here to intercept the call and allow debugging:
           # uncomment breakpoint() to step into the copy.
           # breakpoint()
           return super().__deepcopy__(memo)


   def f(**kwargs):
       print('here')


   operator1 = CustomOperator(python_callable=f, task_id='doit')
   print(operator1._BaseOperator__init_kwargs)
   result = copy.deepcopy(operator1)  # Infinite recursion loop.
   ```
   Running the code above fails with the exception:
   ```
   Traceback (most recent call last):
     File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
       if attr := getattr(self, f'_{item}'):
     File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
       if attr := getattr(self, f'_{item}'):
     File "/home/nicolas/src/gitco/jobteaser/aggregator/mre.py", line 11, in __getattr__
       if attr := getattr(self, f'_{item}'):
     [Previous line repeated 997 more times]
   RecursionError: maximum recursion depth exceeded
   ```
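
For context, `copy.deepcopy()` looks up `__deepcopy__` on the instance with `getattr(x, "__deepcopy__", None)`. When the attribute does not exist, that lookup lands in `__getattr__()`, and the `None` default only swallows `AttributeError`, so a `__getattr__()` that recurses instead of raising takes the whole copy down. A minimal sketch of a well-behaved variant, assuming the `_`-prefixed lookup scheme of the example above (`FixedGetAttr` is a hypothetical name): it reads the instance dict directly and raises `AttributeError` on a miss.

```python
import copy


class FixedGetAttr:
    """Hypothetical corrected version of the third-party class."""

    def __init__(self):
        self._name = "value"

    def __getattr__(self, item):
        # Only called when normal lookup fails. Reading `__dict__` directly
        # (instead of calling getattr() again) means a miss cannot recurse.
        try:
            return self.__dict__[f"_{item}"]
        except KeyError:
            raise AttributeError(item) from None


obj = FixedGetAttr()
print(obj.name)  # value
clone = copy.deepcopy(obj)  # the `__deepcopy__` probe now gets AttributeError
print(clone.name)  # value
```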
   
   Here is a way to work around the problem, assuming the adapter class below can somehow reconstruct the faulty instance. In my case it is possible (not shown here for brevity):
   ```python
   # Continues the first example (same imports, `f` and ThirdPartyClassWithMisbehavedGetAttr).
   class CustomOperatorWithWorkAround(PythonOperator):

       def __init__(self,
                    *,
                    misbehavedparam: None | ThirdPartyClassWithMisbehavedGetAttr = None,
                    **kwargs):
           super().__init__(**kwargs)
           self._misbehavedparam = MisbehavedAdapter(misbehavedparam
                                                     or ThirdPartyClassWithMisbehavedGetAttr())

       def __deepcopy__(self, memo):
           # breakpoint()
           return super().__deepcopy__(memo)


   class MisbehavedAdapter:

       def __init__(self, adaptee):
           self._adaptee = adaptee

       @classmethod
       def copy(cls) -> 'MisbehavedAdapter':
           # Reconstruct the faulty instance somehow
           return cls(ThirdPartyClassWithMisbehavedGetAttr())

       def __getattr__(self, item):
           # Don't forward special names such as `__deepcopy__`: the `copy`
           # module probes them on the instance, and forwarding them would
           # reach the adaptee's faulty `__getattr__()`.
           if item.startswith('__') and item.endswith('__'):
               raise AttributeError(item)
           return getattr(self._adaptee, item)

       def __reduce_ex__(self, version):
           # Copies are rebuilt from scratch through `copy()`.
           return (type(self).copy, ())


   operator2 = CustomOperatorWithWorkAround(  # misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
                                            python_callable=f,
                                            task_id='doit',
                                            )
   print(operator2._BaseOperator__init_kwargs)
   result = copy.deepcopy(operator2)
   ```
   It's a bit of work, but until the third-party library is fixed, it works for me.
   
   Now if I uncomment the `misbehavedparam` kwarg in the above example:
   ```python
   operator3 = CustomOperatorWithWorkAround(misbehavedparam=ThirdPartyClassWithMisbehavedGetAttr(),
                                            python_callable=f,
                                            task_id='doit',
                                            )
   print(operator3._BaseOperator__init_kwargs)
   result = copy.deepcopy(operator3)
   ```
   The infinite recursion is back, for the reasons explained above: the `_BaseOperator__init_kwargs` attribute, which has captured a reference to the faulty instance, is deepcopied.
   
   So for now, to work around the problem, I have to set the instance attribute outside of the constructor: I added a setter (:cry:) that wraps the value with the adapter:
   ```python
   class CustomOperatorWithWorkAround(PythonOperator):

       @property
       def misbehavedparam(self):
           return self._misbehavedparam

       @misbehavedparam.setter
       def misbehavedparam(self, value):
           # Wrap the faulty instance as soon as it is assigned.
           self._misbehavedparam = MisbehavedAdapter(value)

       def __deepcopy__(self, memo):
           # breakpoint()
           return super().__deepcopy__(memo)


   operator4 = CustomOperatorWithWorkAround(python_callable=f,
                                            task_id='doit',
                                            )
   operator4.misbehavedparam = ThirdPartyClassWithMisbehavedGetAttr()
   print(operator4._BaseOperator__init_kwargs)
   result = copy.deepcopy(operator4)
   print(result)
   ```
   
   
   ### Operating System
   
   Debian
   
   ### Versions of Apache Airflow Providers
   
   not relevant.
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   not relevant
   
   ### Anything else
   
   We could special-case the copy of that attribute. There is already a special case for copying the [`_BaseOperator__instantiated`](https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L1168) attribute. The remaining question is how we want to handle that special case:
   - Have users list kwargs in the `shallow_copy_attrs` list? I find that this exposes users to, and burdens them with, implementation details they should not have to care about.
   - Do something a bit more complex, but probably also slower? For example, I was thinking of something like:
   ```diff
   diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
   index 2795a0f53..c312beb1d 100644
   --- a/airflow/models/baseoperator.py
   +++ b/airflow/models/baseoperator.py
   @@ -1164,14 +1164,26 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
    
            shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
    
   +        shallow_map = {}
            for k, v in self.__dict__.items():
   -            if k == "_BaseOperator__instantiated":
   +            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
                    # Don't set this until the _end_, as it changes behaviour of __setattr__
                    continue
                if k not in shallow_copy:
                    setattr(result, k, copy.deepcopy(v, memo))
                else:
                    setattr(result, k, copy.copy(v))
   +                shallow_map[id(v)] = getattr(result, k)
   +        result.__init_kwargs = init_kwargs = {}
   +        for k, v in self.__dict__["_BaseOperator__init_kwargs"].items():
   +            id_ = id(v)
   +            if id_ in shallow_map:
   +                init_kwargs[k] = shallow_map[id_]
   +            elif id_ in memo:
   +                init_kwargs[k] = memo[id_]
   +            else:
   +                init_kwargs[k] = copy.deepcopy(v, memo)
   +
            result.__instantiated = self.__instantiated
            return result
    
   ```
   But then, not copying `_BaseOperator__init_kwargs` first breaks `__setattr__()`. I find that odd, because it amounts to assuming that the name of a formal parameter is also the name of an attribute and, if so, that the two are necessarily related (which is likely, but nothing actually guarantees it, so to me it looks more like a broken assumption than anything else).
   
   We can get a working version by moving the initialization of the `__init_kwargs` attribute up, before the loop:
   ```diff
   diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
   index 2795a0f53..6507dd6c1 100644
   --- a/airflow/models/baseoperator.py
   +++ b/airflow/models/baseoperator.py
   @@ -1164,14 +1164,26 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
    
            shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
    
   +        shallow_map = {}
   +        result.__init_kwargs = init_kwargs = {}
            for k, v in self.__dict__.items():
   -            if k == "_BaseOperator__instantiated":
   +            if k in ("_BaseOperator__instantiated", "_BaseOperator__init_kwargs"):
                    # Don't set this until the _end_, as it changes behaviour of __setattr__
                    continue
                if k not in shallow_copy:
                    setattr(result, k, copy.deepcopy(v, memo))
                else:
                    setattr(result, k, copy.copy(v))
   +                shallow_map[id(v)] = getattr(result, k)
   +        for k, v in self.__dict__["_BaseOperator__init_kwargs"].items():
   +            id_ = id(v)
   +            if id_ in shallow_map:
   +                init_kwargs[k] = shallow_map[id_]
   +            elif id_ in memo:
   +                init_kwargs[k] = memo[id_]
   +            else:
   +                init_kwargs[k] = copy.deepcopy(v, memo)
   +
            result.__instantiated = self.__instantiated
            return result
    
   ```
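
To check the logic of the patch outside Airflow, here is a self-contained sketch of the same `shallow_map` idea on a hypothetical toy class (`ToyOperator`, `_init_kwargs` and `Handle` are illustrative names, not Airflow's API). Like the patch, it treats a captured kwarg as the same value as a shallow-copied attribute when both are the same object (same `id()`), and it sets the kwargs dict on the copy before the attribute loop. `Handle` raises if it is ever deepcopied.

```python
import copy


class Handle:
    """Hypothetical value that must only ever be shallow-copied."""

    def __deepcopy__(self, memo):
        raise AssertionError("Handle must not be deepcopied")


class ToyOperator:
    shallow_copy_attrs = ("handle",)

    def __init__(self, **kwargs):
        self._init_kwargs = dict(kwargs)  # captured constructor arguments
        self.handle = kwargs.get("handle")

    def __deepcopy__(self, memo):
        result = type(self).__new__(type(self))
        memo[id(self)] = result
        result._init_kwargs = init_kwargs = {}  # set first, as in the patch
        shallow_map = {}  # id(original value) -> its shallow copy
        for k, v in self.__dict__.items():
            if k == "_init_kwargs":
                continue  # rebuilt below
            if k in self.shallow_copy_attrs:
                setattr(result, k, copy.copy(v))
                shallow_map[id(v)] = getattr(result, k)
            else:
                setattr(result, k, copy.deepcopy(v, memo))
        for k, v in self._init_kwargs.items():
            if id(v) in shallow_map:
                # Reuse the attribute's shallow copy instead of deepcopying.
                init_kwargs[k] = shallow_map[id(v)]
            else:
                init_kwargs[k] = copy.deepcopy(v, memo)
        return result


op = ToyOperator(handle=Handle(), retries=3)
clone = copy.deepcopy(op)  # no AssertionError: Handle is never deepcopied
print(clone._init_kwargs["handle"] is clone.handle)  # True
```

The patch's extra `elif id_ in memo` branch is omitted here, because `copy.deepcopy(v, memo)` already returns `memo[id(v)]` when that key is present.
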
   Should we use the `memo` dict instead of the additional `shallow_map` dict? That could prevent the same issue from happening further down the line, if the same value is somehow referenced elsewhere.
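
On the `memo` question: `copy.deepcopy()` returns `memo[id(x)]` whenever that key is already present, so registering each shallow copy in `memo` would make every later deepcopy of the same object reuse it, wherever it is referenced. A minimal standard-library sketch of that behaviour (the `state` structure and `Handle` are hypothetical):

```python
import copy


class Handle:
    """Hypothetical value that must only ever be shallow-copied."""

    def __deepcopy__(self, memo):
        raise AssertionError("Handle must not be deepcopied")


handle = Handle()
state = {
    "handle": handle,  # the attribute itself
    "init_kwargs": {"handle": handle},  # the captured kwargs
    "nested": [{"deeper": handle}],  # another reference further down
}

memo = {}
shallow = copy.copy(handle)
memo[id(handle)] = shallow  # register the shallow copy before deepcopying

result = copy.deepcopy(state, memo)
print(result["init_kwargs"]["handle"] is shallow)  # True
print(result["nested"][0]["deeper"] is shallow)  # True
```

One caveat of keying on `id()`: ids are only unique while the originals are alive, which holds here because `state` (in the operator's case, the operator itself) keeps referencing them for the duration of the copy.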
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

