Thank you for the detailed explanation Boris.
Best regards, Ruslan Dautkhanov On Mon, Mar 20, 2017 at 12:12 PM, Boris Tyukin <[email protected]> wrote: > depends_on_past is looking at previous task instance which sounds the same > as "latestonly" but the difference becomes apparent if you look at this > example. > > Let's say you have a dag, scheduled to run every day and it has been > failing for the past 3 days. The whole purpose of that dag is to populate > snapshot table or do a daily backup. If you use depends on past, you would > have to rerun all missed runs or mark them as successful eventually doing > useless work (3 daily snapshots or backups for the same data). > > LatestOnly allows you to bypass missed runs and just do it once for most > recent instance. > > Another difference, depends on past is tricky if you use BranchOperator > because some branches may not run one day and run another - it will really > mess up your logic. > > On Mon, Mar 20, 2017 at 12:45 PM, Ruslan Dautkhanov <[email protected]> > wrote: > > > Thanks Boris. It does make sense. > > Although how it's different from depends_on_past task-level parameter? > > In both cases, a task will be skipped if there is another TI of this task > > is still running (from a previous dagrun), right? > > > > > > Thanks, > > Ruslan > > > > > > On Sat, Mar 18, 2017 at 7:11 PM, Boris Tyukin <[email protected]> > > wrote: > > > > > you would just chain them - there is an example that came with airflow > > 1.8 > > > https://github.com/apache/incubator-airflow/blob/master/ > > > airflow/example_dags/example_latest_only.py > > > > > > so in your case, instead of dummy operator, you would use your Oracle > > > operator. > > > > > > Does it make sense? > > > > > > > > > On Sat, Mar 18, 2017 at 7:12 PM, Ruslan Dautkhanov < > [email protected] > > > > > > wrote: > > > > > > > Is there is a way to combine scheduling behavior operators (like > this > > > > LatestOnlyOperator) > > > > with a functional operator (like Oracle_Operator)? I was thinking > > > multiple > > > > inheritance would do,like > > > > > > > > > class Oracle_LatestOnly_Operator (Oracle_Operator, > > LatestOnlyOperator): > > > > > ... > > > > > > > > I might be overthinking this and there could be a simpler way? > > > > Sorry, I am still learning Airflow concepts... > > > > > > > > Thanks. > > > > > > > > > > > > > > > > -- > > > > Ruslan Dautkhanov > > > > > > > > On Sat, Mar 18, 2017 at 2:15 PM, Boris Tyukin <[email protected] > > > > > > wrote: > > > > > > > > > Thanks George for that feature! > > > > > > > > > > sure, just created a jira on this > > > > > https://issues.apache.org/jira/browse/AIRFLOW-1008 > > > > > > > > > > > > > > > On Sat, Mar 18, 2017 at 12:05 PM, siddharth anand < > [email protected] > > > > > > > > wrote: > > > > > > > > > > > Thx Boris . Credit goes to George (gwax) for the implementation > of > > > the > > > > > > LatestOnlyOperator. > > > > > > > > > > > > Boris, > > > > > > Can you describe what you mean in a Jira? > > > > > > -s > > > > > > > > > > > > On Fri, Mar 17, 2017 at 6:02 PM, Boris Tyukin < > > [email protected] > > > > > > > > > > wrote: > > > > > > > > > > > > > this is nice indeed along with the new catchup option > > > > > > > https://airflow.incubator.apache.org/scheduler.html# > > > > > backfill-and-catchup > > > > > > > > > > > > > > Thanks Sid and Ben for adding these new options! > > > > > > > > > > > > > > for a complete picture, it would be nice to force only one dag > > run > > > at > > > > > the > > > > > > > time. > > > > > > > > > > > > > > On Fri, Mar 17, 2017 at 7:33 PM, siddharth anand < > > > [email protected]> > > > > > > > wrote: > > > > > > > > > > > > > > > With the Apache Airflow 1.8 release imminent, you may want to > > try > > > > out > > > > > > the > > > > > > > > > > > > > > > > *LatestOnlyOperator.* > > > > > > > > > > > > > > > > If you want your DAG to only run on the most recent scheduled > > > slot, > > > > > > > > regardless of backlog, this operator will skip running > > downstream > > > > > tasks > > > > > > > for > > > > > > > > all DAG Runs prior to the current time slot. > > > > > > > > > > > > > > > > For example, I might have a DAG that takes a DB snapshot > once a > > > > day. > > > > > It > > > > > > > > might be that I paused that DAG for 2 weeks or that I had set > > the > > > > > start > > > > > > > > date to a fixed data 2 weeks in the past. When I enable my > > DAG, I > > > > > don't > > > > > > > > want it to run 14 days' worth of snapshots for the current > > state > > > of > > > > > the > > > > > > > DB > > > > > > > > -- that's unnecessary work. > > > > > > > > > > > > > > > > The LatestOnlyOperator avoids that work. > > > > > > > > > > > > > > > > https://github.com/apache/incubator-airflow/commit/ > > > > > > > > edf033be65b575f44aa221d5d0ec9ecb6b32c67a > > > > > > > > > > > > > > > > With it, you can simply use > > > > > > > > latest_only = LatestOnlyOperator(task_id='latest_only', > > dag=dag) > > > > > > > > > > > > > > > > instead of > > > > > > > > def skip_to_current_job(ds, **kwargs): > > > > > > > > now = datetime.now() > > > > > > > > left_window = kwargs['dag'].following_ > > > > > schedule(kwargs['execution_ > > > > > > > > date']) > > > > > > > > right_window = kwargs['dag'].following_ > > schedule(left_window) > > > > > > > > logging.info(('Left Window {}, Now {}, Right Window > > > > > > > > {}').format(left_window,now,right_window)) > > > > > > > > if not now <= right_window: > > > > > > > > logging.info('Not latest execution, skipping > > > downstream.') > > > > > > > > return False > > > > > > > > return True > > > > > > > > > > > > > > > > short_circuit = ShortCircuitOperator( > > > > > > > > task_id = 'short_circuit_if_not_current_job', > > > > > > > > provide_context = True, > > > > > > > > python_callable = skip_to_current_job, > > > > > > > > dag = dag > > > > > > > > ) > > > > > > > > > > > > > > > > -s > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
