[ https://issues.apache.org/jira/browse/AIRFLOW-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687820#comment-16687820 ]

Iuliia Volkova commented on AIRFLOW-3182:
-----------------------------------------

[~Zeckt], your error is not related to BranchOperator. It comes from a misunderstanding of what branching is for: you don't need a branch here:
{code:java}
# Returns the name id of the task to launch next (task_for_hour-0, task_for_hour-1, etc.)
def branch():
    return task_prefix + str(get_current_hour())
{code}

BranchOperator is used when you have a condition and need to decide what to do next based on it (this is described in the docs: [https://airflow.apache.org/concepts.html?highlight=branch%20operator#branching]). A simple example:
{code:java}
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import TriggerRule

with DAG(dag_id="branch_behavior_operator", start_date=datetime(2018, 11, 15),
         schedule_interval=None) as dag:

    def branch_check():
        # Placeholder condition: replace `True` with a real check.
        # The returned string is the task_id of the path to follow; every other
        # direct downstream task of the branch operator is marked as skipped.
        if True:
            return 'success_way'
        else:
            return 'dummy_task'

    t1 = BranchPythonOperator(task_id='check_condition',
                              python_callable=branch_check)

    def print_hello():
        print('Hello!')

    t2_0 = DummyOperator(task_id='success_way')
    t2 = PythonOperator(task_id='print_task', python_callable=print_hello)
    t2_1 = DummyOperator(task_id='i_need_to_be_success_too')

    t3 = DummyOperator(task_id='dummy_task')

    # Branching: only one of 'dummy_task' / 'success_way' runs.
    t1.set_downstream([t3, t2_0])

    # Plain fan-out (no branching): both tasks run after 'success_way'.
    t2_0.set_downstream([t2, t2_1])

    # Runs only if both upstream tasks succeed (ALL_SUCCESS is also the default).
    t4 = DummyOperator(task_id='final_task',
                       trigger_rule=TriggerRule.ALL_SUCCESS)

    t4.set_upstream([t2_1, t2])
{code}
The result of this DAG is:

!Screen Shot 2018-11-15 at 13.51.07.png|height=250!

You just need to use upstream and downstream dependencies without BranchOperator, as done in my example with {{t2_0.set_downstream([t2, t2_1])}}.
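To make the difference between branching and plain fan-out concrete, here is a small stdlib-only model of how one run of the example DAG above resolves. It is a simplified sketch, not Airflow code: it assumes Airflow 1.x semantics, where the branch operator marks every direct downstream task it did not choose as skipped, and an ALL_SUCCESS task is skipped when any of its upstream tasks was skipped. The `simulate` helper, `UPSTREAM` mapping and state strings are hypothetical names used only for illustration.

```python
# Simplified model of one run of the example DAG (NOT Airflow's scheduler).
# Assumptions: the branch task skips every direct downstream task it did not
# choose, and an ALL_SUCCESS task is skipped if any upstream task was skipped.

# task_id -> upstream task_ids, listed in topological order.
UPSTREAM = {
    "check_condition": [],
    "success_way": ["check_condition"],
    "dummy_task": ["check_condition"],
    "print_task": ["success_way"],
    "i_need_to_be_success_too": ["success_way"],
    "final_task": ["print_task", "i_need_to_be_success_too"],
}
BRANCH_TASK = "check_condition"


def simulate(chosen):
    """Return a dict of task_id -> 'success' or 'skipped' for one run."""
    states = {}
    # Dicts keep insertion order, so parents are always resolved first.
    for task, parents in UPSTREAM.items():
        parent_states = [states[p] for p in parents]
        if "skipped" in parent_states:
            # ALL_SUCCESS can no longer be met, so the skip propagates.
            states[task] = "skipped"
        elif BRANCH_TASK in parents and task != chosen:
            # Direct downstream task that the branch callable did not return.
            states[task] = "skipped"
        else:
            states[task] = "success"
    return states


for task, state in simulate("success_way").items():
    print(f"{task}: {state}")
# check_condition: success
# success_way: success
# dummy_task: skipped
# print_task: success
# i_need_to_be_success_too: success
# final_task: success
```

With `simulate("dummy_task")` the whole `success_way` path, including `final_task`, ends up skipped instead. `print_task` and `i_need_to_be_success_too` both run because `success_way` simply fans out to them; no branch is involved at that step.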

> 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream tasks
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3182
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Greg H
>            Priority: Major
>         Attachments: BrannchPythonOperator.png, Screen Shot 2018-11-15 at 13.51.07.png
>
>
> We have a job that runs some data processing every hour. At the end of the
> day we need to run an aggregation on all data generated by the 'hourly' jobs,
> regardless of whether any 'hourly' job failed. For this purpose we have
> prepared a DAG that uses BranchPythonOperator to decide which 'hourly'
> job needs to run at a given time, and when the task for hour 23 is done, we
> trigger the aggregation (downstream). For this to work regardless of the last
> 'hourly' task's status, the *'all_done'* trigger rule is set on the aggregation
> task. Unfortunately, this configuration works incorrectly, causing the aggregation
> task to run after every 'hourly' task, even though the aggregation
> task is set as downstream of 'task_for_hour-23' +only+:
>   !BrannchPythonOperator.png!
> Here's sample code:
> {code:java}
> # coding: utf-8
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.models import TriggerRule
> from datetime import datetime
> import logging
> dag_id = 'test'
> today = datetime.today().strftime("%Y-%m-%d")
> task_prefix = 'task_for_hour-'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2018, 6, 18),
>     'catchup': False,
> }
> dag = DAG(
>     dag_id=dag_id,
>     default_args=default_args,
>     schedule_interval="@hourly",
>     catchup=False
> )
> # Setting the current hour
> def get_current_hour():
>     return datetime.now().hour
> # Returns the name id of the task to launch next (task_for_hour-0, task_for_hour-1, etc.)
> def branch():
>     return task_prefix + str(get_current_hour())
> # Running hourly job
> def run_hourly_job(**kwargs):
>     current_hour = get_current_hour()
>     logging.info("Running job for hour: %s" % current_hour)
> # Main daily aggregation
> def run_daily_aggregation(**kwargs):
>     logging.info("Running daily aggregation for %s" % today)
>
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> # 'branch' method returns name of the task to be run next.
> hour_branching = BranchPythonOperator(
>     task_id='hour_branching',
>     python_callable=branch,
>     dag=dag)
> run_aggregation = PythonOperator(
>     task_id='daily_aggregation',
>     python_callable=run_daily_aggregation,
>     provide_context=True,
>     trigger_rule=TriggerRule.ALL_DONE,
>     dag=dag
> )
> start_task.set_downstream(hour_branching)
> # Create tasks for each hour
> for hour in range(24):
>     if hour == 23:
>         task_for_hour_23 = PythonOperator(
>             task_id=task_prefix + '23',
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag
>         )
>         hour_branching.set_downstream(task_for_hour_23)
>         task_for_hour_23.set_downstream(run_aggregation)
>     else:
>         hour_branching.set_downstream(PythonOperator(
>             task_id=task_prefix + str(hour),
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag)
>         )
> {code}
> This may also be related to AIRFLOW-1419.
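The behavior reported above can be reproduced with a simplified model of trigger-rule evaluation. This sketch assumes the Airflow 1.x definitions: ALL_SUCCESS requires every upstream task to have succeeded, while ALL_DONE only requires every upstream task to have finished, and a skipped task counts as finished. It also assumes `task_for_hour-23` succeeds when it runs. The `trigger_rule_met` and `hourly_run` functions and the state strings are hypothetical, stdlib-only stand-ins, not Airflow's API.

```python
# Simplified model of Airflow 1.x trigger rules (NOT the real TriggerRuleDep).
# Assumption: 'skipped' is a terminal state, so it counts as "done".
TERMINAL_STATES = {"success", "failed", "skipped", "upstream_failed"}


def trigger_rule_met(rule, upstream_states):
    """Return True if a task with `rule` may run given its upstream states."""
    if rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if rule == "all_done":
        return all(state in TERMINAL_STATES for state in upstream_states)
    raise ValueError(f"rule not modelled in this sketch: {rule}")


def hourly_run(current_hour):
    """(state of task_for_hour-23, whether daily_aggregation runs) for one run."""
    # hour_branching follows only task_for_hour-<current_hour>; every other
    # hourly task, including task_for_hour-23, is skipped in that run.
    hour_23_state = "success" if current_hour == 23 else "skipped"
    aggregation_runs = trigger_rule_met("all_done", [hour_23_state])
    return hour_23_state, aggregation_runs


for hour in (5, 23):
    print(hour, hourly_run(hour))
# 5 ('skipped', True)
# 23 ('success', True)

print(trigger_rule_met("all_success", ["skipped"]))
# False
```

Under this model `daily_aggregation` is triggered in every hourly run: in 23 of the 24 runs its only upstream task is skipped, and a skipped task already satisfies ALL_DONE. That matches the reported symptom, even though `task_for_hour-23` is the aggregation's only upstream task.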



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
