[
https://issues.apache.org/jira/browse/AIRFLOW-3182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679567#comment-16679567
]
jack commented on AIRFLOW-3182:
-------------------------------
Could the trigger rule you set be wrong?
I think you should do:
{code:python}
run_aggregation = PythonOperator(
    task_id='daily_aggregation',
    python_callable=run_daily_aggregation,
    provide_context=True,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag
)
{code}
This means that daily_aggregation will start only when start, hour_branching and
task_for_hour-23 have all succeeded.
In your example daily_aggregation starts when start, hour_branching and
task_for_hour-23 are done, and a SKIPPED task is considered done, so it makes
sense that it runs every hour.
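A minimal sketch of the difference (task ids here are made up, not from your DAG;
Airflow 1.10-style imports). With ALL_DONE the join runs even though the branch
that was not chosen ended up SKIPPED, while with ALL_SUCCESS the skipped upstream
keeps the join from running (it gets skipped as well):
{code:python}
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

demo_dag = DAG(
    dag_id='trigger_rule_demo',
    start_date=datetime(2018, 6, 18),
    schedule_interval=None
)

# The branch always picks 'path_a', so 'path_b' ends up SKIPPED.
pick_one = BranchPythonOperator(
    task_id='pick_one',
    python_callable=lambda: 'path_a',
    dag=demo_dag
)

path_a = DummyOperator(task_id='path_a', dag=demo_dag)
path_b = DummyOperator(task_id='path_b', dag=demo_dag)

# ALL_DONE: a SKIPPED upstream counts as "done", so this task runs anyway.
join_all_done = DummyOperator(
    task_id='join_all_done',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=demo_dag
)

# ALL_SUCCESS: the SKIPPED 'path_b' is not a success, so this task is
# skipped instead of running.
join_all_success = DummyOperator(
    task_id='join_all_success',
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=demo_dag
)

pick_one.set_downstream([path_a, path_b])
path_a.set_downstream([join_all_done, join_all_success])
path_b.set_downstream([join_all_done, join_all_success])
{code}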
> 'all_done' trigger rule works incorrectly with BranchPythonOperator upstream
> tasks
> ----------------------------------------------------------------------------------
>
> Key: AIRFLOW-3182
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3182
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: 1.9.0, 1.10.0
> Reporter: Greg H
> Priority: Major
> Attachments: BrannchPythonOperator.png
>
>
> We have a job that runs some data processing every hour. At the end of the
> day we need to run an aggregation on all data generated by the 'hourly' jobs,
> regardless of whether any 'hourly' job failed or not. For this purpose we have
> prepared a DAG that uses BranchPythonOperator to decide which 'hourly' job
> needs to be run at a given time, and when the task for hour 23 is done we
> trigger the aggregation (downstream). For this to work regardless of the last
> 'hourly' task's status, the *'all_done'* trigger rule is set on the
> aggregation task. Unfortunately, such a configuration works incorrectly,
> causing the aggregation task to be run after every 'hourly' task, despite the
> fact that the aggregation task is set as downstream of 'task_for_hour-23' +only+:
> !BrannchPythonOperator.png!
> Here's sample code:
> {code:python}
> # coding: utf-8
> from airflow import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.trigger_rule import TriggerRule
> from datetime import datetime
> import logging
>
> dag_id = 'test'
> today = datetime.today().strftime("%Y-%m-%d")
> task_prefix = 'task_for_hour-'
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2018, 6, 18),
>     'catchup': False,
> }
>
> dag = DAG(
>     dag_id=dag_id,
>     default_args=default_args,
>     schedule_interval="@hourly",
>     catchup=False
> )
>
> # Returns the current hour
> def get_current_hour():
>     return datetime.now().hour
>
> # Returns the id of the task to launch next (task_for_hour-0, task_for_hour-1, etc.)
> def branch():
>     return task_prefix + str(get_current_hour())
>
> # Running hourly job
> def run_hourly_job(**kwargs):
>     current_hour = get_current_hour()
>     logging.info("Running job for hour: %s" % current_hour)
>
> # Main daily aggregation
> def run_daily_aggregation(**kwargs):
>     logging.info("Running daily aggregation for %s" % today)
>
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
>
> # 'branch' method returns name of the task to be run next.
> hour_branching = BranchPythonOperator(
>     task_id='hour_branching',
>     python_callable=branch,
>     dag=dag)
>
> run_aggregation = PythonOperator(
>     task_id='daily_aggregation',
>     python_callable=run_daily_aggregation,
>     provide_context=True,
>     trigger_rule=TriggerRule.ALL_DONE,
>     dag=dag
> )
>
> start_task.set_downstream(hour_branching)
>
> # Create tasks for each hour
> for hour in range(24):
>     if hour == 23:
>         task_for_hour_23 = PythonOperator(
>             task_id=task_prefix + '23',
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag
>         )
>         hour_branching.set_downstream(task_for_hour_23)
>         task_for_hour_23.set_downstream(run_aggregation)
>     else:
>         hour_branching.set_downstream(PythonOperator(
>             task_id=task_prefix + str(hour),
>             python_callable=run_hourly_job,
>             provide_context=True,
>             dag=dag)
>         )
> {code}
> This may also be related to AIRFLOW-1419
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)