I am surprised by the sample. But this should work, given that your DAG file appears to use "from datetime import datetime":

start_date = datetime(2016, 8, 26)
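
To make the difference between a dynamic and a fixed start_date concrete, here is a small sketch in plain Python (no Airflow needed to run it). The helper names dynamic_start_date and fixed_start_date are only for illustration and are not part of Airflow; the default_args keys are the ones from your own DAG file. It does not model the scheduler itself, it only shows that one value moves between evaluations and the other does not.

```python
from datetime import datetime, timedelta
import time


def dynamic_start_date():
    # Hypothetical helper: re-evaluated every time the DAG file is parsed,
    # so the value keeps moving forward with the clock.
    return datetime.now() - timedelta(minutes=5)


def fixed_start_date():
    # Hypothetical helper: an absolute value, identical on every evaluation.
    return datetime(2016, 8, 26)


# Simulate two separate evaluations of the DAG file.
first_dynamic = dynamic_start_date()
time.sleep(0.05)
second_dynamic = dynamic_start_date()

first_fixed = fixed_start_date()
second_fixed = fixed_start_date()

print("dynamic start_date stable:", first_dynamic == second_dynamic)
print("fixed start_date stable:  ", first_fixed == second_fixed)

# The same default_args as in the quoted DAG, but with an absolute start_date.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': fixed_start_date(),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
```

With the fixed value, every evaluation of the file agrees on when the schedule begins, which is what "something absolute" means in the earlier reply quoted below.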
> On 26 Aug 2016, at 10:20, David Montgomery <[email protected]> wrote:
>
> How? That is one example in one of the examples:
>
> seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
>                                   datetime.min.time())
>
> I tried this...
> mytime = datetime.combine(datetime.now() - timedelta(minutes=5),
>                           datetime.min.time())
>
> What is an example that will work?
>
> Thanks
>
> On Fri, Aug 26, 2016 at 4:08 PM, Bolke de Bruin <[email protected]> wrote:
>
>> You are still using a dynamic start_date.
>>
>> Basically you are saying that the start_date is 5 minutes before the
>> current time every time it is evaluated, so it will always be true:
>>
>> First iteration:
>> Start_date: Current time 2016-01-01 00:05:00 - 5 minutes = 2016-01-01 00:00:00
>>
>> Second iteration:
>> Start_date: Current time 2016-01-01 00:05:01 - 5 minutes = 2016-01-01 00:00:01
>>
>> Set your start_date to something absolute, that evaluates to something
>> fixed every time it is evaluated.
>>
>> Bolke
>>
>>> On 26 Aug 2016, at 10:02, David Montgomery <[email protected]> wrote:
>>>
>>> I changed the code and it just keeps running. The start date is 5 minutes
>>> ago and the cron is set to run every 5 mins. Every second the dag is
>>> triggered. Wow, what am I missing in the docs? I have a dag that runs a
>>> python script that outputs 1,2,3 in order to a log file for testing, and
>>> I am expecting this to happen every 5 mins. Yet it's not.
>>>
>>> All I see in the below once a sec
>>> 1
>>> 2
>>> 3
>>> 1
>>> 2
>>> 3
>>> 1
>>> 2
>>> 3
>>> on and on one line a second or two.
>>>
>>> default_args = {
>>>     'owner': 'airflow',
>>>     'depends_on_past': False,
>>>     "start_date": datetime.now()-timedelta(minutes=5),
>>>     'email': ['[email protected]'],
>>>     'email_on_failure': True,
>>>     'email_on_retry': True,
>>>     'retries': 1,
>>>     'retry_delay': timedelta(minutes=5),
>>>     # 'queue': 'bash_queue',
>>>     # 'pool': 'backfill',
>>>     # 'priority_weight': 10,
>>>     # 'end_date': datetime(2016, 1, 1),
>>> }
>>>
>>> # */5 * * * *
>>> dag = DAG('first_test', schedule_interval="*/5 * * * *",
>>>           default_args=default_args)
>>>
>>> node_0 = PythonOperator(
>>>     task_id='isnewdata',
>>>     provide_context=False,
>>>     python_callable=checkfornewdata,
>>>     dag=dag)
>>>
>>> node_0_1 = PythonOperator(
>>>     task_id='fetchdata',
>>>     provide_context=False,
>>>     python_callable=fetchdata,
>>>     dag=dag)
>>>
>>> node_0_1_2 = PythonOperator(
>>>     task_id='uploadtoes',
>>>     provide_context=False,
>>>     python_callable=uploadtoes,
>>>     dag=dag)
>>>
>>> node_0_1.set_upstream(node_0)
>>> node_0_1_2.set_upstream(node_0_1)
>>>
>>> On Wed, Aug 24, 2016 at 11:04 PM, Laura Lorenz <[email protected]> wrote:
>>>
>>>> I don't think this necessarily answers your question, but one thing I
>>>> noticed is that you are using a dynamic start_date, when you should be
>>>> using a fixed one. From the FAQs
>>>> <https://pythonhosted.org/airflow/faq.html#what-s-the-deal-with-start-date>:
>>>>
>>>>> We recommend against using dynamic values as start_date, especially
>>>>> datetime.now() as it can be quite confusing. The task is triggered once
>>>>> the period closes, and in theory an @hourly DAG would never get to an
>>>>> hour after now as now() moves along.
>>>>
>>>> More to the point, what specifically do you mean by "always running" and
>>>> "fires every cycle"? For example is what you are seeing a new task
>>>> instance with a new execution date every run of the scheduler i.e. from
>>>> the Browse > Task Instances UI?
>>>>
>>>> On Tue, Aug 23, 2016 at 5:27 PM, David Montgomery <[email protected]> wrote:
>>>>
>>>>> even @hourly is not working. Fires every cycle. wow
>>>>>
>>>>> On Wed, Aug 24, 2016 at 5:09 AM, David Montgomery <[email protected]> wrote:
>>>>>
>>>>>> I updated the dag. In the UI I see 0 * * * * in the schedule field
>>>>>>
>>>>>> default_args = {
>>>>>>     'owner': 'airflow',
>>>>>>     'depends_on_past': False,
>>>>>>     "start_date": datetime.now(),
>>>>>>     'email': ['[email protected]'],
>>>>>>     'email_on_failure': True,
>>>>>>     'email_on_retry': True,
>>>>>>     'retries': 1,
>>>>>>     'retry_delay': timedelta(minutes=5)
>>>>>> }
>>>>>>
>>>>>> dag = DAG('first_test', schedule_interval="0 * * * *",
>>>>>>           default_args=default_args)
>>>>>>
>>>>>> node_0 = PythonOperator(
>>>>>>     task_id='isnewdata',
>>>>>>     provide_context=False,
>>>>>>     python_callable=checkfornewdata,
>>>>>>     dag=dag)
>>>>>>
>>>>>> node_0_1 = PythonOperator(
>>>>>>     task_id='fetchdata',
>>>>>>     provide_context=False,
>>>>>>     python_callable=fetchdata,
>>>>>>     dag=dag)
>>>>>>
>>>>>> node_0_1_2 = PythonOperator(
>>>>>>     task_id='uploadtoes',
>>>>>>     provide_context=False,
>>>>>>     python_callable=uploadtoes,
>>>>>>     dag=dag)
