*Question 2*
You can use depends_on_past=True. Then, future DAG runs won't be scheduled
until the past ones succeed. I specify it as shown below:

default_args = {
    'owner': 'sanand',
    'depends_on_past': True,
    'pool': 'ep_data_pipeline',
    'start_date': START_DATE,
    'email': [import_ep_pipeline_alert_email_dl],
    'email_on_failure': import_airflow_enable_notifications,
    'email_on_retry': import_airflow_enable_notifications,
    'retries': 3,
    'retry_delay': timedelta(seconds=30),
    'priority_weight': import_airflow_priority_weight}

dag = DAG(DAG_NAME, schedule_interval='@hourly', default_args=default_args,
          sla_miss_callback=sla_alert_func)



I also use retries to sidestep intermittent issues. If you need to retry a
failed DAG run, you can clear that DagRun in the UI (or via the CLI, as
sketched below) and the scheduler will rerun it.
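For example, clearing a single failed day from the CLI looks roughly like this
(the DAG id and dates are placeholders, and the exact flags can vary by
Airflow version, so check airflow clear -h):

# my_dag_id and the dates below are placeholders for your own DAG and run
airflow clear my_dag_id -s 2016-10-12 -e 2016-10-12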

On Thu, Oct 13, 2016 at 10:11 AM, siddharth anand <san...@apache.org> wrote:

> Boris,
>
> *Question 1*
> Only_Run_Latest is in master:
> https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a
> That will solve your problem.
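>
> Once you are on master, wiring it in would look roughly like the sketch
> below (the import path and the full_reload_task name are assumptions for
> illustration, not something I have tested here):
>
> # assumed import path; full_reload_task is a placeholder for your own task
> from airflow.operators.latest_only_operator import LatestOnlyOperator
>
> latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> # anything downstream of latest_only is skipped for non-latest DagRuns
> latest_only.set_downstream(full_reload_task)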
>
> Releases come out once a quarter, sometimes once every 2 quarters, so I
> would recommend that you run off master or off your own fork.
>
> You could also achieve this yourself with the following code snippet. It
> uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
> being executed is not the current one. It will work for any schedule. The
> code below has essentially been implemented in the LatestOnlyOperator above
> for convenience.
>
> # imports needed for the snippet; the ShortCircuitOperator import path
> # may vary by Airflow version
> import logging
> from datetime import datetime
> from airflow.operators.python_operator import ShortCircuitOperator
>
> def skip_to_current_job(ds, **kwargs):
>     # Window of the DagRun being executed: from the schedule following its
>     # execution_date to the schedule after that.
>     now = datetime.now()
>     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
>     right_window = kwargs['dag'].following_schedule(left_window)
>     logging.info('Left Window {}, Now {}, Right Window {}'.format(
>         left_window, now, right_window))
>     if not now <= right_window:
>         # This DagRun is not the latest one; short-circuit (skip) downstream tasks.
>         logging.info('Not latest execution, skipping downstream.')
>         return False
>     return True
>
>
> t0 = ShortCircuitOperator(
>     task_id='short_circuit_if_not_current',
>     provide_context=True,
>     python_callable=skip_to_current_job,
>     dag=dag)
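>
> You would then set t0 upstream of the rest of the pipeline, e.g.
> t0.set_downstream(first_task), where first_task stands in for whatever task
> starts your DAG.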
>
>
> -s
>
>
> On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin <bo...@boristyukin.com>
> wrote:
>
>> Hello all, and thanks for such an amazing project! I have been evaluating
>> Airflow, spent a few days reading about it and playing with it, and I have
>> a few questions that I am struggling to understand.
>>
>> Let's say I have a simple DAG that runs once a day and does a full reload
>> of tables from the source database, so the process is not incremental.
>>
>> Let's consider this scenario:
>>
>> Day 1 - OK
>>
>> Day 2 - the Airflow scheduler or the server running Airflow is down for
>> some reason (or the DAG is paused)
>>
>> Day 3 - still down (or the DAG is paused)
>>
>> Day 4 - the server is up and now needs to run the missing jobs.
>>
>>
>> How can I make Airflow run only the Day 4 job and not backfill Days 2 and 3?
>>
>>
>> I tried depends_on_past = True, but it does not seem to do the trick.
>>
>>
>> I also found this in a roadmap doc, but it seems it has not made it into a
>> release yet:
>>
>>
>>  Only Run Latest - Champion : Sid
>>
>> • For cases where we need to only run the latest in a series of task
>> instance runs and mark the others as skipped. For example, we may have a job
>> to execute a DB snapshot every day. If the DAG is paused for 5 days and
>> then unpaused, we don’t want to run all 5, just the latest. With this
>> feature, we will provide “cron” functionality for task scheduling that is
>> not related to ETL.
>>
>>
>> My second question: what if I have another DAG that does incremental loads
>> from a source table:
>>
>>
>> Day 1 - OK, loaded new/changed data for previous day
>>
>> Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
>>
>> Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
>>
>> Day 4 - source system is up, Airflow DagRun succeeded
>>
>>
>> My problem (unless I am missing something) is that Airflow on Day 4 would
>> use the execution time from Day 3, so the interval for the incremental load
>> would start at the last run (which failed). My hope is that it would use the
>> last _successful_ run, so on Day 4 it would go back to Day 1. Is it possible
>> to achieve this?
>>
>> I am aware of the manual backfill command in the CLI, but I am not sure I
>> want to use it due to all the issues and inconsistencies I've read about.
>>
>> Thanks!
>>
>
>
