[ https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954080#comment-15954080 ]
Bolke de Bruin commented on AIRFLOW-1055: ----------------------------------------- Sid, I don't think this is in create_dag_run() (the traceback doe not list that function) and it only happens when a DAG with a @once schedule has a sla associated with it. Which is not very common. I don't think this should be a blocker (but nevertheless a fix would be very nice) [~criccomini] > airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False > ----------------------------------------------------------------------------- > > Key: AIRFLOW-1055 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1055 > Project: Apache Airflow > Issue Type: Bug > Affects Versions: Airflow 1.8 > Reporter: Siddharth Anand > Assignee: Siddharth Anand > Priority: Blocker > Labels: dagrun, once, scheduler, sla > Fix For: 1.8.1 > > > Getting following exception > {noformat} > [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an > exception! Propagating... > Traceback (most recent call last): > File > "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", > line 346, in helper > pickle_dags) > File > "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", > line 53, in wrapper > result = func(*args, **kwargs) > File > "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", > line 1581, in process_file > self._process_dags(dagbag, dags, ti_keys_to_schedule) > File > "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", > line 1175, in _process_dags > self.manage_slas(dag) > File > "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", > line 53, in wrapper > result = func(*args, **kwargs) > File > "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", > line 595, in manage_slas > while dttm < datetime.now(): > TypeError: can't compare datetime.datetime to NoneType > {noformat} > Exception is in airflow/jobs.py:manage_slas() : > https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595 > {code} > ts = datetime.now() > SlaMiss = models.SlaMiss > for ti in max_tis: > task = dag.get_task(ti.task_id) > dttm = ti.execution_date > if task.sla: > dttm = dag.following_schedule(dttm) > >>> while dttm < datetime.now(): <<< here > following_schedule = dag.following_schedule(dttm) > if following_schedule + task.sla < datetime.now(): > session.merge(models.SlaMiss( > task_id=ti.task_id, > {code} > It seems that dag.following_schedule() returns None for @once dag? > Here's how dag is defined: > {code} > import datetime as dt > import airflow > from airflow.models import DAG > from airflow.operators.dummy_operator import DummyOperator > from datetime import datetime, timedelta > def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis): > print('Executing SLA miss callback') > now = datetime.now() > now_to_the_hour = now.replace(hour=now.time().hour, minute=0, second=0, > microsecond=0) > START_DATE = now_to_the_hour + timedelta(hours=-3) > DAG_NAME = 'manage_sla_once_dag' > default_args = { > 'owner': 'sanand', > 'depends_on_past': False, > 'start_date': START_DATE, > 'sla': timedelta(hours=2) > } > dag = DAG( > dag_id = 'manage_sla_once_dag', > default_args = default_args, > catchup = False, > schedule_interval = '@once', > sla_miss_callback = sla_alert_func > ) > task1 = DummyOperator(task_id='task1', dag=dag) > {code} > This issue works if catchup = True. -- This message was sent by Atlassian JIRA (v6.3.15#6346)