henryzhangsta opened a new issue #17014: URL: https://github.com/apache/airflow/issues/17014
**Apache Airflow version**: 2.1.2 **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): 1.19.8 **Environment**: - **Cloud provider or hardware configuration**: - **OS** (e.g. from /etc/os-release): Amazon Linux EKS - **Kernel** (e.g. `uname -a`): - **Install tools**: Helm (community chart) - **Others**: **What happened**: With the addition of `__new__` to the `BaseOperatorMeta` class, this breaks our usage of operators that allow configuration through `__init_subclass__` arguments. Relevant python bug: https://bugs.python.org/issue29581 **What you expected to happen**: We should be able to use `__init_subclass__` in operators as we used to be able to. We relied on this behavior to customize some of our subclasses. This is working in 2.0.2 (though might exactly be considered a regression). **How to reproduce it**: This is a broken example ```py3 import datetime from airflow import DAG from airflow.operators.bash import BashOperator class PrefixedBashOperator(BashOperator): def __init_subclass__(cls, command: str = None, **kwargs): if command is not None: cls._command = command super().__init_subclass__(**kwargs) def __init__(self, bash_command, **kwargs): super().__init__(bash_command=self._command + ' ' + bash_command, **kwargs) class EchoOperator(PrefixedBashOperator, command='echo'): pass with DAG(dag_id='foo', start_date=datetime.datetime(2021, 7, 1)) as dag: EchoOperator(task_id='foo', bash_command='-e "from airflow"', dag=dag) ``` This results in error: ``` TypeError: __new__() got an unexpected keyword argument 'command' ``` **Anything else we need to know**: This example works. This shows that all that is needed to fix the issue is add `**kwargs` to `__new__` ```py3 import datetime from abc import ABCMeta from airflow import DAG from airflow.models.baseoperator import BaseOperatorMeta from airflow.operators.bash import BashOperator class NewBaseOperatorMeta(BaseOperatorMeta): def __new__(cls, name, bases, namespace, **kwargs): new_cls = super(ABCMeta, cls).__new__(cls, name, bases, namespace, **kwargs) new_cls.__init__ = cls._apply_defaults(new_cls.__init__) # type: ignore return new_cls class PrefixedBashOperator(BashOperator, metaclass=NewBaseOperatorMeta): def __init_subclass__(cls, command: str = None, **kwargs): if command is not None: cls._command = command super().__init_subclass__(**kwargs) def __init__(self, bash_command, **kwargs): super().__init__(bash_command=self._command + ' ' + bash_command, **kwargs) class EchoOperator(PrefixedBashOperator, command='echo'): pass with DAG(dag_id='foo', start_date=datetime.datetime(2021, 7, 1)) as dag: EchoOperator(task_id='foo', bash_command='-e "from airflow"', dag=dag) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
