[ 
https://issues.apache.org/jira/browse/AIRFLOW-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688009#comment-16688009
 ] 

Greg H commented on AIRFLOW-3182:
---------------------------------

Thanks for your input. I'm new to Airflow and I agree that I may not fully 
understand how some parts of it work (and yes, I did read the docs). Of course, 
this case could be refactored and done in different ways.
 
As I understand it, the BranchPythonOperator usage here is wrong because we're 
not using a conditional statement, right? OK, but adding an explicit condition, 
which by the way does the same thing as before, results in the same behaviour:

{code}
def branch():
    for x in range(0, 24):
        if x == get_current_hour():
            return task_prefix + str(x)
{code}
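A minimal sketch to check that claim outside Airflow: both versions of branch() are rewritten here to take the hour as a parameter (a hypothetical change, made only so they can be compared without a clock), and task_prefix is copied from the DAG quoted below. The loop checks that both versions return the same task_id for every hour of the day.

{code:python}
# Copied from the DAG file; 'hour' is passed in explicitly (hypothetical
# signature) instead of calling get_current_hour().
task_prefix = 'task_for_hour-'

def branch_original(hour):
    # Original version: build the task_id directly from the hour.
    return task_prefix + str(hour)

def branch_explicit(hour):
    # Version from this comment: loop over all hours and return the
    # task_id of the one that matches the given hour.
    for x in range(0, 24):
        if x == hour:
            return task_prefix + str(x)

# Both variants pick the same task for each of the 24 hours.
for h in range(24):
    assert branch_original(h) == branch_explicit(h)

print(branch_explicit(7))  # prints task_for_hour-7
{code}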

It just seems strange to me that a downstream task runs even though it is not 
connected to the upstream task that was actually executed ('daily_aggregation' 
and 'task_for_hour-7' in the attached screenshot). If you think that this 
behaviour is correct or that my example is still wrong, then please reject this 
Jira. 
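A simplified model of how the trigger rule is evaluated, for reasoning about this case. This is a sketch, not Airflow's actual code: it assumes the documented semantics that 'all_done' only requires every upstream task to have finished (whatever the outcome), and that 'skipped' counts as finished. The names DONE_STATES, all_done_met and all_success_met are hypothetical. In the DAG quoted below, 'daily_aggregation' has a single upstream task, 'task_for_hour-23'; in the hour-7 run the branch selects 'task_for_hour-7' and skips 'task_for_hour-23', so under this model 'all_done' is satisfied while the default 'all_success' is not.

{code:python}
# Hypothetical model: finished ("done") states an upstream task can end in.
# 'skipped' is treated as finished, like success or failure.
DONE_STATES = {'success', 'failed', 'upstream_failed', 'skipped'}

def all_done_met(upstream_states):
    # 'all_done': satisfied once every upstream task has finished,
    # regardless of whether it succeeded, failed or was skipped.
    return all(state in DONE_STATES for state in upstream_states)

def all_success_met(upstream_states):
    # Default 'all_success': satisfied only if every upstream task succeeded.
    return all(state == 'success' for state in upstream_states)

# Hour-7 run: the only upstream of 'daily_aggregation' ('task_for_hour-23')
# was skipped by the branch.
upstream_of_aggregation = ['skipped']
print(all_done_met(upstream_of_aggregation))     # prints True
print(all_success_met(upstream_of_aggregation))  # prints False
{code}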

> 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream tasks
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3182
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Greg H
>            Priority: Major
>         Attachments: BrannchPythonOperator.png, Screen Shot 2018-11-15 at 13.51.07.png
>
>
> We have a job that runs some data processing every hour. At the end of the 
> day we need to run an aggregation on all data generated by the 'hourly' jobs, 
> regardless of whether any 'hourly' job failed. For this purpose we have 
> prepared a DAG that uses BranchPythonOperator to decide which 'hourly' job 
> needs to run at a given time, and when the task for hour 23 is done, we 
> trigger the aggregation (downstream). For this to work regardless of the last 
> 'hourly' task's status, the *'all_done'* trigger rule is set on the aggregation 
> task. Unfortunately, this configuration works incorrectly, causing the 
> aggregation task to run after every 'hourly' task, even though the aggregation 
> task is set as downstream of 'task_for_hour-23' +only+:
>   !BrannchPythonOperator.png!
> Here's sample code:
> {code:java}
> # coding: utf-8
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.trigger_rule import TriggerRule
> from datetime import datetime
> import logging
> dag_id = 'test'
> today = datetime.today().strftime("%Y-%m-%d")
> task_prefix = 'task_for_hour-'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2018, 6, 18),
>     'catchup': False,
> }
> dag = DAG(
>     dag_id=dag_id,
>     default_args=default_args,
>     schedule_interval="@hourly",
>     catchup=False
> )
> # Returns the current (wall-clock) hour
> def get_current_hour():
>     return datetime.now().hour
> # Returns the task_id of the task to launch next (task_for_hour-0, task_for_hour-1, etc.)
> def branch():
>     return task_prefix + str(get_current_hour())
> # Running hourly job
> def run_hourly_job(**kwargs):
>     current_hour = get_current_hour()
>     logging.info("Running job for hour: %s" % current_hour)
> # Main daily aggregation
> def run_daily_aggregation(**kwargs):
>     logging.info("Running daily aggregation for %s" % today)
>     
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> # 'branch' method returns name of the task to be run next.
> hour_branching = BranchPythonOperator(
>     task_id='hour_branching',
>     python_callable=branch,
>     dag=dag)
> run_aggregation = PythonOperator(
>     task_id='daily_aggregation',
>     python_callable=run_daily_aggregation,
>     provide_context=True,
>     trigger_rule=TriggerRule.ALL_DONE,
>     dag=dag
> )
> start_task.set_downstream(hour_branching)
> # Create tasks for each hour
> for hour in range(24):
>     if hour == 23:
>         task_for_hour_23 = PythonOperator(
>             task_id=task_prefix + '23',
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag
>         )
>         hour_branching.set_downstream(task_for_hour_23)
>         task_for_hour_23.set_downstream(run_aggregation)
>     else:
>         hour_branching.set_downstream(PythonOperator(
>             task_id=task_prefix + str(hour),
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag)
>         )
> {code}
> This may also be related to AIRFLOW-1419.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
