I wonder if your issue has the same root cause as AIRFLOW-1013 [1] (which
you seem to have reported) and AIRFLOW-1055 [2]. I haven't tried it
myself, but that second ticket suggests a workaround: setting
catchup = True on your DAG. Not sure if that's an option for you.
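For what it's worth, my reading of the traceback is that next_start comes back as None before the `next_start <= now` comparison in create_dag_run. Below is a minimal, hypothetical sketch (not Airflow's actual code) of that failing comparison and the guard it appears to be missing:

```python
from datetime import datetime

# Hypothetical, simplified sketch - NOT Airflow's actual create_dag_run.
# The traceback suggests the scheduler can compute next_start as None
# (apparently with schedule_interval='@once' and catchup=False on that
# build), and comparing None to a datetime raises the reported TypeError.
def can_create_run(next_start, now):
    # The guard the failing code path seems to be missing:
    if next_start is None:
        return False
    return next_start <= now

now = datetime(2017, 5, 6, 12, 29)
print(can_create_run(None, now))                  # guarded: False, no TypeError
print(can_create_run(datetime(2017, 5, 6), now))  # True

# Reproducing the unguarded comparison from the traceback:
try:
    None <= now
except TypeError:
    print("TypeError raised, as in the reported traceback")
```

If that's what is going on, catchup = True may sidestep the problem by making the scheduler compute a concrete next schedule date instead of None.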
On Sat, May 6, 2017, at 12:29 PM, Ruslan Dautkhanov wrote:
> I've upgraded Airflow to today's master branch.
> 
> Got following regression in attempt to start a DAG:
> 
> Process DagFileProcessor209-Process:
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self._target(*self._args, **self._kwargs)
>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 346, in helper
>     pickle_dags)
>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/utils/db.py", line 48, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 1584, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 1173, in _process_dags
>     dag_run = self.create_dag_run(dag)
>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/utils/db.py", line 48, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 776, in create_dag_run
>     if next_start <= now:
> TypeError: can't compare datetime.datetime to NoneType
> 
> 
> 
> DAG definition:
> 
> main_dag = DAG(
>     dag_id              = 'DISCOVER-Oracle-Load-Mar2017-v1',
>     default_args        = default_args,        # default operators' arguments - see above
>     user_defined_macros = dag_macros,          # I don't get the difference between
>     ## params           = dag_macros,          ## user_defined_macros and params
>     start_date          = datetime.now(),      # or e.g. datetime(2015, 6, 1)
>     # 'end_date'        = datetime(2016, 1, 1),
>     catchup             = False,               # perform scheduler catchup (or only run latest)?
>                                                # - defaults to True
>     schedule_interval   = '@once',             # '@once'=None?
>                                                # doesn't create multiple dag runs automatically
>     concurrency         = 3,                   # task instances allowed to run concurrently
>     max_active_runs     = 1,                   # only one DAG run at a time
>     dagrun_timeout      = timedelta(days=4),   # no way this dag should run for 4 days
>     orientation         = 'TB',                # default graph view
> )
> 
> 
> default_args:
> 
> default_args = {
>     # Security:
>     'owner'               : 'rdautkha',            # owner of the task; using the unix username is recommended
>     # 'run_as_user'       : None,                  # unix username to impersonate while running the task
>     # Scheduling:
>     'start_date'          : None,                  # don't confuse with DAG's start_date
>     'depends_on_past'     : False,                 # True makes sense... but there are bugs around that code
>     'wait_for_downstream' : False,                 # depends_on_past is forced to True if wait_for_downstream
>     'trigger_rule'        : 'all_success',         # all_success is the default anyway
>     # Retries:
>     'retries'             : 0,                     # no retries
>     # 'retry_delay'       : timedelta(minutes=5),  # check retry_exponential_backoff and max_retry_delay too
>     # Timeouts and SLAs:
>     # 'sla'               : timedelta(hours=1),    # default tasks' SLA - normally don't run longer
>     'execution_timeout'   : timedelta(hours=3),    # no single task runs 3 hours or more
>     # 'sla_miss_callback' :                        # function to call when reporting SLA timeouts
>     # Notifications:
>     'email'               : ['[email protected]'],
>     'email_on_failure'    : True,
>     'email_on_retry'      : True,
>     # Resource usage:
>     'pool'                : 'DISCOVER-Prod',       # can increase this pool's concurrency
>     # 'queue'             : 'some_queue',
>     # 'priority_weight'   : 10,
>     # Miscellaneous:
>     # on_failure_callback=None, on_success_callback=None,
>     # on_retry_callback=None,
> }
> 
> 
> The DAG itself has a bunch of Oracle operators.
> 
> Any ideas?
> 
> That's a regression from a month-old Airflow build.
> No changes were made to the DAG.
> 
> 
> 
> Thank you,
> Ruslan Dautkhanov


Links:

  1. https://issues.apache.org/jira/browse/AIRFLOW-1013
  2. https://issues.apache.org/jira/browse/AIRFLOW-1055
