This is nice indeed, along with the new catchup option: https://airflow.incubator.apache.org/scheduler.html#backfill-and-catchup
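For anyone new to the catchup setting linked above: with catchup on, the scheduler backfills one run per missed interval since the DAG's start date, while catchup off creates only the most recent one. A rough pure-Python sketch of that difference (the function name and the fixed daily interval are illustrative only, not Airflow's API):

```python
from datetime import datetime, timedelta

def scheduled_runs(start_date, now, interval=timedelta(days=1), catchup=True):
    """Sketch of which execution dates get DAG Runs: with catchup,
    every completed interval since start_date; without it, only the
    most recent completed interval."""
    runs = []
    d = start_date
    while d + interval <= now:  # only intervals that have fully elapsed
        runs.append(d)
        d += interval
    return runs if catchup else runs[-1:]
```

So a DAG paused for two weeks would, with catchup, get fourteen queued daily runs on un-pausing; without catchup, just one.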
Thanks Sid and Ben for adding these new options! For a complete picture, it would be nice to be able to force only one DAG run at a time.

On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand <[email protected]> wrote:
> With the Apache Airflow 1.8 release imminent, you may want to try out the
> *LatestOnlyOperator.*
>
> If you want your DAG to only run on the most recent scheduled slot,
> regardless of backlog, this operator will skip running downstream tasks for
> all DAG Runs prior to the current time slot.
>
> For example, I might have a DAG that takes a DB snapshot once a day. It
> might be that I paused that DAG for 2 weeks, or that I had set the start
> date to a fixed date 2 weeks in the past. When I enable my DAG, I don't
> want it to run 14 days' worth of snapshots for the current state of the DB
> -- that's unnecessary work.
>
> The LatestOnlyOperator avoids that work.
>
> https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a
>
> With it, you can simply use
>
> latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
>
> instead of
>
> def skip_to_current_job(ds, **kwargs):
>     now = datetime.now()
>     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
>     right_window = kwargs['dag'].following_schedule(left_window)
>     logging.info('Left Window {}, Now {}, Right Window {}'.format(
>         left_window, now, right_window))
>     if not now <= right_window:
>         logging.info('Not latest execution, skipping downstream.')
>         return False
>     return True
>
> short_circuit = ShortCircuitOperator(
>     task_id='short_circuit_if_not_current_job',
>     provide_context=True,
>     python_callable=skip_to_current_job,
>     dag=dag
> )
>
> -s
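The quoted ShortCircuitOperator callable boils down to a single window comparison: run downstream tasks only if "now" has not yet passed the end of the schedule window after this run's slot. A standalone sketch of that check with plain datetimes, so the logic is visible without an Airflow install (the name `is_latest_run` and the fixed daily interval are my assumptions; Airflow's `following_schedule` handles arbitrary schedules):

```python
from datetime import datetime, timedelta

def is_latest_run(execution_date, now, interval=timedelta(days=1)):
    """True only when `now` still falls within the window following this
    run's slot -- i.e. this is the most recent scheduled run."""
    left_window = execution_date + interval   # stands in for following_schedule(execution_date)
    right_window = left_window + interval     # stands in for following_schedule(left_window)
    return now <= right_window
```

With `now` at March 15, the March 14 slot passes the check while a March 1 slot (a 14-day-old backlog run) fails it and would be skipped.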
