[ 
https://issues.apache.org/jira/browse/AIRFLOW-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679567#comment-16679567
 ] 

jack commented on AIRFLOW-3182:
-------------------------------

Could the trigger rule you set be the problem?

I think you should use:

run_aggregation = PythonOperator(
    task_id='daily_aggregation',
    python_callable=run_daily_aggregation,
    provide_context=True,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)

This means that daily_aggregation will start only when start, hour_branching and
task_for_hour-23 have all succeeded.

In your example, daily_aggregation starts as soon as start, hour_branching and
task_for_hour-23 are done, and a SKIPPED task is considered done. So it makes
sense that it runs every hour.
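
To illustrate the difference between the two rules, here is a rough sketch in plain Python. It is a simplified model, not Airflow's scheduler code: would_run() and FINISHED_STATES are hypothetical names made up for this example, and the sketch assumes the rule is evaluated only against the states of the task's direct upstream tasks.

```python
# Simplified model of two trigger rules; NOT Airflow's scheduler code.
# would_run() and FINISHED_STATES are hypothetical names for illustration.
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def would_run(trigger_rule, upstream_states):
    """Return True if a task with this rule would run, given upstream states."""
    if trigger_rule == "all_success":
        # Every upstream task must have succeeded.
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        # Every upstream task must have finished, whatever the outcome.
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError("unsupported trigger rule: %s" % trigger_rule)


# Hour 23: the branch picked task_for_hour-23 and it succeeded.
print(would_run("all_done", ["success"]))     # True
print(would_run("all_success", ["success"]))  # True

# Any other hour: the branch skipped task_for_hour-23.
print(would_run("all_done", ["skipped"]))     # True
print(would_run("all_success", ["skipped"]))  # False
```

In this model a skipped task_for_hour-23 satisfies all_done but not all_success, which matches the hourly runs described in the issue. A failed task_for_hour-23 would also satisfy all_done but not all_success.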

> 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream 
> tasks
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3182
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Greg H
>            Priority: Major
>         Attachments: BrannchPythonOperator.png
>
>
> We have a job that runs some data processing every hour. At the end of the 
> day we need to run an aggregation on all data generated by the 'hourly' jobs, 
> regardless of whether any 'hourly' job failed. For this purpose we have 
> prepared a DAG that uses BranchPythonOperator to decide which 'hourly' 
> job needs to run at a given time, and when the task for hour 23 is done, we 
> trigger the aggregation (downstream). For this to work regardless of the last 
> 'hourly' task's status, the *'all_done'* trigger rule is set on the aggregation 
> task. Unfortunately, this configuration works incorrectly: the aggregation 
> task runs after every 'hourly' task, even though the aggregation 
> task is set as downstream of 'task_for_hour-23' +only+:
>   !BrannchPythonOperator.png!
> Here's sample code:
> {code:python}
> # coding: utf-8
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.models import TriggerRule
> from datetime import datetime
> import logging
> dag_id = 'test'
> today = datetime.today().strftime("%Y-%m-%d")
> task_prefix = 'task_for_hour-'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2018, 6, 18),
>     'catchup': False,
> }
> dag = DAG(
>     dag_id=dag_id,
>     default_args=default_args,
>     schedule_interval="@hourly",
>     catchup=False
> )
> # Returns the current hour
> def get_current_hour():
>     return datetime.now().hour
> # Returns the id of the task to launch next (task_for_hour-0,
> # task_for_hour-1, etc.)
> def branch():
>     return task_prefix + str(get_current_hour())
> # Running hourly job
> def run_hourly_job(**kwargs):
>     current_hour = get_current_hour()
>     logging.info("Running job for hour: %s" % current_hour)
> # Main daily aggregation
> def run_daily_aggregation(**kwargs):
>     logging.info("Running daily aggregation for %s" % today)
>     
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> # 'branch' method returns name of the task to be run next.
> hour_branching = BranchPythonOperator(
>     task_id='hour_branching',
>     python_callable=branch,
>     dag=dag)
> run_aggregation = PythonOperator(
>     task_id='daily_aggregation',
>     python_callable=run_daily_aggregation,
>     provide_context=True,
>     trigger_rule=TriggerRule.ALL_DONE,
>     dag=dag
> )
> start_task.set_downstream(hour_branching)
> # Create tasks for each hour
> for hour in range(24):
>     if hour == 23:
>         task_for_hour_23 = PythonOperator(
>             task_id=task_prefix + '23',
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag
>         )
>         hour_branching.set_downstream(task_for_hour_23)
>         task_for_hour_23.set_downstream(run_aggregation)
>     else:
>         hour_branching.set_downstream(PythonOperator(
>             task_id=task_prefix + str(hour),
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag)
>         )
> {code}
> This may also be related to AIRFLOW-1419



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
