You are still using a dynamic start_date.

Basically you are saying that the start_date is 5 minutes before the current
time every time it is evaluated, so the schedule condition will always be true:

First iteration:
Start_date: current time 2016-01-01 00:05:00 - 5 minutes = 2016-01-01 00:00:00

Second iteration:
Start_date: current time 2016-01-01 00:05:01 - 5 minutes = 2016-01-01 00:00:01
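The two iterations above can be sketched in plain Python (the timestamps are
the ones from the example, not real scheduler output):

```python
from datetime import datetime, timedelta

# The start_date expression is re-evaluated on every parse of the DAG file,
# so the resulting "start" moves forward together with the clock.
first = datetime(2016, 1, 1, 0, 5, 0) - timedelta(minutes=5)   # 2016-01-01 00:00:00
second = datetime(2016, 1, 1, 0, 5, 1) - timedelta(minutes=5)  # 2016-01-01 00:00:01

# The start keeps drifting, so the scheduler never sees a closed interval
# in the usual way.
print(first, second)
```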

Set your start_date to an absolute value, so that it evaluates to the same
fixed point in time every time it is evaluated.
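A minimal sketch of what a fixed start_date could look like in your
default_args (the date itself is an arbitrary example, pick one that fits
your pipeline):

```python
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Absolute value: evaluates to the same fixed point in time on every
    # parse of the DAG file, unlike datetime.now() - timedelta(minutes=5)
    'start_date': datetime(2016, 8, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
```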

Bolke

> On 26 Aug 2016, at 10:02, David Montgomery <[email protected]>
> wrote:
> 
> I changed the code and it just keeps running.  The start date is 5 minutes ago
> and the cron is set to run every 5 mins.  Every second the dag is
> triggered. Wow, what am I missing in the docs?  I have a dag that runs a
> python script that outputs 1, 2, 3 in order to a log file for testing, and I
> expect this to happen every 5 mins.  Yet it's not.
> 
> All I see in the log below, once a second:
> 1
> 2
> 3
> 1
> 2
> 3
> 1
> 2
> 3
> on and on one line a second or two.
> 
> 
> 
> default_args = {
>    'owner': 'airflow',
>    'depends_on_past': False,
>    "start_date":  datetime.now()-timedelta(minutes=5),
>    'email': ['[email protected]'],
>    'email_on_failure': True,
>    'email_on_retry': True,
>    'retries': 1,
>    'retry_delay': timedelta(minutes=5),
>    # 'queue': 'bash_queue',
>    # 'pool': 'backfill',
>    # 'priority_weight': 10,
>    # 'end_date': datetime(2016, 1, 1),
> }
> 
> # */5 * * * *
> dag = DAG('first_test', schedule_interval="*/5 * * * *",
> default_args=default_args)
> 
> 
> node_0 = PythonOperator(
>    task_id='isnewdata',
>    provide_context=False,
>    python_callable=checkfornewdata,
>    dag=dag)
> 
> 
> node_0_1 = PythonOperator(
>    task_id='fetchdata',
>    provide_context=False,
>    python_callable=fetchdata,
>    dag=dag)
> 
> node_0_1_2 = PythonOperator(
>    task_id='uploadtoes',
>    provide_context=False,
>    python_callable= uploadtoes,
>    dag=dag)
> 
> 
> node_0_1.set_upstream(node_0)
> node_0_1_2.set_upstream(node_0_1)
> 
> 
> 
> 
> 
> 
> 
> On Wed, Aug 24, 2016 at 11:04 PM, Laura Lorenz <[email protected]>
> wrote:
> 
>> I don't think this necessarily answers your question, but one thing I
>> noticed is that you are using a dynamic start_date, when you should be
>> using a fixed one. From the FAQs
>> <https://pythonhosted.org/airflow/faq.html#what-s-the-deal-with-start-date>:
>> 
>>> We recommend against using dynamic values as start_date, especially
>>> datetime.now() as it can be quite confusing. The task is triggered once
>>> the period closes, and in theory an @hourly DAG would never get to an
>>> hour after now as now() moves along.
>> 
>> 
>> More to the point, what specifically do you mean by "always running" and
>> "fires every cycle"? For example is what you are seeing a new task instance
>> with a new execution date every run of the scheduler i.e. from the Browse >
>> Task Instances UI?
>> 
>> On Tue, Aug 23, 2016 at 5:27 PM, David Montgomery <
>> [email protected]
>>> wrote:
>> 
>>> even @hourly is not working.  Fires every cycle. wow
>>> 
>>> On Wed, Aug 24, 2016 at 5:09 AM, David Montgomery <
>>> [email protected]
>>>> wrote:
>>> 
>>>> I updated the dag.  In the UI I see 0 * * * * in the schedule field
>>>> 
>>>> 
>>>> 
>>>> default_args = {
>>>>    'owner': 'airflow',
>>>>    'depends_on_past': False,
>>>>    "start_date": datetime.now(),
>>>>    'email': ['[email protected]'],
>>>>    'email_on_failure': True,
>>>>    'email_on_retry': True,
>>>>    'retries': 1,
>>>>    'retry_delay': timedelta(minutes=5)
>>>> }
>>>> 
>>>> 
>>>> 
>>>> dag = DAG('first_test', schedule_interval="0 * * * *",
>>>> default_args=default_args)
>>>> 
>>>> node_0 = PythonOperator(
>>>>    task_id='isnewdata',
>>>>    provide_context=False,
>>>>    python_callable=checkfornewdata,
>>>>    dag=dag)
>>>> 
>>>> 
>>>> node_0_1 = PythonOperator(
>>>>    task_id='fetchdata',
>>>>    provide_context=False,
>>>>    python_callable=fetchdata,
>>>>    dag=dag)
>>>> 
>>>> node_0_1_2 = PythonOperator(
>>>>    task_id='uploadtoes',
>>>>    provide_context=False,
>>>>    python_callable= uploadtoes,
>>>>    dag=dag)
>>>> 
>>>> 
>>> 
>> 
