raman created AIRFLOW-3136:
------------------------------

             Summary: Scheduler fails the task retry run while processing 
executor events
                 Key: AIRFLOW-3136
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3136
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: 1.9.0
            Reporter: raman


The following behaviour is observed with Airflow 1.9 in LocalExecutor mode:

 

The Airflow scheduler processes executor events in the 
"_process_executor_events(self, simple_dag_bag, session=None)" function of 
jobs.py.
Events are identified by a key composed of (dag id, task id, execution 
date), so all retries of a task share the same key.
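
For illustration, a minimal sketch (hypothetical names, not the actual 
executor code) of why that key cannot distinguish two tries of the same 
task instance:

# Sketch only: simplified model of the executor's event buffer;
# the real buffer lives on the executor, names here are illustrative.
event_buffer = {}

def record_event(dag_id, task_id, execution_date, state):
    # The key carries no try_number, so an event from try 1 and the
    # TaskInstance row for try 2 map onto the same entry.
    key = (dag_id, task_id, execution_date)
    event_buffer[key] = state

# Try 1 fails and its FAILED event sits in the buffer; when the scheduler
# later drains the buffer, the same key matches the QUEUED TaskInstance
# of try 2, which then gets failed as "killed externally".
record_event("my_dag", "my_task", "2018-09-28T00:00:00", "failed")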

If the task retry interval is very small (e.g. 30 seconds), the scheduler 
might schedule the next retry run while the result of the previous run is 
still sitting in the executor's event queue.
The current run can be in the QUEUED state while the scheduler is processing 
the executor's stale events, which can make the scheduler fail the current 
run because of the following code in jobs.py:

def _process_executor_events(self, simple_dag_bag, session=None):
    """
    Respond to executor events.
    """
    # TODO: this shares quite a lot of code with _manage_executor_state

    TI = models.TaskInstance
    for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)
                           .items()):
        dag_id, task_id, execution_date = key
        self.log.info(
            "Executor reports %s.%s execution_date=%s as %s",
            dag_id, task_id, execution_date, state
        )
        if state == State.FAILED or state == State.SUCCESS:
            qry = session.query(TI).filter(TI.dag_id == dag_id,
                                           TI.task_id == task_id,
                                           TI.execution_date == execution_date)
            ti = qry.first()
            if not ti:
                self.log.warning("TaskInstance %s went missing from the database", ti)
                continue

            # TODO: should we fail RUNNING as well, as we do in Backfills?
            if ti.state == State.QUEUED:
                msg = ("Executor reports task instance {} finished ({}) "
                       "although the task says its {}. Was the task "
                       "killed externally?".format(ti, state, ti.state))

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
