[ 
https://issues.apache.org/jira/browse/AIRFLOW-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grzegorz Hejnowicz updated AIRFLOW-3182:
----------------------------------------
    Description: 
We have a job that runs some data processing every hour. At the end of the day 
we need to run aggregation on all data generated by the 'hourly' jobs, 
regardless if any 'hourly' job failed or not. For this purpose we have prepared 
DAG that uses BranchPythonOperator in order to decide which 'hourly' job needs 
to be run in given time and when task for hour 23 is done, we trigger the 
aggregation (downstream). For this to work regardless of the last 'hourly' task 
status the *'all_done'* trigger rule is set in the aggregation task. 
Unfortunately, such configuration works incorrectly causing aggregation task to 
be run after every 'hourly' task, despite the fact the aggregation task is set 
as downstream for 'task_for_hour-23' +only+:

  !BrannchPythonOperator.png!

Here's sample code:
{code:java}
# coding: utf-8
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import TriggerRule
from datetime import datetime
import logging

dag_id = 'test'

today = datetime.today().strftime("%Y-%m-%d");
task_prefix = 'task_for_hour-'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'catchup': False,
}

dag = DAG(
    dag_id=dag_id,
    default_args=default_args,
    schedule_interval="@hourly",
    catchup=False
)

# Setting the current hour
def get_current_hour():
    return datetime.now().hour


# Returns the name id of the task to launch next (task_for_hour-0, 
task_for_hour-1, etc.)
def branch():
    return task_prefix + str(get_current_hour())


# Running hourly job
def run_hourly_job(**kwargs):
    current_hour = get_current_hour()
    logging.info("Running job for hour: %s" % current_hour)


# Main daily aggregation
def run_daily_aggregation(**kwargs):
    logging.info("Running daily aggregation for %s" % today)

    
start_task = DummyOperator(
    task_id='start',
    dag=dag
)

# 'branch' method returns name of the task to be run next.
hour_branching = BranchPythonOperator(
    task_id='hour_branching',
    python_callable=branch,
    dag=dag)

run_aggregation = PythonOperator(
    task_id='daily_aggregation',
    python_callable=run_daily_aggregation,
    provide_context=True,
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

start_task.set_downstream(hour_branching)

# Create tasks for each hour
for hour in range(24):
    if hour == 23:
        task_for_hour_23 = PythonOperator(
            task_id=task_prefix + '23',
            python_callable=run_hourly_job,
            provide_context=True,
            dag=dag
        )

        hour_branching.set_downstream(task_for_hour_23)
        task_for_hour_23.set_downstream(run_aggregation)
    else:
        hour_branching.set_downstream(PythonOperator(
            task_id=task_prefix + str(hour),
            python_callable=run_hourly_job,
            provide_context=True,
            dag=dag)
        )
{code}
This may also be related to AIRFLOW-1419

  was:
We have a job that runs some data processing every hour. At the end of the day 
we need to run aggregation on all data generated by the 'hourly' jobs, 
regardless if any 'hourly' job failed or not. For this purpose we have prepared 
DAG that uses BranchPythonOperator in order to decide which 'hourly' job needs 
to be run in given time and when task for hour 23 is done, we trigger the 
aggregation (downstream). For this to work regardless of the last 'hourly' task 
status the *'all_done'* trigger rule is set in the aggregation task. 
Unfortunately, such configuration works incorrectly causing aggregation task to 
be run after every 'hourly' task, despite the fact the aggregation task is set 
as downstream for 'task_for_hour-23' +only+:

 

Here's sample code:
{code:java}
# coding: utf-8
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import TriggerRule
from datetime import datetime
import logging

dag_id = 'test'

today = datetime.today().strftime("%Y-%m-%d");
task_prefix = 'task_for_hour-'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'catchup': False,
}

dag = DAG(
    dag_id=dag_id,
    default_args=default_args,
    schedule_interval="@hourly",
    catchup=False
)

# Setting the current hour
def get_current_hour():
    return datetime.now().hour


# Returns the name id of the task to launch next (task_for_hour-0, 
task_for_hour-1, etc.)
def branch():
    return task_prefix + str(get_current_hour())


# Running hourly job
def run_hourly_job(**kwargs):
    current_hour = get_current_hour()
    logging.info("Running job for hour: %s" % current_hour)


# Main daily aggregation
def run_daily_aggregation(**kwargs):
    logging.info("Running daily aggregation for %s" % today)

    
start_task = DummyOperator(
    task_id='start',
    dag=dag
)

# 'branch' method returns name of the task to be run next.
hour_branching = BranchPythonOperator(
    task_id='hour_branching',
    python_callable=branch,
    dag=dag)

run_aggregation = PythonOperator(
    task_id='daily_aggregation',
    python_callable=run_daily_aggregation,
    provide_context=True,
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

start_task.set_downstream(hour_branching)

# Create tasks for each hour
for hour in range(24):
    if hour == 23:
        task_for_hour_23 = PythonOperator(
            task_id=task_prefix + '23',
            python_callable=run_hourly_job,
            provide_context=True,
            dag=dag
        )

        hour_branching.set_downstream(task_for_hour_23)
        task_for_hour_23.set_downstream(run_aggregation)
    else:
        hour_branching.set_downstream(PythonOperator(
            task_id=task_prefix + str(hour),
            python_callable=run_hourly_job,
            provide_context=True,
            dag=dag)
        )
{code}
This me be also related to AIRFLOW-1419


> 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream 
> tasks
> ----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3182
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.9.0, 1.10.0
>            Reporter: Grzegorz Hejnowicz
>            Priority: Major
>         Attachments: BrannchPythonOperator.png
>
>
> We have a job that runs some data processing every hour. At the end of the 
> day we need to run aggregation on all data generated by the 'hourly' jobs, 
> regardless if any 'hourly' job failed or not. For this purpose we have 
> prepared DAG that uses BranchPythonOperator in order to decide which 'hourly' 
> job needs to be run in given time and when task for hour 23 is done, we 
> trigger the aggregation (downstream). For this to work regardless of the last 
> 'hourly' task status the *'all_done'* trigger rule is set in the aggregation 
> task. Unfortunately, such configuration works incorrectly causing aggregation 
> task to be run after every 'hourly' task, despite the fact the aggregation 
> task is set as downstream for 'task_for_hour-23' +only+:
>   !BrannchPythonOperator.png!
> Here's sample code:
> {code:java}
> # coding: utf-8
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.models import TriggerRule
> from datetime import datetime
> import logging
> dag_id = 'test'
> today = datetime.today().strftime("%Y-%m-%d");
> task_prefix = 'task_for_hour-'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2018, 6, 18),
>     'catchup': False,
> }
> dag = DAG(
>     dag_id=dag_id,
>     default_args=default_args,
>     schedule_interval="@hourly",
>     catchup=False
> )
> # Setting the current hour
> def get_current_hour():
>     return datetime.now().hour
> # Returns the name id of the task to launch next (task_for_hour-0, 
> task_for_hour-1, etc.)
> def branch():
>     return task_prefix + str(get_current_hour())
> # Running hourly job
> def run_hourly_job(**kwargs):
>     current_hour = get_current_hour()
>     logging.info("Running job for hour: %s" % current_hour)
> # Main daily aggregation
> def run_daily_aggregation(**kwargs):
>     logging.info("Running daily aggregation for %s" % today)
>     
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> # 'branch' method returns name of the task to be run next.
> hour_branching = BranchPythonOperator(
>     task_id='hour_branching',
>     python_callable=branch,
>     dag=dag)
> run_aggregation = PythonOperator(
>     task_id='daily_aggregation',
>     python_callable=run_daily_aggregation,
>     provide_context=True,
>     trigger_rule=TriggerRule.ALL_DONE,
>     dag=dag
> )
> start_task.set_downstream(hour_branching)
> # Create tasks for each hour
> for hour in range(24):
>     if hour == 23:
>         task_for_hour_23 = PythonOperator(
>             task_id=task_prefix + '23',
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag
>         )
>         hour_branching.set_downstream(task_for_hour_23)
>         task_for_hour_23.set_downstream(run_aggregation)
>     else:
>         hour_branching.set_downstream(PythonOperator(
>             task_id=task_prefix + str(hour),
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag)
>         )
> {code}
> This may also be related to AIRFLOW-1419



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to