So for my second scenario, I think I would still need to run the missing
days' jobs one by one (by clearing the failed ones), and I understand this is
the recommended approach, as I gathered from Maxime's video.

But sometimes it is more efficient to combine all missing days into a single
run, so the incremental window would span from the last successful run (Day 1)
to the current run (Day 4). That way only one DagRun is needed to catch up
instead of 3. Does that make sense? Is it possible?
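
Something along these lines is what I have in mind, computing the window
inside the task itself instead of relying on the scheduler's execution date.
This is just a rough sketch on my end - I am assuming the DagRun / Session /
State imports below behave as I have seen in the source, and
get_incremental_window is a helper name I made up:

from airflow.models import DagRun
from airflow.settings import Session
from airflow.utils.state import State

def get_incremental_window(dag_id, current_execution_date):
    # Use the execution_date of the last successful DagRun as the start of
    # the incremental window and the current execution_date as the end.
    session = Session()
    last_success = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.state == State.SUCCESS)
        .order_by(DagRun.execution_date.desc())
        .first())
    session.close()
    if last_success is None:
        # No prior success yet: fall back to the current execution date.
        return current_execution_date, current_execution_date
    return last_success.execution_date, current_execution_date

That way a single Day 4 run could cover everything since Day 1.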

On Thu, Oct 13, 2016 at 1:16 PM, siddharth anand <san...@apache.org> wrote:

> *Question 2*
> You can use depends_on_past=True. Then, future dag runs won't be scheduled
> until the past one succeeds. I specify it as shown below:
>
> default_args = {
>     'owner': 'sanand',
>     'depends_on_past': True,
>     'pool': 'ep_data_pipeline',
>     'start_date': START_DATE,
>     'email': [import_ep_pipeline_alert_email_dl],
>     'email_on_failure': import_airflow_enable_notifications,
>     'email_on_retry': import_airflow_enable_notifications,
>     'retries': 3,
>     'retry_delay': timedelta(seconds=30),
>     'priority_weight': import_airflow_priority_weight}
>
> dag = DAG(DAG_NAME, schedule_interval='@hourly', default_args=default_args,
>           sla_miss_callback=sla_alert_func)
>
>
>
> I also use retries to sidestep intermittent issues. If you need to retry a
> failed dag run, you can clear that dag run in the UI (or CLI) and the
> scheduler will rerun it.
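>
> If you prefer to do the clearing programmatically, something along these
> lines should also work - just a rough sketch, assuming DAG.clear() keeps
> its only_failed argument and with placeholder dates:
>
> from datetime import datetime
>
> # Clear only the failed task instances in the catch-up window; the
> # scheduler will pick them up and rerun them on its next pass.
> dag.clear(start_date=datetime(2016, 10, 11),
>           end_date=datetime(2016, 10, 13),
>           only_failed=True)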
>
> On Thu, Oct 13, 2016 at 10:11 AM, siddharth anand <san...@apache.org>
> wrote:
>
> > Boris,
> >
> > *Question 1*
> > Only_Run_Latest is in master - https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> > That will solve your problem.
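> >
> > Once you are on a build that includes it, usage is minimal. A rough
> > sketch, assuming the module path from that commit; downstream_task is a
> > placeholder for your real task:
> >
> > from airflow.operators.latest_only_operator import LatestOnlyOperator
> >
> > # Tasks downstream of latest_only are skipped for every DagRun except the
> > # most recent one.
> > latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> > latest_only.set_downstream(downstream_task)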
> >
> > Releases come out once a quarter, sometimes once every 2 quarters, so I
> > would recommend that you run off master or off your own fork.
> >
> > You could also achieve this yourself with the following code snippet. It
> > uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
> > being executed is not the current one. It will work for any schedule. The
> > code below has essentially been implemented in the LatestOnlyOperator
> > above for convenience.
> >
> > import logging
> > from datetime import datetime
> >
> > from airflow.operators.python_operator import ShortCircuitOperator
> >
> > def skip_to_current_job(ds, **kwargs):
> >     now = datetime.now()
> >     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
> >     right_window = kwargs['dag'].following_schedule(left_window)
> >     logging.info('Left Window {}, Now {}, Right Window {}'.format(
> >         left_window, now, right_window))
> >     # Short-circuit (skip downstream tasks) unless this DagRun is the latest one.
> >     if not now <= right_window:
> >         logging.info('Not latest execution, skipping downstream.')
> >         return False
> >     return True
> >
> >
> > t0 = ShortCircuitOperator(
> >     task_id         = 'short_circuit_if_not_current',
> >     provide_context = True,
> >     python_callable = skip_to_current_job,
> >     dag             = dag
> > )
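> >
> > Then just put t0 upstream of whatever should be skipped on stale runs,
> > e.g. (downstream_task is a placeholder for your real task):
> >
> > t0.set_downstream(downstream_task)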
> >
> >
> > -s
> >
> >
> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin <bo...@boristyukin.com>
> > wrote:
> >
> >> Hello all and thanks for such an amazing project! I have been evaluating
> >> Airflow, spent a few days reading about it and playing with it, and I
> >> have a few questions that I am struggling with.
> >>
> >> Let's say I have a simple DAG that runs once a day and does a full
> >> reload of tables from the source database, so the process is not
> >> incremental.
> >>
> >> Let's consider this scenario:
> >>
> >> Day 1 - OK
> >>
> >> Day 2 - airflow scheduler or server with airflow is down for some reason
> >> (or DAG is paused)
> >>
> >> Day 3 - still down (or DAG is paused)
> >>
> >> Day 4 - server is up and now needs to run missing jobs.
> >>
> >>
> >> How can I make airflow run only the Day 4 job and not backfill Day 2
> >> and 3?
> >>
> >>
> >> I tried setting depends_on_past = True but it does not seem to do the
> >> trick.
> >>
> >>
> >> I also found this in a roadmap doc, but it seems it has not made it into
> >> a release yet:
> >>
> >>
> >>  Only Run Latest - Champion : Sid
> >>
> >> • For cases where we need to only run the latest in a series of task
> >> instance runs and mark the others as skipped. For example, we may have a
> >> job to execute a DB snapshot every day. If the DAG is paused for 5 days
> >> and then unpaused, we don’t want to run all 5, just the latest. With this
> >> feature, we will provide “cron” functionality for task scheduling that is
> >> not related to ETL
> >>
> >>
> >> My second question: what if I have another DAG that does incremental
> >> loads from a source table:
> >>
> >>
> >> Day 1 - OK, loaded new/changed data for previous day
> >>
> >> Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
> >>
> >> Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
> >>
> >> Day 4 - source system is up, Airflow DagRun succeeded
> >>
> >>
> >> My problem (unless I am missing something) is that on Day 4 Airflow
> >> would use the execution time from Day 3, so the interval for the
> >> incremental load would start from the last run (which failed). My hope is
> >> that it would use the last _successful_ run, so on Day 4 it would go back
> >> to Day 1. Is it possible to achieve this?
> >>
> >> I am aware of the manual backfill command in the CLI, but I am not sure
> >> I want to use it due to all the issues and inconsistencies I've read
> >> about.
> >>
> >> Thanks!
> >>
> >
> >
>
