Filed https://issues.apache.org/jira/browse/AIRFLOW-1178 for @once being scheduled twice.
--
Ruslan Dautkhanov

On Sat, May 6, 2017 at 9:30 PM, Ruslan Dautkhanov <[email protected]> wrote:

> Thanks for the follow-up, Chris.
> It used to work for me with catchup=False in a month-old version of
> Airflow. That's why I mentioned it as a regression.
>
> Tried catchup=True today: with @once it actually tries to "catch up",
> which does not make sense for an @once schedule. Notice there is one
> active run and one pending/"scheduled":
> [image: Inline image 1]
>
> So we can't really use @once with catchup=True, and it's not a workaround
> for this problem.
>
> Thanks.
>
> --
> Ruslan Dautkhanov
>
> On Sat, May 6, 2017 at 10:47 AM, Chris Fei <[email protected]> wrote:
>
>> I wonder if your issue has the same root cause as AIRFLOW-1013[1] (which
>> you seem to have reported) and AIRFLOW-1055[2]. I haven't tried it
>> myself, but that second ticket seems to indicate that a workaround
>> could be setting catchup = True on your DAG. Not sure if that's an
>> option for you.
>>
>> On Sat, May 6, 2017, at 12:29 PM, Ruslan Dautkhanov wrote:
>> > I've upgraded Airflow to today's master branch.
>> >
>> > Got the following regression in an attempt to start a DAG:
>> >
>> > Process DagFileProcessor209-Process:
>> > Traceback (most recent call last):
>> >   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
>> >     self.run()
>> >   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
>> >     self._target(*self._args, **self._kwargs)
>> >   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 346, in helper
>> >     pickle_dags)
>> >   File "/opt/airflow/airflow-20170506/src/airflow/airflow/utils/db.py", line 48, in wrapper
>> >     result = func(*args, **kwargs)
>> >   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 1584, in process_file
>> >     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>> >   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 1173, in _process_dags
>> >     dag_run = self.create_dag_run(dag)
>> >   File "/opt/airflow/airflow-20170506/src/airflow/airflow/utils/db.py", line 48, in wrapper
>> >     result = func(*args, **kwargs)
>> >   File "/opt/airflow/airflow-20170506/src/airflow/airflow/jobs.py", line 776, in create_dag_run
>> >     if next_start <= now:
>> > TypeError: can't compare datetime.datetime to NoneType
>> >
>> > DAG definition:
>> >
>> > main_dag = DAG(
>> >     dag_id              = 'DISCOVER-Oracle-Load-Mar2017-v1',
>> >     default_args        = default_args,       # default operators' arguments - see above
>> >     user_defined_macros = dag_macros,         # I do not get the difference between
>> >     ## params           = dag_macros,         ##   user_defined_macros and params
>> >     # start_date        = datetime.now(),     # or e.g. datetime(2015, 6, 1)
>> >     # 'end_date'        = datetime(2016, 1, 1),
>> >     catchup             = False,              # perform scheduler catchup (or only run latest)?
>> >                                               #   - defaults to True
>> >     schedule_interval   = '@once',            # '@once'=None?
>> >                                               #   doesn't create multiple dag runs automatically
>> >     concurrency         = 3,                  # task instances allowed to run concurrently
>> >     max_active_runs     = 1,                  # only one DAG run at a time
>> >     dagrun_timeout      = timedelta(days=4),  # no way this dag should run for 4 days
>> >     orientation         = 'TB',               # default graph view
>> > )
>> >
>> > default_args:
>> >
>> > default_args = {
>> >     # Security:
>> >     'owner'               : 'rdautkha',            # owner of the task; using the unix username is recommended
>> >     # 'run_as_user'       : None,                  #   unix username to impersonate while running the task
>> >     # Scheduling:
>> >     'start_date'          : None,                  # don't confuse with DAG's start_date
>> >     'depends_on_past'     : False,                 # True makes sense... but there are bugs around that code
>> >     'wait_for_downstream' : False,                 # depends_on_past is forced to True if wait_for_downstream
>> >     'trigger_rule'        : 'all_success',         # all_success is the default anyway
>> >     # Retries:
>> >     'retries'             : 0,                     # no retries
>> >     # 'retry_delay'       : timedelta(minutes=5),  #   check retry_exponential_backoff and max_retry_delay too
>> >     # Timeouts and SLAs:
>> >     # 'sla'               : timedelta(hours=1),    #   default tasks' SLA - normally don't run longer
>> >     'execution_timeout'   : timedelta(hours=3),    # no single task runs 3 hours or more
>> >     # 'sla_miss_callback' :                        #   function to call when reporting SLA timeouts
>> >     # Notifications:
>> >     'email'               : ['[email protected]'],
>> >     'email_on_failure'    : True,
>> >     'email_on_retry'      : True,
>> >     # Resource usage:
>> >     'pool'                : 'DISCOVER-Prod',       # can increase this pool's concurrency
>> >     # 'queue'             : 'some_queue',
>> >     # 'priority_weight'   : 10,
>> >     # Miscellaneous:
>> >     # on_failure_callback=None, on_success_callback=None, on_retry_callback=None
>> > }
>> >
>> > The DAG itself has a bunch of Oracle operators.
>> >
>> > Any ideas?
>> >
>> > That's a regression from a month-old Airflow.
>> > No changes in the DAG.
>> >
>> > Thank you,
>> > Ruslan Dautkhanov
>>
>> Links:
>> 1. https://issues.apache.org/jira/browse/AIRFLOW-1013
>> 2. https://issues.apache.org/jira/browse/AIRFLOW-1055
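
[Editor's note] The traceback in the quoted message pins the failure to jobs.py line 776, `if next_start <= now`, and the error text implies `next_start` was None at that point -- presumably because the scheduler's "next schedule" computation has nothing to return for an '@once' DAG that has already run. The failing comparison can be reproduced without Airflow; the sketch below uses illustrative stand-in functions, not Airflow's actual code:

```python
from datetime import datetime

def following_schedule(schedule_interval, last_run):
    """Toy stand-in for computing a DAG's next scheduled time.
    Assumption: for '@once' there is no next run, so it yields None,
    which is what the quoted traceback suggests happened in Airflow."""
    if schedule_interval == '@once':
        return None
    raise NotImplementedError("only '@once' is modeled in this sketch")

def create_dag_run(schedule_interval, last_run):
    """Mirrors the failing comparison from jobs.py (create_dag_run)."""
    next_start = following_schedule(schedule_interval, last_run)
    now = datetime.now()
    # When next_start is None, this comparison raises TypeError
    # ("can't compare datetime.datetime to NoneType" on Python 2).
    return next_start <= now
```

Calling `create_dag_run('@once', datetime(2017, 5, 6))` raises the TypeError shown in the traceback, which is why the fix needs a None guard before the comparison rather than a change to the DAG definition.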
