jedcunningham commented on code in PR #25085:
URL: https://github.com/apache/airflow/pull/25085#discussion_r926912929
##########
airflow/models/taskinstance.py:
##########
@@ -2335,9 +2335,11 @@ def _record_task_map_for_downstreams(self, task:
"Operator", value: Any, *, sess
return
if value is None:
raise XComForMappingNotPushed()
- for validator in validators:
- validator(value)
- assert isinstance(value, collections.abc.Collection) # The validators
type-guard this.
+ if not isinstance(value, (collections.abc.Sequence, dict)):
+ raise UnmappableXComTypePushed(value)
+ if isinstance(value, (bytes, str)):
+ raise UnmappableXComTypePushed(value)
+ assert isinstance(value, collections.abc.Collection) # The
isinstance() checks above guard this.
Review Comment:
```suggestion
if TYPE_CHECKING:
    assert isinstance(value, collections.abc.Collection)  # The isinstance() checks above guard this.
```
Is this assert only there for typing? If so, we should [only run it under
`if TYPE_CHECKING:`](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#dont-use-asserts-outside-tests).
##########
airflow/models/xcom_arg.py:
##########
@@ -53,29 +53,129 @@ class XComArg(DependencyMixin):
This object can be used in legacy Operators via Jinja.
- **Example**: You can make this result to be part of any generated string ::
+ **Example**: You can make this result to be part of any generated string::
any_op = AnyOperator()
xcomarg = any_op.output
op1 = MyOperator(my_text_message=f"the value is {xcomarg}")
op2 = MyOperator(my_text_message=f"the value is {xcomarg['topic']}")
- :param operator: operator to which the XComArg belongs to
- :param key: key value which is used for xcom_pull (key in the XCom table)
+ :param operator: Operator instance to which the XComArg references.
+ :param key: Key used to pull the XCom value. Defaults to *XCOM_RETURN_KEY*,
+ i.e. the referenced operator's return value.
+ """
+
+ operator: "Operator"
+ key: str
+
+ @overload
+ def __new__(cls: Type["XComArg"], operator: "Operator", key: str =
XCOM_RETURN_KEY) -> "XComArg":
+ """Called when the user writes ``XComArg(...)`` directly."""
+
+ @overload
+ def __new__(cls: Type["XComArg"]) -> "XComArg":
+ """Called by Python internals from subclasses."""
+
+ def __new__(cls, *args, **kwargs) -> "XComArg":
+ if cls is XComArg:
+ return PlainXComArg(*args, **kwargs)
+ return super().__new__(cls)
+
+ @staticmethod
+ def iter_xcom_args(arg: Any) -> Iterator["XComArg"]:
+ """Return XComArg instances in an arbitrary value.
+
+ This recursively traverse ``arg`` and look for XComArg instances in any
Review Comment:
```suggestion
Recursively traverse ``arg`` and look for XComArg instances in any
```
nit
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]