Grzegorz Hejnowicz created AIRFLOW-3182:
-------------------------------------------
Summary: 'all_done' trigger rule works incorrectly with
BranchPythonOperator upstream tasks
Key: AIRFLOW-3182
URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
Project: Apache Airflow
Issue Type: Bug
Affects Versions: 1.10.0, 1.9.0
Reporter: Grzegorz Hejnowicz
Attachments: BrannchPythonOperator.png
We have a job that runs some data processing every hour. At the end of the day
we need to run an aggregation on all data generated by the 'hourly' jobs,
regardless of whether any 'hourly' job failed. For this purpose we have prepared
a DAG that uses BranchPythonOperator to decide which 'hourly' task needs to run
at a given time. When the task for hour 23 is done, we trigger the aggregation
(downstream). For this to work regardless of the status of the last 'hourly'
task, the *'all_done'* trigger rule is set on the aggregation task.
Unfortunately, this configuration works incorrectly: the aggregation task runs
after every 'hourly' task, even though it is set as downstream of
'task_for_hour-23' +only+.
Here's sample code:
{code:python}
# coding: utf-8
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import TriggerRule
from datetime import datetime
import logging

dag_id = 'test'
today = datetime.today().strftime("%Y-%m-%d")
task_prefix = 'task_for_hour-'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'catchup': False,
}

dag = DAG(
    dag_id=dag_id,
    default_args=default_args,
    schedule_interval="@hourly",
    catchup=False
)


# Returns the current hour (0-23)
def get_current_hour():
    return datetime.now().hour


# Returns the task_id of the task to launch next
# (task_for_hour-0, task_for_hour-1, etc.)
def branch():
    return task_prefix + str(get_current_hour())


# Running hourly job
def run_hourly_job(**kwargs):
    current_hour = get_current_hour()
    logging.info("Running job for hour: %s" % current_hour)


# Main daily aggregation
def run_daily_aggregation(**kwargs):
    logging.info("Running daily aggregation for %s" % today)


start_task = DummyOperator(
    task_id='start',
    dag=dag
)

# 'branch' method returns the task_id of the task to be run next.
hour_branching = BranchPythonOperator(
    task_id='hour_branching',
    python_callable=branch,
    dag=dag)

run_aggregation = PythonOperator(
    task_id='daily_aggregation',
    python_callable=run_daily_aggregation,
    provide_context=True,
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

start_task.set_downstream(hour_branching)

# Create tasks for each hour
for hour in range(24):
    if hour == 23:
        task_for_hour_23 = PythonOperator(
            task_id=task_prefix + '23',
            python_callable=run_hourly_job,
            provide_context=True,
            dag=dag
        )
        hour_branching.set_downstream(task_for_hour_23)
        task_for_hour_23.set_downstream(run_aggregation)
    else:
        hour_branching.set_downstream(PythonOperator(
            task_id=task_prefix + str(hour),
            python_callable=run_hourly_job,
            provide_context=True,
            dag=dag)
        )
{code}
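One plausible explanation, stated as an assumption rather than a confirmed
diagnosis: when the branch operator selects a task other than
'task_for_hour-23', that task is marked as skipped, and 'all_done' treats a
skipped upstream task as finished. The sketch below is a simplified pure-Python
model of that reading, not Airflow's implementation; `DONE_STATES`,
`branch_states` and `all_done_satisfied` are hypothetical names introduced only
for illustration.

```python
# Simplified model of the behaviour described in this report.
# This is NOT Airflow code; all names here are hypothetical.

# Assumption: these states all count as "done" for the 'all_done' rule.
DONE_STATES = {"success", "failed", "upstream_failed", "skipped"}


def branch_states(selected_task_id, task_ids):
    """Model branching: the selected task succeeds, all others are skipped."""
    return {
        task_id: ("success" if task_id == selected_task_id else "skipped")
        for task_id in task_ids
    }


def all_done_satisfied(upstream_states):
    """Model 'all_done': true once every upstream task is in a done state."""
    return all(state in DONE_STATES for state in upstream_states)


task_ids = ["task_for_hour-%d" % hour for hour in range(24)]

for current_hour in (5, 23):
    states = branch_states("task_for_hour-%d" % current_hour, task_ids)
    # The aggregation task has exactly one upstream task: task_for_hour-23.
    upstream = [states["task_for_hour-23"]]
    print(current_hour, upstream[0], all_done_satisfied(upstream))
```

Under this model the rule is satisfied for hour 5 (upstream skipped) as well as
for hour 23 (upstream succeeded), which matches the observed behaviour.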
This may also be related to AIRFLOW-1419.
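A possible workaround, sketched under assumptions and not tested against an
Airflow installation: keep the 'all_done' rule, but let the aggregation callable
return early unless the current hour is 23. The `AGGREGATION_HOUR` constant, the
`current_hour` parameter and the boolean return value are hypothetical additions
for illustration; `get_current_hour` mirrors the helper in the sample code.

```python
import logging
from datetime import datetime

# Hypothetical constant: the hour whose task precedes the aggregation.
AGGREGATION_HOUR = 23


def get_current_hour():
    return datetime.now().hour


def run_daily_aggregation(current_hour=None, **kwargs):
    """Return True if the aggregation ran, False if the guard stopped it."""
    # current_hour can be passed explicitly (for example in tests);
    # otherwise it falls back to the wall-clock hour, as in the sample code.
    if current_hour is None:
        current_hour = get_current_hour()
    if current_hour != AGGREGATION_HOUR:
        logging.info("Hour %s is not %s, not aggregating",
                     current_hour, AGGREGATION_HOUR)
        return False
    today = datetime.today().strftime("%Y-%m-%d")
    logging.info("Running daily aggregation for %s", today)
    return True
```

With this guard the aggregation task would still be scheduled after every
'hourly' task and would finish as a no-op outside hour 23, so it hides the
symptom rather than fixing the trigger rule behaviour.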