I can't see an image.

We run most of our dags with depends_on_past=True.

If you want to chain your dag runs, i.e. not start the first task of a dag
run until the last task of the previous dag run completes, you can use an
external task sensor. The external task sensor would be the first task in
the dag and would depend on the last task of the same dag from the previous
dag run (see the sketch below). This is strict dag chaining.
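
Roughly, a sketch of that pattern might look like the following (the dag id
and task ids are made up; execution_delta points the sensor one schedule
interval back at this same dag's previous run):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG('chained_dag', start_date=datetime(2016, 10, 1),
          schedule_interval=timedelta(days=1))

# First task: wait for the last task of this same dag's *previous* run.
# Note the very first run has no previous run to wait on, so it may need
# a timeout or to be marked success manually.
wait_for_previous_run = ExternalTaskSensor(
    task_id='wait_for_previous_run',
    external_dag_id='chained_dag',
    external_task_id='last_task',
    execution_delta=timedelta(days=1),  # one schedule interval back
    dag=dag)

last_task = BashOperator(
    task_id='last_task',
    bash_command='echo doing the real work',
    dag=dag)

wait_for_previous_run.set_downstream(last_task)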

If you just don't want a task in the subsequent dag run to get scheduled
unless the same task in the previous dag run completed, depends_on_past=True
helps there. This shows up as more of a cascading effect in the tree view.
-s

On Thu, Oct 13, 2016 at 12:41 PM, Boris Tyukin <bo...@boristyukin.com>
wrote:

> This is not what I see actually. I posted below my test DAG and a
> screenshot.
>
> It does create DAGRuns on subsequent runs - I modeled that scenario by
> commenting one bash command and uncommenting another one with Exit 1.
>
> It does not create Task Instances on subsequent failed DAGs, but it does
> create DAGRuns, and the first successful run after the failed ones would
> not have the execution timestamp from the last successful run.
>
>
> [image: Inline image 1]
>
>
> here is my test DAG
>
>
>
> from datetime import datetime, timedelta
>
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
>
> # Determine schedule:
> dag_schedule_interval = timedelta(seconds=60)
> dag_start_date = datetime.now() - dag_schedule_interval
>
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': True,
>     'start_date': dag_start_date,
>     # 'start_date': datetime(2016, 10, 11, 17, 0),
>     'email': ['airf...@airflow.com'],
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 0,
>     'retry_delay': timedelta(seconds=20),
>     'only_run_latest': True,
>     # 'queue': 'bash_queue',
>     # 'pool': 'backfill',
>     # 'priority_weight': 10,
>     # 'end_date': datetime(2016, 1, 1),
> }
>
> # Change version number if schedule needs to be changed:
> dag = DAG(
>     'pipeline1_v8', default_args=default_args,
>     schedule_interval=dag_schedule_interval)
>
> dag.doc_md = __doc__
>
> # t1, t2 and t3 are examples of tasks created by instantiating operators
> t1 = BashOperator(
>     task_id='t1',
>     bash_command='echo execution ts {{ ts }} & echo 1',
>     # bash_command='exit 1',
>     dag=dag)
>
> On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand <san...@apache.org>
> wrote:
>
>> If you use depends_on_past=True, it won't proceed to the next DAG Run if
>> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
>>
>> -s
>>
>> On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand <san...@apache.org>
>> wrote:
>>
>> > Yes! It does work with Depends_on_past=True.
>> > -s
>> >
>> > On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin <bo...@boristyukin.com>
>> > wrote:
>> >
>> >> thanks so much, Sid! just a follow up question on "Only_Run_Latest" -
>> >> will it work with depend_on_past = True? or will it assume that the DAG
>> >> uses depend_on_past = False?
>> >>
>> >> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand <san...@apache.org>
>> >> wrote:
>> >>
>> >> > Boris,
>> >> >
>> >> > *Question 1*
>> >> > Only_Run_Latest is in master -
>> >> > https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
>> >> > That will solve your problem.
>> >> >
>> >> > Releases come out once a quarter, sometimes once every 2 quarters, so
>> >> > I would recommend that you run off master or off your own fork.
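>> >> >
>> >> > If you do run off master, using it would look roughly like this (the
>> >> > import path is per master at the time; the dag and t1 objects are
>> >> > assumed to exist already):
>> >> >
>> >> > from airflow.operators.latest_only_operator import LatestOnlyOperator
>> >> >
>> >> > # Downstream tasks are skipped whenever the DagRun being executed is
>> >> > # not the latest scheduled run.
>> >> > latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
>> >> > latest_only.set_downstream(t1)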
>> >> >
>> >> > You could also achieve this yourself with the following code snippet.
>> >> > It uses a ShortCircuitOperator that will skip downstream tasks if the
>> >> > DagRun being executed is not the current one. It will work for any
>> >> > schedule. The code below has essentially been implemented in the
>> >> > LatestOnlyOperator above for convenience.
>> >> >
>> >> > import logging
>> >> > from datetime import datetime
>> >> >
>> >> > from airflow.operators.python_operator import ShortCircuitOperator
>> >> >
>> >> > def skip_to_current_job(ds, **kwargs):
>> >> >     # Return False (skip downstream) unless this DagRun is the most
>> >> >     # recent scheduled one.
>> >> >     now = datetime.now()
>> >> >     left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
>> >> >     right_window = kwargs['dag'].following_schedule(left_window)
>> >> >     logging.info('Left Window {}, Now {}, Right Window {}'.format(
>> >> >         left_window, now, right_window))
>> >> >     if not now <= right_window:
>> >> >         logging.info('Not latest execution, skipping downstream.')
>> >> >         return False
>> >> >     return True
>> >> >
>> >> > t0 = ShortCircuitOperator(
>> >> >     task_id='short_circuit_if_not_current',
>> >> >     provide_context=True,
>> >> >     python_callable=skip_to_current_job,
>> >> >     dag=dag)
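>> >> >
>> >> > To take effect, t0 has to be upstream of the tasks it should be able
>> >> > to skip, e.g. (assuming a downstream task t1):
>> >> >
>> >> > t0.set_downstream(t1)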
>> >> >
>> >> >
>> >> > -s
>> >> >
>> >> >
>> >> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin <bo...@boristyukin.com>
>> >> > wrote:
>> >> >
>> >> > > Hello all and thanks for such an amazing project! I have been
>> >> > > evaluating Airflow and spent a few days reading about it and playing
>> >> > > with it, and I have a few questions that I struggle to understand.
>> >> > >
>> >> > > Let's say I have a simple DAG that runs once a day and does a full
>> >> > > reload of tables from the source database, so the process is not
>> >> > > incremental.
>> >> > >
>> >> > > Let's consider this scenario:
>> >> > >
>> >> > > Day 1 - OK
>> >> > >
>> >> > > Day 2 - airflow scheduler or server with airflow is down for some
>> >> > > reason (or DAG is paused)
>> >> > >
>> >> > > Day 3 - still down (or DAG is paused)
>> >> > >
>> >> > > Day 4 - server is up and now needs to run missing jobs.
>> >> > >
>> >> > >
>> >> > > How can I make airflow run only the Day 4 job and not backfill
>> >> > > Days 2 and 3?
>> >> > >
>> >> > >
>> >> > > I tried depend_on_past = True but it does not seem to do the trick.
>> >> > >
>> >> > >
>> >> > > I also found this in a roadmap doc, but it seems it has not made it
>> >> > > into a release yet:
>> >> > >
>> >> > >
>> >> > >  Only Run Latest - Champion : Sid
>> >> > >
>> >> > > • For cases where we need to only run the latest in a series of
>> >> > > task instance runs and mark the others as skipped. For example, we
>> >> > > may have a job to execute a DB snapshot every day. If the DAG is
>> >> > > paused for 5 days and then unpaused, we don’t want to run all 5,
>> >> > > just the latest. With this feature, we will provide “cron”
>> >> > > functionality for task scheduling that is not related to ETL.
>> >> > >
>> >> > >
>> >> > > My second question: what if I have another DAG that does
>> >> > > incremental loads from a source table:
>> >> > >
>> >> > >
>> >> > > Day 1 - OK, loaded new/changed data for previous day
>> >> > >
>> >> > > Day 2 - source system is down (or DAG is paused), Airflow DagRun
>> >> > > failed
>> >> > >
>> >> > > Day 3 - source system is down (or DAG is paused), Airflow DagRun
>> >> > > failed
>> >> > >
>> >> > > Day 4 - source system is up, Airflow Dagrun succeeded
>> >> > >
>> >> > >
>> >> > > My problem (unless I am missing something) is that Airflow on Day 4
>> >> > > would use the execution time from Day 3, so the interval for the
>> >> > > incremental load would be since the last run (which failed). My hope
>> >> > > is that it would use the last _successful_ run, so on Day 4 it would
>> >> > > go back to Day 1. Is it possible to achieve this?
>> >> > >
>> >> > > I am aware of the manual backfill command in the CLI, but I am not
>> >> > > sure I want to use it due to all the issues and inconsistencies I've
>> >> > > read about.
>> >> > >
>> >> > > Thanks!
>> >> > >
>> >> >
>> >>
>> >
>> >
>>
>
>
