you rock, Sid! thanks for taking the time to explain it to me

On Thu, Oct 13, 2016 at 6:10 PM, siddharth anand <san...@apache.org> wrote:

> I can't see an image.
>
> We run most of our dags with depends_on_past=True.
>
> If you want to chain your dag runs, such that the first task of a dag run
> does not start until the last task of the previous dag run completes, you
> can use an external task sensor. The external task sensor would be the
> first task in the dag and would depend on the last task in the same dag
> from the previous dag run. This is strict dag chaining.
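>
> A minimal sketch of that pattern, assuming a daily schedule (the dag id,
> task ids, and import paths are illustrative and vary by Airflow version):
>
> from datetime import datetime, timedelta
> from airflow import DAG
> from airflow.operators.sensors import ExternalTaskSensor
> from airflow.operators.bash_operator import BashOperator
>
> dag = DAG('chained_dag', start_date=datetime(2016, 10, 1),
>           schedule_interval=timedelta(days=1))
>
> # First task: wait for 'final_task' of this same dag, one schedule
> # interval back, before letting this run proceed.
> wait_for_previous_run = ExternalTaskSensor(
>     task_id='wait_for_previous_run',
>     external_dag_id='chained_dag',
>     external_task_id='final_task',
>     execution_delta=timedelta(days=1),
>     dag=dag)
>
> final_task = BashOperator(
>     task_id='final_task',
>     bash_command='echo done',
>     dag=dag)
>
> wait_for_previous_run.set_downstream(final_task)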
>
> If you just don't want a task in the subsequent dag run to get scheduled
> until the same task in the previous dag run completes,
> depends_on_past=True helps there. This is more of a cascading effect in
> the tree view.
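>
> For example, just the relevant default_args keys (the start_date value is
> illustrative):
>
> from datetime import datetime
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': True,
>     'start_date': datetime(2016, 10, 1),
> }
>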
> -s
>
> On Thu, Oct 13, 2016 at 12:41 PM, Boris Tyukin <bo...@boristyukin.com>
> wrote:
>
> > This is not what I see actually. I posted below my test DAG and a
> > screenshot.
> >
> > It does create DagRuns on subsequent runs - I modeled that scenario by
> > commenting out one bash command and uncommenting another one with exit 1.
> >
> > It does not create task instances for subsequent failed DAG runs, but it
> > does create DagRuns, and the first successful run after the failed ones
> > would not have the execution timestamp from the last successful run.
> >
> >
> > [image: Inline image 1]
> >
> >
> > here is my test DAG
> >
> >
> >
> > from datetime import datetime, timedelta
> >
> > # imports needed for this DAG (paths as of Airflow ~1.7/1.8)
> > from airflow import DAG
> > from airflow.operators import BashOperator
> >
> > # Determine schedule:
> > dag_schedule_interval = timedelta(seconds=60)
> > dag_start_date = datetime.now() - dag_schedule_interval
> >
> > default_args = {
> >     'owner': 'airflow',
> >     'depends_on_past': True,
> >     'start_date': dag_start_date,
> >     # 'start_date': datetime(2016, 10, 11, 17, 0),
> >     'email': ['airf...@airflow.com'],
> >     'email_on_failure': False,
> >     'email_on_retry': False,
> >     'retries': 0,
> >     'retry_delay': timedelta(seconds=20),
> >     # left from experimenting with the proposed Only_Run_Latest feature:
> >     'only_run_latest': True,
> >     # 'queue': 'bash_queue',
> >     # 'pool': 'backfill',
> >     # 'priority_weight': 10,
> >     # 'end_date': datetime(2016, 1, 1),
> > }
> >
> > # Change version number if the schedule needs to be changed:
> > dag = DAG(
> >     'pipeline1_v8', default_args=default_args,
> >     schedule_interval=dag_schedule_interval)
> >
> > dag.doc_md = __doc__
> >
> > # t1 is an example of a task created by instantiating an operator
> > t1 = BashOperator(
> >     task_id='t1',
> >     bash_command='echo execution ts {{ ts }} && echo 1',
> >     # bash_command='exit 1',
> >     dag=dag)
> >
> > On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand <san...@apache.org>
> > wrote:
> >
> >> If you use depends_on_past=True, it won't proceed to the next DAG Run if
> >> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
> >>
> >> -s
> >>
> >> On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand <san...@apache.org>
> >> wrote:
> >>
> >> > Yes! It does work with depends_on_past=True.
> >> > -s
> >> >
> >> > On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin <bo...@boristyukin.com>
> >> > wrote:
> >> >
> >> >> thanks so much, Sid! just a follow-up question on "Only_Run_Latest" -
> >> >> will it work with depends_on_past = True? or will it assume the DAG
> >> >> uses depends_on_past = False?
> >> >>
> >> >> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand <san...@apache.org>
> >> >> wrote:
> >> >>
> >> >> > Boris,
> >> >> >
> >> >> > *Question 1*
> >> >> > Only_Run_Latest is in master -
> >> >> > https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a
> >> >> > That will solve your problem.
> >> >> >
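> >> >> > For reference, a minimal sketch of wiring it in if you run off master
> >> >> > (the import path is an assumption based on master at the time and may
> >> >> > differ; task ids are illustrative):
> >> >> >
> >> >> > from airflow.operators.latest_only_operator import LatestOnlyOperator
> >> >> >
> >> >> > # Downstream tasks are skipped unless this DagRun is the latest one.
> >> >> > latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> >> >> > latest_only.set_downstream(t1)
> >> >> >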
> >> >> > Releases come out once a quarter, sometimes once every 2 quarters,
> >> >> > so I would recommend that you run off master or off your own fork.
> >> >> >
> >> >> > You could also achieve this yourself with the following code snippet.
> >> >> > It uses a ShortCircuitOperator that will skip downstream tasks if the
> >> >> > DagRun being executed is not the current one. It will work for any
> >> >> > schedule. The code below has essentially been implemented in the
> >> >> > LatestOnlyOperator above for convenience.
> >> >> >
> >> >> > import logging
> >> >> > from datetime import datetime
> >> >> >
> >> >> > from airflow.operators import ShortCircuitOperator
> >> >> >
> >> >> > def skip_to_current_job(ds, **kwargs):
> >> >> >     now = datetime.now()
> >> >> >     left_window = kwargs['dag'].following_schedule(
> >> >> >         kwargs['execution_date'])
> >> >> >     right_window = kwargs['dag'].following_schedule(left_window)
> >> >> >     logging.info('Left Window {}, Now {}, Right Window {}'.format(
> >> >> >         left_window, now, right_window))
> >> >> >     if not now <= right_window:
> >> >> >         logging.info('Not latest execution, skipping downstream.')
> >> >> >         return False
> >> >> >     return True
> >> >> >
> >> >> > t0 = ShortCircuitOperator(
> >> >> >     task_id         = 'short_circuit_if_not_current',
> >> >> >     provide_context = True,
> >> >> >     python_callable = skip_to_current_job,
> >> >> >     dag             = dag
> >> >> > )
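> >> >> >
> >> >> > You would then put t0 upstream of the rest of the dag (e.g.
> >> >> > t0.set_downstream(t1), with t1 being your first real task) so that
> >> >> > everything downstream is skipped on non-current runs.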
> >> >> >
> >> >> >
> >> >> > -s
> >> >> >
> >> >> >
> >> >> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin <bo...@boristyukin.com>
> >> >> > wrote:
> >> >> >
> >> >> > > Hello all and thanks for such an amazing project! I have been
> >> >> > > evaluating Airflow and spent a few days reading about it and
> >> >> > > playing with it, and I have a few questions that I am struggling
> >> >> > > with.
> >> >> > >
> >> >> > > Let's say I have a simple DAG that runs once a day and does a full
> >> >> > > reload of tables from the source database, so the process is not
> >> >> > > incremental.
> >> >> > >
> >> >> > > Let's consider this scenario:
> >> >> > >
> >> >> > > Day 1 - OK
> >> >> > >
> >> >> > > Day 2 - airflow scheduler or server with airflow is down for some
> >> >> > > reason (or DAG is paused)
> >> >> > >
> >> >> > > Day 3 - still down (or DAG is paused)
> >> >> > >
> >> >> > > Day 4 - server is up and now needs to run missing jobs.
> >> >> > >
> >> >> > >
> >> >> > > How can I make airflow run only the Day 4 job and not backfill
> >> >> > > Days 2 and 3?
> >> >> > >
> >> >> > >
> >> >> > > I tried depends_on_past = True but it does not seem to do the
> >> >> > > trick.
> >> >> > >
> >> >> > >
> >> >> > > I also found this in a roadmap doc, but it seems it has not made
> >> >> > > it into a release yet:
> >> >> > >
> >> >> > >
> >> >> > >  Only Run Latest - Champion : Sid
> >> >> > >
> >> >> > > • For cases where we need to only run the latest in a series of
> >> >> > > task instance runs and mark the others as skipped. For example, we
> >> >> > > may have a job to execute a DB snapshot every day. If the DAG is
> >> >> > > paused for 5 days and then unpaused, we don’t want to run all 5,
> >> >> > > just the latest. With this feature, we will provide “cron”
> >> >> > > functionality for task scheduling that is not related to ETL
> >> >> > >
> >> >> > >
> >> >> > > My second question: what if I have another DAG that does
> >> >> > > incremental loads from a source table:
> >> >> > >
> >> >> > >
> >> >> > > Day 1 - OK, loaded new/changed data for previous day
> >> >> > >
> >> >> > > Day 2 - source system is down (or DAG is paused), Airflow DagRun
> >> >> > > failed
> >> >> > >
> >> >> > > Day 3 - source system is down (or DAG is paused), Airflow DagRun
> >> >> > > failed
> >> >> > >
> >> >> > > Day 4 - source system is up, Airflow DagRun succeeded
> >> >> > >
> >> >> > >
> >> >> > > My problem (unless I am missing something) is that Airflow on Day 4
> >> >> > > would use the execution time from Day 3, so the interval for the
> >> >> > > incremental load would be since the last run (which failed). My
> >> >> > > hope is that it would use the last _successful_ run, so on Day 4 it
> >> >> > > would go back to Day 1. Is it possible to achieve this?
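> >> >> > >
> >> >> > > Or would I have to look up the last successful run myself? A rough
> >> >> > > sketch of what I had in mind (assuming direct access to Airflow's
> >> >> > > metadata DB through its models; the helper name is my own):
> >> >> > >
> >> >> > > from airflow import settings
> >> >> > > from airflow.models import DagRun
> >> >> > >
> >> >> > > def last_successful_execution_date(dag_id):
> >> >> > >     # Most recent DagRun for this dag that finished as 'success'.
> >> >> > >     session = settings.Session()
> >> >> > >     run = (session.query(DagRun)
> >> >> > >            .filter(DagRun.dag_id == dag_id,
> >> >> > >                    DagRun.state == 'success')
> >> >> > >            .order_by(DagRun.execution_date.desc())
> >> >> > >            .first())
> >> >> > >     session.close()
> >> >> > >     return run.execution_date if run else None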
> >> >> > >
> >> >> > > I am aware of the manual backfill command in the CLI, but I am not
> >> >> > > sure I want to use it due to all the issues and inconsistencies
> >> >> > > I've read about.
> >> >> > >
> >> >> > > Thanks!
> >> >> > >
> >> >> >
> >> >>
> >> >
> >> >
> >>
> >
> >
>
