you rock, Sid! thanks for taking the time to explain it to me.

On Thu, Oct 13, 2016 at 6:10 PM, siddharth anand <[email protected]> wrote:

> I can't see an image.
>
> We run most of our dags with depends_on_past=True.
>
> If you want to chain your dag runs, such as not starting the first task
> of a dag run until the last task of the previous dag run completes, you
> can use an external task sensor. The external task sensor would be the
> first task in the dag and would depend on the last task in the same dag
> from the previous dag run. This is strict dag chaining.
>
> If you just don't want the same task in the subsequent dag run to get
> scheduled unless the first task completes, depends_on_past=True helps
> there. This is more a cascading effect in the tree view.
> -s
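A minimal sketch of the sensor wiring Sid describes, assuming a daily DAG
named 'my_dag' whose last task is 'final_task' (the names and schedule are
illustrative, not from this thread). The sensor is the first task and waits
on the same DAG's last task one schedule interval in the past:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.sensors import ExternalTaskSensor

    dag = DAG(
        'my_dag',
        start_date=datetime(2016, 10, 1),
        schedule_interval=timedelta(days=1))

    # Wait for 'final_task' of this same DAG, one schedule interval back,
    # i.e. the previous dag run.
    wait_for_previous_run = ExternalTaskSensor(
        task_id='wait_for_previous_run',
        external_dag_id='my_dag',
        external_task_id='final_task',
        execution_delta=timedelta(days=1),
        dag=dag)

    final_task = BashOperator(
        task_id='final_task',
        bash_command='echo run complete',
        dag=dag)

    final_task.set_upstream(wait_for_previous_run)

Note that the very first dag run has no predecessor, so the sensor would
wait forever on it; in practice that first interval has to be marked
success by hand or handled separately.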
On Thu, Oct 13, 2016 at 12:41 PM, Boris Tyukin <[email protected]> wrote:

> This is not what I see, actually. I posted my test DAG and a screenshot
> below.
>
> It does create DagRuns on subsequent runs - I modeled that scenario by
> commenting out one bash command and uncommenting another one that does
> "exit 1".
>
> It does not create task instances on the subsequent failed DAG runs, but
> it does create DagRuns, and the first successful run after the failed
> ones does not get the execution timestamp of the last successful run.
>
> [image: Inline image 1]
>
> here is my test DAG:
>
>     from datetime import datetime, timedelta
>
>     from airflow import DAG
>     from airflow.operators.bash_operator import BashOperator
>
>     # Determine schedule:
>     dag_schedule_interval = timedelta(seconds=60)
>     dag_start_date = datetime.now() - dag_schedule_interval
>
>     default_args = {
>         'owner': 'airflow',
>         'depends_on_past': True,
>         'start_date': dag_start_date,
>         # 'start_date': datetime(2016, 10, 11, 17, 0),
>         'email': ['[email protected]'],
>         'email_on_failure': False,
>         'email_on_retry': False,
>         'retries': 0,
>         'retry_delay': timedelta(seconds=20),
>         'only_run_latest': True,  # note: not an actual Airflow argument
>         # 'queue': 'bash_queue',
>         # 'pool': 'backfill',
>         # 'priority_weight': 10,
>         # 'end_date': datetime(2016, 1, 1),
>     }
>
>     # Change version number if the schedule needs to be changed:
>     dag = DAG(
>         'pipeline1_v8',
>         default_args=default_args,
>         schedule_interval=dag_schedule_interval)
>
>     dag.doc_md = __doc__
>
>     # t1, t2 and t3 are examples of tasks created by instantiating
>     # operators
>     t1 = BashOperator(
>         task_id='t1',
>         bash_command='echo execution ts {{ ts }} & echo 1',
>         # bash_command='exit 1',
>         dag=dag)

On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand <[email protected]> wrote:

> If you use depends_on_past=True, it won't proceed to the next DAG Run if
> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
> -s

On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand <[email protected]> wrote:

> Yes! It does work with depends_on_past=True.
> -s

On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin <[email protected]> wrote:

> thanks so much, Sid! just a follow-up question on "Only_Run_Latest" -
> will it work with depends_on_past = True, or will it assume the DAG uses
> False?

On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand <[email protected]> wrote:

> Boris,
>
> *Question 1*
> Only_Run_Latest is in master -
> https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> That will solve your problem.
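A minimal usage sketch of the LatestOnlyOperator that the commit above adds
(which means running off master at the time of this thread); the dag id and
task names are illustrative:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.latest_only_operator import LatestOnlyOperator

    dag = DAG(
        'daily_snapshot',
        start_date=datetime(2016, 10, 1),
        schedule_interval=timedelta(days=1))

    # Skips everything downstream unless the DagRun being executed is the
    # most recent scheduled one, so after a 5-day pause only the latest
    # run does real work.
    latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

    snapshot = BashOperator(
        task_id='db_snapshot',
        bash_command='echo taking snapshot',
        dag=dag)

    snapshot.set_upstream(latest_only)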
> Releases come out once a quarter, sometimes once every 2 quarters, so I
> would recommend that you run off master or off your own fork.
>
> You could also achieve this yourself with the following code snippet. It
> uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
> being executed is not the current one. It will work for any schedule. The
> code below has essentially been implemented in the LatestOnlyOperator
> above for convenience.
>
>     import logging
>     from datetime import datetime
>
>     from airflow.operators.python_operator import ShortCircuitOperator
>
>     def skip_to_current_job(ds, **kwargs):
>         """Return False (skip downstream tasks) unless this DagRun is
>         the latest scheduled one."""
>         now = datetime.now()
>         left_window = kwargs['dag'].following_schedule(
>             kwargs['execution_date'])
>         right_window = kwargs['dag'].following_schedule(left_window)
>         logging.info('Left Window {}, Now {}, Right Window {}'.format(
>             left_window, now, right_window))
>         if not now <= right_window:
>             logging.info('Not latest execution, skipping downstream.')
>             return False
>         return True
>
>     # assumes a `dag` object is already defined in the file
>     t0 = ShortCircuitOperator(
>         task_id='short_circuit_if_not_current',
>         provide_context=True,
>         python_callable=skip_to_current_job,
>         dag=dag)
>
> -s

On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin <[email protected]> wrote:

> Hello all, and thanks for such an amazing project! I have been evaluating
> Airflow, spent a few days reading about it and playing with it, and I
> have a few questions that I struggle to understand.
>
> Let's say I have a simple DAG that runs once a day and does a full reload
> of tables from the source database, so the process is not incremental.
>
> Let's consider this scenario:
>
> Day 1 - OK
>
> Day 2 - airflow scheduler or the server with airflow is down for some
> reason (or the DAG is paused)
>
> Day 3 - still down (or the DAG is paused)
>
> Day 4 - server is up and now needs to run the missed jobs.
>
> How can I make Airflow run only the Day 4 job and not backfill Days 2
> and 3?
>
> I tried depends_on_past = True, but it does not seem to do the trick.
>
> I also found this in a roadmap doc, but it seems it has not made it into
> a release yet:
>
>     Only Run Latest - Champion: Sid
>
>     • For cases where we need to only run the latest in a series of task
>     instance runs and mark the others as skipped. For example, we may
>     have a job to execute a DB snapshot every day. If the DAG is paused
>     for 5 days and then unpaused, we don't want to run all 5, just the
>     latest. With this feature, we will provide “cron” functionality for
>     task scheduling that is not related to ETL.
>
> My second question: what if I have another DAG that does incremental
> loads from a source table:
>
> Day 1 - OK, loaded new/changed data for the previous day
>
> Day 2 - source system is down (or the DAG is paused), Airflow DagRun
> failed
>
> Day 3 - source system is down (or the DAG is paused), Airflow DagRun
> failed
>
> Day 4 - source system is up, Airflow DagRun succeeded
>
> My problem (unless I am missing something): on Day 4, Airflow would use
> the execution time from Day 3, so the interval for the incremental load
> would be everything since the last run (which failed). My hope is that it
> would use the last _successful_ run, so on Day 4 it would go back to
> Day 1. Is it possible to achieve this?
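Airflow does not hand templates the last successful run out of the box
here, but one way to get the behavior Boris is asking for is to look up the
most recent successful DagRun in the metadata database and use its
execution_date as the start of the incremental window. A hedged sketch; the
helper name is made up for illustration:

    from airflow.models import DagRun
    from airflow.utils.db import provide_session
    from airflow.utils.state import State

    @provide_session
    def last_successful_execution_date(dag_id, session=None):
        """Return the execution_date of the most recent successful DagRun
        for dag_id, or None if the DAG has never succeeded."""
        run = (session.query(DagRun)
                      .filter(DagRun.dag_id == dag_id,
                              DagRun.state == State.SUCCESS)
                      .order_by(DagRun.execution_date.desc())
                      .first())
        return run.execution_date if run else None

A task at the front of the DAG (a PythonOperator, say) could call this and
pass the window start to the extract step, e.g. via XCom, so a success on
Day 4 reaches back to Day 1 regardless of the failed runs in between.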
> I am aware of the manual backfill command in the CLI, but I am not sure I
> want to use it due to all the issues and inconsistencies I've read about.
>
> Thanks!
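For reference, the manual backfill mentioned above is driven from the CLI;
the dag id below is Boris's test DAG and the date range is illustrative:

    # Re-run every schedule interval between the two dates for one DAG.
    airflow backfill -s 2016-10-11 -e 2016-10-13 pipeline1_v8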
