uranusjr commented on code in PR #26100:
URL: https://github.com/apache/airflow/pull/26100#discussion_r965338277


##########
airflow/models/mappedoperator.py:
##########
@@ -162,6 +165,7 @@ class OperatorPartial:
     """
 
     operator_class: Type["BaseOperator"]
+    params: Union[ParamsDict, dict]

Review Comment:
   To be able to expand a task against `params`, we need to be able to “merge” 
default DAG- and task-level params with the user-mapped params. So `params` is 
split out of other partial kwargs and treated specially. Partial kwargs should 
_never_ contain the `"params"` key.
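
A minimal sketch of that invariant, not the Airflow implementation: `PartialSketch` and `make_partial` are hypothetical names, and merging defaults with a plain dict is an assumption made only for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class PartialSketch:
    """Hypothetical stand-in for OperatorPartial; not the Airflow class."""

    kwargs: Dict[str, Any]
    params: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Enforce the invariant: "params" never lives in the partial kwargs.
        if "params" in self.kwargs:
            raise ValueError("'params' must be passed separately, not in kwargs")


def make_partial(default_params: Dict[str, Any], **kwargs: Any) -> PartialSketch:
    """Hypothetical helper: pop params out of kwargs before building the partial."""
    task_params = kwargs.pop("params", None) or {}
    # Assumption: task-level values override the defaults on key collisions.
    return PartialSketch(kwargs=kwargs, params={**default_params, **task_params})


partial = make_partial({"env": "dev"}, retries=2, params={"region": "eu"})
```

Keeping `params` in its own attribute means later code can merge into it without having to check whether the other kwargs also carry a copy.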



##########
airflow/models/baseoperator.py:
##########
@@ -307,7 +306,11 @@ def partial(
     partial_kwargs["executor_config"] = partial_kwargs["executor_config"] or {}
     partial_kwargs["resources"] = coerce_resources(partial_kwargs["resources"])
 
-    return OperatorPartial(operator_class=operator_class, kwargs=partial_kwargs)
+    return OperatorPartial(
+        operator_class=operator_class,
+        kwargs=partial_kwargs,
+        params=partial_params,

Review Comment:
   … but pass them to this separate attribute.



##########
airflow/models/baseoperator.py:
##########
@@ -1183,17 +1186,17 @@ def render_template_fields(
         context: Context,
         jinja_env: Optional["jinja2.Environment"] = None,
     ) -> Optional["BaseOperator"]:
-        """Template all attributes listed in template_fields.
+        """Template all attributes listed in *self.template_fields*.
 
         This mutates the attributes in-place and is irreversible.
 
-        :param context: Dict with values to apply on content
-        :param jinja_env: Jinja environment
+        :param context: Context dict with values to apply on content.
+        :param jinja_env: Jinja environment to use for rendering.
         """
         if not jinja_env:
             jinja_env = self.get_template_env()
         self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
-        return self
+        return None

Review Comment:
   This does not change any behaviour; I only did it so the behaviour is easier 
to explain…



##########
airflow/models/taskinstance.py:
##########
@@ -2195,10 +2181,10 @@ def render_templates(self, context: Optional[Context] = None) -> "Operator":
         """
         if not context:
             context = self.get_template_context()
+        original_task = self.task
         rendered_task = self.task.render_template_fields(context)
-        if rendered_task is None:  # Compatibility -- custom renderer, assume unmapped.
-            return self.task
-        original_task, self.task = self.task, rendered_task
+        if rendered_task is not None:  # Mapped operator, assign unmapped task.
+            self.task = rendered_task
         return original_task

Review Comment:
   … here. Previously it was difficult to follow when `rendered_task` is None and 
when it’s not; now it’s simple: a mapped operator (MappedOperator subclasses) 
performs unmapping and returns the unmapped task, while a non-mapped operator 
(BaseOperator subclasses) returns None because no unmapping is needed.
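
The convention can be sketched with stand-in classes. Everything below is hypothetical (`PlainOperator`, `MappedStandIn`, `TaskInstanceStandIn`), and `str.format` replaces Jinja rendering only to keep the example self-contained.

```python
from typing import Any, Dict


class PlainOperator:
    """Hypothetical non-mapped operator: renders in place and returns None."""

    def __init__(self, command: str) -> None:
        self.command = command

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        # str.format stands in for Jinja rendering in this sketch.
        self.command = self.command.format(**context)
        return None


class MappedStandIn:
    """Hypothetical mapped operator: unmaps, renders, and returns the new task."""

    def __init__(self, command_template: str) -> None:
        self.command_template = command_template

    def render_template_fields(self, context: Dict[str, Any]) -> PlainOperator:
        unmapped = PlainOperator(self.command_template)
        unmapped.render_template_fields(context)
        return unmapped


class TaskInstanceStandIn:
    """Hypothetical task instance following the control flow in the hunk above."""

    def __init__(self, task: Any) -> None:
        self.task = task

    def render_templates(self, context: Dict[str, Any]) -> Any:
        original_task = self.task
        rendered_task = self.task.render_template_fields(context)
        if rendered_task is not None:  # Mapped operator, assign unmapped task.
            self.task = rendered_task
        return original_task


plain_ti = TaskInstanceStandIn(PlainOperator("echo {ds}"))
plain_original = plain_ti.render_templates({"ds": "2022-09-08"})

mapped = MappedStandIn("echo {ds}")
mapped_ti = TaskInstanceStandIn(mapped)
mapped_original = mapped_ti.render_templates({"ds": "2022-09-08"})
```

In both cases the caller gets the original task back; only the mapped case swaps `self.task` for the unmapped one.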



##########
airflow/models/baseoperator.py:
##########
@@ -256,7 +256,6 @@ def partial(
     partial_kwargs.setdefault("end_date", end_date)
     partial_kwargs.setdefault("owner", owner)
     partial_kwargs.setdefault("email", email)
-    partial_kwargs.setdefault("params", default_params)

Review Comment:
   So we don’t set the default params in `partial_kwargs`…



##########
airflow/decorators/base.py:
##########
@@ -363,7 +363,6 @@ def _expand(self, expand_input: ExpandInput, *, strict: bool) -> XComArg:
         task_id = get_unique_task_id(partial_kwargs.pop("task_id"), dag, task_group)
         if task_group:
             task_id = task_group.child_id(task_id)
-        params = partial_kwargs.pop("params", None) or default_params

Review Comment:
   And this line is deleted since it’s now a no-op.



##########
airflow/models/mappedoperator.py:
##########
@@ -565,16 +568,24 @@ def _get_unmap_kwargs(self, mapped_kwargs: Mapping[str, Any], *, strict: bool) -
                 mapped_kwargs,
                 fail_reason="unmappable or already specified",
             )
-        # Ordering is significant; mapped kwargs should override partial ones.
+
+        # If params appears in the mapped kwargs, we need to merge it into the
+        # partial params, overriding existing keys.
+        params = copy.copy(self.params)
+        with contextlib.suppress(KeyError):
+            params.update(mapped_kwargs["params"])
+
+        # Ordering is significant; mapped kwargs should override partial ones,
+        # and the specially handled params should be respected.
         return {
             "task_id": self.task_id,
             "dag": self.dag,
             "task_group": self.task_group,
-            "params": self.params,
             "start_date": self.start_date,
             "end_date": self.end_date,
             **self.partial_kwargs,
             **mapped_kwargs,
+            "params": params,
         }

Review Comment:
   Actual merging happens here. The merged `params` is put last so it overrides 
the incomplete `mapped_kwargs["params"]`.
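
To see why the ordering matters, here is a stand-alone sketch of the same merge using plain dicts. `merge_unmap_kwargs` is a hypothetical helper written for this illustration, not a function in Airflow.

```python
import contextlib
import copy
from typing import Any, Dict, Mapping


def merge_unmap_kwargs(
    partial_params: Dict[str, Any],
    partial_kwargs: Mapping[str, Any],
    mapped_kwargs: Mapping[str, Any],
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the merge order shown in the hunk above."""
    # Copy first so the partial params are not mutated by the update.
    params = copy.copy(partial_params)
    with contextlib.suppress(KeyError):
        # Mapped params override partial params key by key.
        params.update(mapped_kwargs["params"])
    # Later entries win: mapped kwargs override partial ones, and the merged
    # "params" comes last so it replaces mapped_kwargs["params"].
    return {**partial_kwargs, **mapped_kwargs, "params": params}


partial_params = {"a": 1, "b": 2}
merged = merge_unmap_kwargs(
    partial_params=partial_params,
    partial_kwargs={"retries": 3},
    mapped_kwargs={"params": {"b": 20, "c": 30}, "retries": 5},
)
no_mapped_params = merge_unmap_kwargs(partial_params, {"retries": 3}, {})
```

If `"params"` were placed before `**mapped_kwargs`, the mapped value `{"b": 20, "c": 30}` would win outright and the partial key `a` would be lost.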



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
