potiuk commented on a change in pull request #6950: [AIRFLOW-6392] Remove cyclic dependency baseoperator <-> helpers
URL: https://github.com/apache/airflow/pull/6950#discussion_r361906152
 
 

 ##########
 File path: airflow/utils/helpers.py
 ##########
 @@ -142,83 +135,6 @@ def as_flattened_list(iterable):
     return [e for i in iterable for e in i]
 
 
-def chain(*tasks):
-    r"""
-    Given a number of tasks, builds a dependency chain.
-    Support mix airflow.models.BaseOperator and List[airflow.models.BaseOperator].
-    If you want to chain between two List[airflow.models.BaseOperator], have to
-    make sure they have same length.
-
-    chain(t1, [t2, t3], [t4, t5], t6)
-
-    is equivalent to
-
-      / -> t2 -> t4 \
-    t1               -> t6
-      \ -> t3 -> t5 /
-
-    t1.set_downstream(t2)
-    t1.set_downstream(t3)
-    t2.set_downstream(t4)
-    t3.set_downstream(t5)
-    t4.set_downstream(t6)
-    t5.set_downstream(t6)
-
-    :param tasks: List of tasks or List[airflow.models.BaseOperator] to set dependencies
-    :type tasks: List[airflow.models.BaseOperator] or airflow.models.BaseOperator
-    """
-    from airflow.models.baseoperator import BaseOperator
-    for index, up_task in enumerate(tasks[:-1]):
-        down_task = tasks[index + 1]
-        if isinstance(up_task, BaseOperator):
-            up_task.set_downstream(down_task)
-        elif isinstance(down_task, BaseOperator):
-            down_task.set_upstream(up_task)
-        else:
-            if not isinstance(up_task, Iterable) or not isinstance(down_task, Iterable):
-                raise TypeError(
-                    'Chain not supported between instances of {up_type} and {down_type}'.format(
-                        up_type=type(up_task), down_type=type(down_task)))
-            elif len(up_task) != len(down_task):
-                raise AirflowException(
-                    'Chain not supported different length Iterable but get {up_len} and {down_len}'.format(
-                        up_len=len(up_task), down_len=len(down_task)))
-            else:
-                for up, down in zip(up_task, down_task):
-                    up.set_downstream(down)
-
-
-def cross_downstream(from_tasks, to_tasks):
-    r"""
-    Set downstream dependencies for all tasks in from_tasks to all tasks in to_tasks.
-    E.g.: cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6])
-    Is equivalent to:
-
-    t1 --> t4
-       \ /
-    t2 -X> t5
-       / \
-    t3 --> t6
-
-    t1.set_downstream(t4)
-    t1.set_downstream(t5)
-    t1.set_downstream(t6)
-    t2.set_downstream(t4)
-    t2.set_downstream(t5)
-    t2.set_downstream(t6)
-    t3.set_downstream(t4)
-    t3.set_downstream(t5)
-    t3.set_downstream(t6)
-
-    :param from_tasks: List of tasks to start from.
-    :type from_tasks: List[airflow.models.BaseOperator]
-    :param to_tasks: List of tasks to set as downstream dependencies.
-    :type to_tasks: List[airflow.models.BaseOperator]
-    """
-    for task in from_tasks:
-        task.set_downstream(to_tasks)
-
-
 def pprinttable(rows):
 
 Review comment:
   It's unrelated :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to