So for my second scenario, I think I would still need to run the missing days' jobs one by one (by clearing the failed ones), and I understand this is the recommended approach, as I gathered from Maxime's video.

But sometimes it is more efficient to combine all the missing day runs into one, so I would use a window for the incremental process spanning the last successful run (Day 1) to the current run (Day 4): only one DagRun to catch up instead of 3. Does that make sense? Is it possible?
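Roughly what I have in mind is something like the untested sketch below; it queries Airflow's metadata DB directly for the last successful DagRun (the helper name get_load_window is my own invention, not an Airflow API):

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

def get_load_window(dag_id, execution_date):
    # Untested sketch: look up the most recent successful DagRun in the
    # metadata DB and use its execution date as the left edge of the window.
    session = settings.Session()
    last_success = (session.query(DagRun)
                    .filter(DagRun.dag_id == dag_id,
                            DagRun.state == State.SUCCESS)
                    .order_by(DagRun.execution_date.desc())
                    .first())
    session.close()
    # Fall back to this run's own execution date if there is no prior success
    left = last_success.execution_date if last_success else execution_date
    return left, execution_date

A PythonOperator with provide_context=True could call this with kwargs['dag'].dag_id and kwargs['execution_date'] and hand the resulting window to the extraction query, so Day 4 would cover everything since Day 1 in a single DagRun.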
On Thu, Oct 13, 2016 at 1:16 PM, siddharth anand <[email protected]> wrote:

> *Question 2*
> You can use depends_on_past=True. Then, future dag runs won't be scheduled
> until the past one succeeds. I specify it as shown below:
>
> default_args = {
>     'owner': 'sanand',
>     'depends_on_past': True,
>     'pool': 'ep_data_pipeline',
>     'start_date': START_DATE,
>     'email': [import_ep_pipeline_alert_email_dl],
>     'email_on_failure': import_airflow_enable_notifications,
>     'email_on_retry': import_airflow_enable_notifications,
>     'retries': 3,
>     'retry_delay': timedelta(seconds=30),
>     'priority_weight': import_airflow_priority_weight}
>
> dag = DAG(DAG_NAME, schedule_interval='@hourly',
>           default_args=default_args, sla_miss_callback=sla_alert_func)
>
> I also use retries to sidestep intermittent issues. If you need to retry a
> failed dag run, you can clear that dag run in the UI (or CLI) and the
> scheduler will rerun it.
>
> On Thu, Oct 13, 2016 at 10:11 AM, siddharth anand <[email protected]> wrote:
>
> > Boris,
> >
> > *Question 1*
> > Only_Run_Latest is in master -
> > https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> > That will solve your problem.
> >
> > Releases come out once a quarter, sometimes once every 2 quarters, so I
> > would recommend that you run off master or off your own fork.
> >
> > You could also achieve this yourself with the following code snippet. It
> > uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
> > being executed is not the current one. It will work for any schedule. The
> > code below has essentially been implemented in the LatestOnlyOperator
> > above for convenience.
> >
> > import logging
> > from datetime import datetime
> > from airflow.operators import ShortCircuitOperator
> >
> > def skip_to_current_job(ds, **kwargs):
> >     now = datetime.now()
> >     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
> >     right_window = kwargs['dag'].following_schedule(left_window)
> >     logging.info('Left Window {}, Now {}, Right Window {}'.format(
> >         left_window, now, right_window))
> >     if not now <= right_window:
> >         logging.info('Not latest execution, skipping downstream.')
> >         return False
> >     return True
> >
> > t0 = ShortCircuitOperator(
> >     task_id='short_circuit_if_not_current',
> >     provide_context=True,
> >     python_callable=skip_to_current_job,
> >     dag=dag)
> >
> > -s
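For reference, a minimal usage sketch of the LatestOnlyOperator mentioned above (untested; it assumes a build from master where the operator is importable from airflow.operators.latest_only_operator, and full_reload stands in for whatever daily task should be skipped on catch-up runs):

from airflow.operators.latest_only_operator import LatestOnlyOperator

# Downstream tasks run only when this DagRun is the most recent one;
# catch-up runs are marked as skipped.
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
latest_only.set_downstream(full_reload)  # full_reload is a hypothetical task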
> >> > >> > >> I tried to do depend_on_past = True but it does not seem to do this > trick. > >> > >> > >> I also found in a roadmap doc this but seems it is not made to the > release > >> yet: > >> > >> > >> Only Run Latest - Champion : Sid > >> > >> • For cases where we need to only run the latest in a series of task > >> instance runs and mark the others as skipped. For example, we may have > job > >> to execute a DB snapshot every day. If the DAG is paused for 5 days and > >> then unpaused, we don’t want to run all 5, just the latest. With this > >> feature, we will provide “cron” functionality for task scheduling that > is > >> not related to ETL > >> > >> > >> My second question, what if I have another DAG that does incremental > loads > >> from a source table: > >> > >> > >> Day 1 - OK, loaded new/changed data for previous day > >> > >> Day 2 - source system is down (or DAG is paused), Airflow DagRun failed > >> > >> Day 3 - source system is down (or DAG is paused), Airflow DagRun failed > >> > >> Day 4 - source system is up, Airflow Dagrun succeeded > >> > >> > >> My problem (unless I am missing something), Airflow on Day 4 would use > >> execution time from Day 3, so the interval for incremental load would be > >> since the last run (which was Failed). My hope it would use the last > >> _successful_ run so on Day 4 it would go back to Day 1. Is it possible > to > >> achieve this? > >> > >> I am aware of a manual backfill command via CLI but I am not sure I want > >> to > >> use due to all the issues and inconsistencies I've read about it. > >> > >> Thanks! > >> > > > > >
