You are still using a dynamic start_date. Effectively you are telling the scheduler that start_date is five minutes before now every time it is evaluated, so the scheduling condition is always satisfied:
First iteration:  start_date = current time 2016-01-01 00:05:00 - 5 minutes = 2016-01-01 00:00:00
Second iteration: start_date = current time 2016-01-01 00:05:01 - 5 minutes = 2016-01-01 00:00:01

Set your start_date to something absolute, so that it evaluates to the same fixed value every time.

Bolke

> On 26 Aug 2016, at 10:02, David Montgomery <[email protected]> wrote:
>
> I changed the code and it just keeps running. The start date is 5 minutes ago
> and the cron is set to run every 5 minutes, yet every second the dag is
> triggered. Wow, what am I missing in the docs? I have a dag that runs a
> python script that outputs 1, 2, 3 in order to a log file for testing, and
> I expected this to happen every 5 minutes. Yet it does not.
>
> All I see in the log below, once a second:
> 1
> 2
> 3
> 1
> 2
> 3
> on and on, one line every second or two.
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime.now() - timedelta(minutes=5),
>     'email': ['[email protected]'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=5),
>     # 'queue': 'bash_queue',
>     # 'pool': 'backfill',
>     # 'priority_weight': 10,
>     # 'end_date': datetime(2016, 1, 1),
> }
>
> # */5 * * * *
> dag = DAG('first_test', schedule_interval="*/5 * * * *",
>           default_args=default_args)
>
> node_0 = PythonOperator(
>     task_id='isnewdata',
>     provide_context=False,
>     python_callable=checkfornewdata,
>     dag=dag)
>
> node_0_1 = PythonOperator(
>     task_id='fetchdata',
>     provide_context=False,
>     python_callable=fetchdata,
>     dag=dag)
>
> node_0_1_2 = PythonOperator(
>     task_id='uploadtoes',
>     provide_context=False,
>     python_callable=uploadtoes,
>     dag=dag)
>
> node_0_1.set_upstream(node_0)
> node_0_1_2.set_upstream(node_0_1)
>
> On Wed, Aug 24, 2016 at 11:04 PM, Laura Lorenz <[email protected]> wrote:
>
>> I don't think this necessarily answers your question, but one thing I
>> noticed is that you are using a dynamic start_date, when you should be
>> using a fixed one. From the FAQs
>> <https://pythonhosted.org/airflow/faq.html#what-s-the-deal-with-start-date>:
>>
>>> We recommend against using dynamic values as start_date, especially
>>> datetime.now() as it can be quite confusing. The task is triggered once
>>> the period closes, and in theory an @hourly DAG would never get to an
>>> hour after now as now() moves along.
>>
>> More to the point, what specifically do you mean by "always running" and
>> "fires every cycle"? For example, is what you are seeing a new task
>> instance with a new execution date every run of the scheduler, i.e. from
>> the Browse > Task Instances UI?
>>
>> On Tue, Aug 23, 2016 at 5:27 PM, David Montgomery <[email protected]> wrote:
>>
>>> even @hourly is not working. Fires every cycle. wow
>>>
>>> On Wed, Aug 24, 2016 at 5:09 AM, David Montgomery <[email protected]> wrote:
>>>
>>>> I updated the dag. In the UI I see 0 * * * * in the schedule field
>>>>
>>>> default_args = {
>>>>     'owner': 'airflow',
>>>>     'depends_on_past': False,
>>>>     'start_date': datetime.now(),
>>>>     'email': ['[email protected]'],
>>>>     'email_on_failure': True,
>>>>     'email_on_retry': True,
>>>>     'retries': 1,
>>>>     'retry_delay': timedelta(minutes=5)
>>>> }
>>>>
>>>> dag = DAG('first_test', schedule_interval="0 * * * *",
>>>>           default_args=default_args)
>>>>
>>>> node_0 = PythonOperator(
>>>>     task_id='isnewdata',
>>>>     provide_context=False,
>>>>     python_callable=checkfornewdata,
>>>>     dag=dag)
>>>>
>>>> node_0_1 = PythonOperator(
>>>>     task_id='fetchdata',
>>>>     provide_context=False,
>>>>     python_callable=fetchdata,
>>>>     dag=dag)
>>>>
>>>> node_0_1_2 = PythonOperator(
>>>>     task_id='uploadtoes',
>>>>     provide_context=False,
>>>>     python_callable=uploadtoes,
>>>>     dag=dag)
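To make the advice in this thread concrete, here is a minimal plain-Python sketch (illustration only, not Airflow scheduler code; the `dynamic_start` helper and the example dates are made up) of why `datetime.now() - timedelta(minutes=5)` keeps sliding forward on every evaluation while an absolute `start_date` stays put:

```python
from datetime import datetime, timedelta
import time

# A dynamic start_date is re-evaluated on every scheduler pass, so it is
# always 5 minutes behind the current moment; it never stops moving.
def dynamic_start():
    return datetime.now() - timedelta(minutes=5)

# An absolute start_date evaluates to the same instant on every pass,
# which is what Bolke and the FAQ recommend.
FIXED_START = datetime(2016, 8, 26)

first = dynamic_start()
time.sleep(0.01)          # simulate time passing between scheduler loops
second = dynamic_start()

print(second > first)                        # True: the dynamic value keeps moving
print(FIXED_START == datetime(2016, 8, 26))  # True: the fixed value never changes
```

Applied to the DAG above, the only change needed in `default_args` would be replacing `"start_date": datetime.now() - timedelta(minutes=5)` with something like `'start_date': datetime(2016, 8, 26)` (the exact date is up to you; Airflow will backfill from it according to `schedule_interval`).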
