jedcunningham commented on code in PR #22975:
URL: https://github.com/apache/airflow/pull/22975#discussion_r849889616


##########
airflow/models/mappedoperator.py:
##########
@@ -484,14 +486,26 @@ def _get_unmap_kwargs(self) -> Dict[str, Any]:
             **self.mapped_kwargs,
         }
 
-    def unmap(self) -> "BaseOperator":
-        """Get the "normal" Operator after applying the current mapping."""
+    def unmap(self, unmap_kwargs: Optional[Dict[str, Any]] = None) -> "BaseOperator":
+        """
+        Get the "normal" Operator after applying the current mapping.
+
+        If ``operator_class`` is not a class (i.e. this DAG has been deserialized) then this will return a
+        SeriSerializedBaseOperator that aims to "look like" the real operator.

Review Comment:
   ```suggestion
           SerializedBaseOperator that aims to "look like" the real operator.
   ```



##########
docs/apache-airflow/concepts/dynamic-task-mapping.rst:
##########
@@ -224,6 +224,42 @@ Currently it is only possible to map against a dict, a list, or one of those typ
 
 If an upstream task returns an unmappable type, the mapped task will fail at run-time with an ``UnmappableXComTypePushed`` exception. For instance, you can't have the upstream task return a plain string – it must be a list or a dict.
 
+How to templated fields and mapped arguments interact?

Review Comment:
   ```suggestion
   How do templated fields and mapped arguments interact?
   ```



##########
airflow/decorators/base.py:
##########
@@ -441,13 +443,38 @@ def _get_unmap_kwargs(self) -> Dict[str, Any]:
             **self.mapped_kwargs,
         }
 
-    def _expand_mapped_field(self, key: str, content: Any, context: Context, *, session: Session) -> Any:
-        if key != "op_kwargs" or not isinstance(content, collections.abc.Mapping):
-            return content
-        # The magic super() doesn't work here, so we use the explicit form.
-        # Not using super(..., self) to work around pyupgrade bug.
-        sup: Any = super(DecoratedMappedOperator, DecoratedMappedOperator)
-        return {k: sup._expand_mapped_field(self, k, v, context, session=session) for k, v in content.items()}
+    def _resolve_expansion_kwargs(
+        self, kwargs: Dict[str, Any], template_fields: Set[str], context: Context, session: "Session"
+    ) -> None:
+        expansion_kwargs = self._get_expansion_kwargs()
+
+        self._already_resolved_op_kwargs = set()
+        for k, v in expansion_kwargs.items():
+            if isinstance(v, XComArg):
+                self._already_resolved_op_kwargs.add(k)
+                v = v.resolve(context, session=session)
+            v = self._expand_mapped_field(k, v, context, session=session)
+            kwargs['op_kwargs'][k] = v
+            template_fields.discard(k)
+
+    def render_template(
+        self,
+        value: Any,
+        context: Context,
+        jinja_env: Optional["jinja2.Environment"] = None,
+        seen_oids: Optional[Set] = None,
+    ) -> Any:
+        if hasattr(self, '_combined_op_kwargs') and value is self._combined_op_kwargs:
+            # Avoid rendering values that came oout of resolved XComArgs

Review Comment:
   ```suggestion
               # Avoid rendering values that came out of resolved XComArgs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

