Gollum999 opened a new issue #22670:
URL: https://github.com/apache/airflow/issues/22670
### Apache Airflow version
2.2.4 (latest released)
### What happened
If a user defines a custom Operator that also uses a mixin class, there are
some cases where that mixin will not be properly initialized. See example
below.
I believe I've traced this down to the fact that
`airflow.utils.log.logging_mixin.LoggingMixin.__init__` does not call
`super().__init__`.
It is a common misconception that `super()` delegates to the base class of
the type that is calling `super()`. It actually delegates to the next viable
type from the Method Resolution Order (MRO) of the *entire inheritance
hierarchy of the calling type*.
Importantly, this hierarchy may include user-defined types that are
*completely unrelated to the type that is calling `super()`*. See
[here](https://fuhm.net/super-harmful/) for a more in-depth description of the
types of problems this can cause, and
[here](https://rhettinger.wordpress.com/2011/05/26/super-considered-super/) for
some tips for handling this in practice.
Unfortunately I think a proper fix will be much more involved than simply
updating `LoggingMixin.__init__` to call `super().__init__(*args, **kwargs)`.
I tried making this change locally, and I started getting errors like
`TypeError: object.__init__() takes exactly one argument (the instance to
initialize)`. Since these `__init__` functions are (almost) all parameterized,
ultimately all types in the hierarchy need to coordinate in their usage of
`super()` to consume all of the arguments before they reach the "root" call to
`object.__init__`. The second article I linked shows some examples of this,
but in practice I think it would essentially require global knowledge of every
class in Airflow to confidently make this sort of change.
### What you think should happen instead
Users should be able to use mixins in custom operators.
### How to reproduce
Here's an example DAG that breaks:
```
#!/usr/bin/env python3
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
class MyMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.extra_message = ':)'
class HelloOperator(BaseOperator, MyMixin):
def __init__(self, *, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
print(f'Hello {self.name}! {self.extra_message}')
with DAG(
'test_dag',
default_args={'retries': 0},
start_date=datetime(2022, 3, 30),
) as dag:
HelloOperator(task_id='task', name='Bob')
```
This fails with the following error:
```
[2022-03-31, 07:35:57 CDT] {taskinstance.py:1718} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1334, in _run_raw_task
self._execute_task_with_callbacks(context)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1460, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1516, in _execute_task
result = execute_callable(context=context)
File "/home/tsanders/airflow_test/dags/test_dag.py", line 20, in execute
print(f'Hello {self.name}! {self.extra_message}')
AttributeError: 'HelloOperator' object has no attribute 'extra_message'
[2022-03-31, 07:35:57 CDT] {taskinstance.py:1272} INFO - Marking task as
FAILED. dag_id=test_dag, task_id=task, execution_date=20220329T000000,
start_date=20220331T173557, end_date=20220331T173557
[2022-03-31, 07:35:57 CDT] {standard_task_runner.py:89} ERROR - Failed to
execute job 25 for task task
Traceback (most recent call last):
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py",
line 85, in _start_by_fork
args.func(args, dag=self.dag)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/cli_parser.py",
line 48, in command
return func(*args, **kwargs)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/cli.py",
line 92, in wrapper
return f(*args, **kwargs)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
line 298, in task_run
_run_task_by_selected_method(args, dag, ti)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/cli/commands/task_command.py",
line 180, in _run_raw_task
ti._run_raw_task(
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/utils/session.py",
line 70, in wrapper
return func(*args, session=session, **kwargs)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1334, in _run_raw_task
self._execute_task_with_callbacks(context)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1460, in _execute_task_with_callbacks
result = self._execute_task(context, self.task)
File
"/opt/conda/envs/production/lib/python3.9/site-packages/airflow/models/taskinstance.py",
line 1516, in _execute_task
result = execute_callable(context=context)
File "/home/tsanders/airflow_test/dags/test_dag.py", line 20, in execute
print(f'Hello {self.name}! {self.extra_message}')
AttributeError: 'HelloOperator' object has no attribute 'extra_message'
```
### Operating System
CentOS 7.4
### Versions of Apache Airflow Providers
N/A
### Deployment
Other
### Deployment details
Standalone
### Anything else
As a note, it is possible to work around this in user code by forcing a
different MRO. For example, we can use `class HelloOperator(MyMixin,
BaseOperator)` or `class MyMixin(LoggingMixin)` to ensure that `LoggingMixin`
comes after `MyMixin` in the MRO. However, these workarounds also require that
`HelloOperator` and `MyMixin` handle `super().__init__` correctly - otherwise
you'll see this same type of issue where internal Airflow classes are not
properly initialized.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]