ashb commented on a change in pull request #16732:
URL: https://github.com/apache/airflow/pull/16732#discussion_r662211480
##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator,
Sequence[BaseOperator]]):
t4.set_downstream(t6)
t5.set_downstream(t6)
- :param tasks: List of tasks or List[airflow.models.BaseOperator] to set
dependencies
- :type tasks: List[airflow.models.BaseOperator] or
airflow.models.BaseOperator
+ :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg,
or List[airflow.models.XComArg]
+ to set dependencies
Review comment:
As we are changing this line anyway, we don't need to duplicate the
types here -- prose is better
```suggestion
:param tasks: List of tasks or XComArgs to set dependencies
```
##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator,
Sequence[BaseOperator]]):
t4.set_downstream(t6)
t5.set_downstream(t6)
- :param tasks: List of tasks or List[airflow.models.BaseOperator] to set
dependencies
- :type tasks: List[airflow.models.BaseOperator] or
airflow.models.BaseOperator
+ :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg,
or List[airflow.models.XComArg]
+ to set dependencies
+ :type tasks: List[airflow.models.BaseOperator],
airflow.models.BaseOperator, List[airflow.models.XComArg],
+ or XComArg
"""
+ from airflow.models.xcom_arg import XComArg
+
for index, up_task in enumerate(tasks[:-1]):
down_task = tasks[index + 1]
- if isinstance(up_task, BaseOperator):
+ if isinstance(up_task, (BaseOperator, XComArg)):
up_task.set_downstream(down_task)
continue
- if isinstance(down_task, BaseOperator):
+ if isinstance(down_task, (BaseOperator, XComArg)):
down_task.set_upstream(up_task)
continue
if not isinstance(up_task, Sequence) or not isinstance(down_task,
Sequence):
raise TypeError(
- 'Chain not supported between instances of {up_type} and
{down_type}'.format(
+ "Chain not supported between instances of {up_type} and
{down_type}".format(
Review comment:
```suggestion
'Chain not supported between instances of {up_type} and
{down_type}'.format(
```
Please avoid stylistic only changes
##########
File path: tests/dags/test_chain_xcomargs.py
##########
@@ -0,0 +1,66 @@
+#
Review comment:
This shouldn't be a new file, but some extra cases with asserts here
https://github.com/apache/airflow/blob/fa811057a6ae0fc6c5e4bff1e18971c262a42a4c/tests/models/test_baseoperator.py#L381-L401
##########
File path: airflow/models/baseoperator.py
##########
@@ -1566,36 +1567,41 @@ def chain(*tasks: Union[BaseOperator,
Sequence[BaseOperator]]):
t4.set_downstream(t6)
t5.set_downstream(t6)
- :param tasks: List of tasks or List[airflow.models.BaseOperator] to set
dependencies
- :type tasks: List[airflow.models.BaseOperator] or
airflow.models.BaseOperator
+ :param tasks: List of tasks, List[airflow.models.BaseOperator], XComArg,
or List[airflow.models.XComArg]
+ to set dependencies
+ :type tasks: List[airflow.models.BaseOperator],
airflow.models.BaseOperator, List[airflow.models.XComArg],
+ or XComArg
"""
+ from airflow.models.xcom_arg import XComArg
+
for index, up_task in enumerate(tasks[:-1]):
down_task = tasks[index + 1]
- if isinstance(up_task, BaseOperator):
+ if isinstance(up_task, (BaseOperator, XComArg)):
up_task.set_downstream(down_task)
continue
- if isinstance(down_task, BaseOperator):
+ if isinstance(down_task, (BaseOperator, XComArg)):
down_task.set_upstream(up_task)
continue
if not isinstance(up_task, Sequence) or not isinstance(down_task,
Sequence):
raise TypeError(
- 'Chain not supported between instances of {up_type} and
{down_type}'.format(
+ "Chain not supported between instances of {up_type} and
{down_type}".format(
up_type=type(up_task), down_type=type(down_task)
)
)
up_task_list = up_task
down_task_list = down_task
if len(up_task_list) != len(down_task_list):
raise AirflowException(
- f'Chain not supported different length Iterable '
- f'but get {len(up_task_list)} and {len(down_task_list)}'
+ f"Chain not supported different length Iterable "
+ f"but get {len(up_task_list)} and {len(down_task_list)}"
Review comment:
```suggestion
f'Chain not supported different length Iterable '
f'but get {len(up_task_list)} and {len(down_task_list)}'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]