ashb commented on a change in pull request #20743:
URL: https://github.com/apache/airflow/pull/20743#discussion_r786143028



##########
File path: airflow/models/baseoperator.py
##########
@@ -207,6 +207,7 @@ def apply_defaults(self: "BaseOperator", *args: Any, 
**kwargs: Any) -> Any:
 
             result = func(self, **kwargs, default_args=default_args)
             # Store the args passed to init -- we need them to support 
task.map serialzation!
+            kwargs.pop('task_id', None)

Review comment:
       This is the other option:
   
   ```diff
   diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
   index 752b8a6cf..71170320d 100644
   --- a/airflow/models/baseoperator.py
   +++ b/airflow/models/baseoperator.py
   @@ -136,6 +136,7 @@ class BaseOperatorMeta(abc.ABCMeta):
            non_optional_args = {
                name for (name, param) in non_variadic_params.items() if 
param.default == param.empty
            }
   +        non_optional_args -= {'task_id'}
    
            class autostacklevel_warn:
                def __init__(self):
   @@ -158,7 +159,7 @@ class BaseOperatorMeta(abc.ABCMeta):
                func.__globals__['warnings'] = autostacklevel_warn()
    
            @functools.wraps(func)
   -        def apply_defaults(self: "BaseOperator", *args: Any, **kwargs: Any) 
-> Any:
   +        def apply_defaults(self: "BaseOperator", *args: Any, task_id: str, 
**kwargs: Any) -> Any:
                from airflow.models.dag import DagContext
                from airflow.utils.task_group import TaskGroupContext
    
   @@ -201,15 +202,15 @@ class BaseOperatorMeta(abc.ABCMeta):
    
                hook = getattr(self, '_hook_apply_defaults', None)
                if hook:
   -                args, kwargs = hook(**kwargs, default_args=default_args)
   +                args, kwargs = hook(task_id=task_id, **kwargs, 
default_args=default_args)
   +                task_id = kwargs.pop('task_id')
                    default_args = kwargs.pop('default_args', {})
    
                if not hasattr(self, '_BaseOperator__init_kwargs'):
                    self._BaseOperator__init_kwargs = {}
    
   -            result = func(self, **kwargs, default_args=default_args)
   +            result = func(self, **kwargs, task_id=task_id, 
default_args=default_args)
                # Store the args passed to init -- we need them to support 
task.map serialzation!
   -            kwargs.pop('task_id', None)
                self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore
    
                # Here we set upstream task defined by XComArgs passed to 
template fields of the operator
   
   ```
   
   Which do you think is better @uranusjr?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to