Grzegorz Hejnowicz created AIRFLOW-3182:
-------------------------------------------

             Summary: 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream tasks
                 Key: AIRFLOW-3182
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
             Project: Apache Airflow
          Issue Type: Bug
    Affects Versions: 1.10.0, 1.9.0
            Reporter: Grzegorz Hejnowicz
         Attachments: BrannchPythonOperator.png

We have a job that runs some data processing every hour. At the end of the day
we need to run an aggregation over all data generated by the 'hourly' jobs,
regardless of whether any 'hourly' job failed. For this purpose we have prepared
a DAG that uses BranchPythonOperator to decide which 'hourly' task needs to run
at a given time; when the task for hour 23 is done, we trigger the aggregation
(downstream). For this to work regardless of the status of the last 'hourly'
task, the *'all_done'* trigger rule is set on the aggregation task.
Unfortunately, this configuration does not work as expected: the aggregation
task is run after every 'hourly' task, even though it is set as downstream of
'task_for_hour-23' +only+:

 

Here's sample code:
{code:python}
# coding: utf-8
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
import logging

dag_id = 'test'

today = datetime.today().strftime("%Y-%m-%d")
task_prefix = 'task_for_hour-'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
}

dag = DAG(
    dag_id=dag_id,
    default_args=default_args,
    schedule_interval="@hourly",
    catchup=False
)

# Returns the current hour (0-23)
def get_current_hour():
    return datetime.now().hour


# Returns the task_id of the task to launch next (task_for_hour-0, task_for_hour-1, etc.)
def branch():
    return task_prefix + str(get_current_hour())


# Running hourly job
def run_hourly_job(**kwargs):
    current_hour = get_current_hour()
    logging.info("Running job for hour: %s", current_hour)


# Main daily aggregation
def run_daily_aggregation(**kwargs):
    logging.info("Running daily aggregation for %s", today)

    
start_task = DummyOperator(
    task_id='start',
    dag=dag
)

# The 'branch' callable returns the task_id of the task to run next.
hour_branching = BranchPythonOperator(
    task_id='hour_branching',
    python_callable=branch,
    dag=dag)

run_aggregation = PythonOperator(
    task_id='daily_aggregation',
    python_callable=run_daily_aggregation,
    provide_context=True,
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

start_task.set_downstream(hour_branching)

# Create a task for each hour; only the task for hour 23 feeds the aggregation
for hour in range(24):
    hourly_task = PythonOperator(
        task_id=task_prefix + str(hour),
        python_callable=run_hourly_job,
        provide_context=True,
        dag=dag
    )
    hour_branching.set_downstream(hourly_task)

    if hour == 23:
        hourly_task.set_downstream(run_aggregation)
{code}
This may also be related to AIRFLOW-1419.
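One possible explanation for the observed behaviour, sketched as a simplified model rather than Airflow's actual code: BranchPythonOperator marks every hourly task it does not choose as skipped, and the *'all_done'* rule is assumed to count skipped tasks as finished, just like success, failed and upstream_failed. If that assumption holds, 'daily_aggregation' becomes runnable in every hourly DAG run, because its only upstream task, 'task_for_hour-23', always ends up either run or skipped. The names State, DONE_STATES, branch_outcome and aggregation_runs below are hypothetical and exist only for this illustration.

{code:python}
# Hypothetical, Airflow-free model of when 'daily_aggregation' may run.
from enum import Enum
from typing import Callable, Dict, List


class State(Enum):
    """Subset of task-instance states; names mirror Airflow's, the enum is illustrative."""
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    RUNNING = "running"


# Assumption: 'all_done' treats these states as finished (skipped included).
DONE_STATES = {State.SUCCESS, State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED}

TASK_PREFIX = "task_for_hour-"


def branch_outcome(chosen_hour: int,
                   chosen_state: State = State.SUCCESS) -> Dict[str, State]:
    """The chosen hourly task ends in chosen_state; every other hourly task is skipped."""
    return {
        TASK_PREFIX + str(hour): chosen_state if hour == chosen_hour else State.SKIPPED
        for hour in range(24)
    }


def all_done(upstream: List[State]) -> bool:
    # Fires once every upstream task has finished, whatever the outcome.
    return all(state in DONE_STATES for state in upstream)


def all_success(upstream: List[State]) -> bool:
    # Fires only if every upstream task succeeded.
    return all(state is State.SUCCESS for state in upstream)


def aggregation_runs(chosen_hour: int,
                     rule: Callable[[List[State]], bool],
                     chosen_state: State = State.SUCCESS) -> bool:
    states = branch_outcome(chosen_hour, chosen_state)
    # 'daily_aggregation' has exactly one upstream task: 'task_for_hour-23'.
    return rule([states[TASK_PREFIX + "23"]])


if __name__ == "__main__":
    for hour, outcome in [(5, State.SUCCESS), (23, State.SUCCESS), (23, State.FAILED)]:
        print(hour, outcome.value,
              "all_done:", aggregation_runs(hour, all_done, outcome),
              "all_success:", aggregation_runs(hour, all_success, outcome))
{code}

Run as a script, the model reports all_done as True in every case, including the hour 5 run where 'task_for_hour-23' is skipped, while all_success is True only when 'task_for_hour-23' actually succeeded.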



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
