Gollum999 opened a new issue #22670:
URL: https://github.com/apache/airflow/issues/22670


   ### Apache Airflow version
   
   2.2.4 (latest released)
   
   ### What happened
   
   If a user defines a custom Operator that also uses a mixin class, the 
mixin's `__init__` may never be called, depending on where the mixin sits in 
the Operator's list of base classes.  See the example below.
   
   I believe I've traced this down to the fact that 
`airflow.utils.log.logging_mixin.LoggingMixin.__init__` does not call 
`super().__init__`.
   
   It is a common misconception that `super()` delegates to the base class of 
the type that is calling `super()`.  It actually delegates to the next type 
after the calling class in the Method Resolution Order (MRO) of the 
*instance's own type*, which spans that type's entire inheritance hierarchy.  
Importantly, this hierarchy may include user-defined types that are 
*completely unrelated to the type that is calling `super()`*.  See 
[this critique](https://fuhm.net/super-harmful/) for a more in-depth 
description of the types of problems this can cause, and 
[this guide](https://rhettinger.wordpress.com/2011/05/26/super-considered-super/) 
for some tips for handling this in practice.
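
To make the MRO behavior concrete, here is a minimal sketch that does not 
import Airflow at all.  `LoggingLike` and `Base` are hypothetical stand-ins 
(not Airflow's real classes); the only assumption carried over from above is 
that one class in the chain does not call `super().__init__()`.

```python
class LoggingLike:
    """Hypothetical stand-in for a class whose __init__ does not call super()."""

    def __init__(self):
        self.log_ready = True  # the __init__ chain stops here


class Base(LoggingLike):
    """Hypothetical stand-in for an operator base class."""

    def __init__(self, **kwargs):
        super().__init__()
        self.base_ready = True


class MyMixin:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.extra_message = ":)"


class Hello(Base, MyMixin):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)


# The MRO interleaves the mixin *after* LoggingLike, even though the two
# classes are unrelated to each other.
mro_names = [cls.__name__ for cls in Hello.__mro__]
print(mro_names)

obj = Hello()
# MyMixin.__init__ was never reached, so the attribute is missing.
print(hasattr(obj, "extra_message"))
```

Because `MyMixin` comes after `LoggingLike` in `Hello.__mro__`, the only way 
`MyMixin.__init__` can run is if `LoggingLike.__init__` forwards the call.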
   
   Unfortunately I think a proper fix will be much more involved than simply 
updating `LoggingMixin.__init__` to call `super().__init__(*args, **kwargs)`.  
I tried making this change locally, and I started getting errors like 
`TypeError: object.__init__() takes exactly one argument (the instance to 
initialize)`.  Since these `__init__` functions are (almost) all parameterized, 
ultimately all types in the hierarchy need to coordinate in their usage of 
`super()` to consume all of the arguments before they reach the "root" call to 
`object.__init__`.  The second article I linked shows some examples of this, 
but in practice I think it would essentially require global knowledge of every 
class in Airflow to confidently make this sort of change.
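
As a rough illustration of the coordination problem, the sketch below first 
reproduces the `TypeError` with a class that blindly forwards its arguments, 
then shows the cooperative pattern where every class consumes its own keyword 
arguments before delegating.  All class and parameter names here are 
hypothetical and are not taken from Airflow.

```python
class NaiveLogging:
    def __init__(self, *args, **kwargs):
        # Forwards everything, including arguments that nobody consumed.
        super().__init__(*args, **kwargs)


class NaiveTask(NaiveLogging):
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id
        super().__init__(**kwargs)


naive_error = None
try:
    NaiveTask(task_id="t", retries=0)  # "retries" reaches object.__init__
except TypeError as exc:
    naive_error = exc


class CoopLogging:
    def __init__(self, *, logger_name="task", **kwargs):
        super().__init__(**kwargs)  # pass along only what is left over
        self.logger_name = logger_name


class CoopTask(CoopLogging):
    def __init__(self, *, task_id, retries=0, **kwargs):
        super().__init__(**kwargs)
        self.task_id = task_id
        self.retries = retries


class Greeting:
    def __init__(self, *, greeting="Hello", **kwargs):
        super().__init__(**kwargs)
        self.greeting = greeting


class GreetTask(CoopTask, Greeting):
    pass


# Each keyword is stripped off by the class that owns it, so nothing is
# left by the time the call reaches object.__init__.
task = GreetTask(task_id="t", retries=1, greeting="Hi")
```

The cooperative version only works because every class in the chain follows 
the same convention, which is the "global knowledge" concern described above.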
   
   ### What you think should happen instead
   
   Users should be able to use mixins in custom operators.
   
   ### How to reproduce
   
   Here's an example DAG that breaks:
   ```
   #!/usr/bin/env python3
   from datetime import datetime
   
   from airflow import DAG
   from airflow.models.baseoperator import BaseOperator
   
   
   class MyMixin:
       def __init__(self, *args, **kwargs):
           super().__init__(*args, **kwargs)
           self.extra_message = ':)'
   
   
   class HelloOperator(BaseOperator, MyMixin):
       def __init__(self, *, name: str, **kwargs) -> None:
           super().__init__(**kwargs)
           self.name = name
   
       def execute(self, context):
           print(f'Hello {self.name}! {self.extra_message}')
   
   
   with DAG(
           'test_dag',
           default_args={'retries': 0},
           start_date=datetime(2022, 3, 30),
   ) as dag:
       HelloOperator(task_id='task', name='Bob')
   
   ```
   
   This fails with the following error:
   ```
   [2022-03-31, 07:35:57 CDT] {taskinstance.py:1718} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1334, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1460, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1516, in _execute_task
       result = execute_callable(context=context)
     File "/home/tsanders/airflow_test/dags/test_dag.py", line 20, in execute
       print(f'Hello {self.name}! {self.extra_message}')
   AttributeError: 'HelloOperator' object has no attribute 'extra_message'
   [2022-03-31, 07:35:57 CDT] {taskinstance.py:1272} INFO - Marking task as 
FAILED. dag_id=test_dag, task_id=task, execution_date=20220329T000000, 
start_date=20220331T173557, end_date=20220331T173557
   [2022-03-31, 07:35:57 CDT] {standard_task_runner.py:89} ERROR - Failed to 
execute job 25 for task task
   Traceback (most recent call last):
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py",
 line 85, in _start_by_fork
       args.func(args, dag=self.dag)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/cli_parser.py",
 line 48, in command
       return func(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 92, in wrapper
       return f(*args, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
 line 298, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
 line 107, in _run_task_by_selected_method
       _run_raw_task(args, ti)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
 line 180, in _run_raw_task
       ti._run_raw_task(
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
 line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1334, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1460, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File 
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
 line 1516, in _execute_task
       result = execute_callable(context=context)
     File "/home/tsanders/airflow_test/dags/test_dag.py", line 20, in execute
       print(f'Hello {self.name}! {self.extra_message}')
   AttributeError: 'HelloOperator' object has no attribute 'extra_message'
   ```
   
   ### Operating System
   
   CentOS 7.4
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Standalone
   
   ### Anything else
   
   As a note, it is possible to work around this in user code by forcing a 
different MRO.  For example, we can use `class HelloOperator(MyMixin, 
BaseOperator)` or `class MyMixin(LoggingMixin)` to ensure that `LoggingMixin` 
comes after `MyMixin` in the MRO.  However, these workarounds also require that 
`HelloOperator` and `MyMixin` handle `super().__init__` correctly - otherwise 
you'll see this same type of issue where internal Airflow classes are not 
properly initialized.
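
Here is a small sketch of the base-order workaround and its caveat, again 
using hypothetical stand-in classes (`LoggingLike`, `Base`) rather than 
Airflow's real ones.

```python
class LoggingLike:
    """Hypothetical stand-in for a class whose __init__ does not call super()."""

    def __init__(self):
        self.log_ready = True


class Base(LoggingLike):
    """Hypothetical stand-in for an operator base class."""

    def __init__(self, *, task_id, **kwargs):
        super().__init__()
        self.task_id = task_id


class MyMixin:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.extra_message = ":)"


class Hello(MyMixin, Base):  # mixin listed first, so it precedes LoggingLike
    def __init__(self, *, name, **kwargs):
        super().__init__(**kwargs)
        self.name = name


good = Hello(task_id="task", name="Bob")


class ForgetfulMixin:
    def __init__(self, *args, **kwargs):
        self.extra_message = ":)"  # no super().__init__() call


class Broken(ForgetfulMixin, Base):
    pass


# The chain now stops at the mixin, so Base.__init__ never runs.
broken = Broken(task_id="task")
print(hasattr(broken, "task_id"))
```

With the mixin first, the user-defined classes become responsible for 
forwarding the call, so the same failure simply moves to the other side.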
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

