[ https://issues.apache.org/jira/browse/AIRFLOW-3136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vardan Gupta reassigned AIRFLOW-3136:
-------------------------------------

    Assignee: Vardan Gupta

> Scheduler Failing the Task retries run while processing Executor Events
> -----------------------------------------------------------------------
>
>                 Key: AIRFLOW-3136
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3136
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.9.0
>            Reporter: raman
>            Assignee: Vardan Gupta
>            Priority: Major
>             Fix For: 2.0.0, 1.10.1
>
>
> The following behaviour is observed with Airflow 1.9 in LocalExecutor mode.
>  
> The Airflow scheduler processes executor events in the
> "_process_executor_events(self, simple_dag_bag, session=None)" function of
> jobs.py.
> The events are identified by a key composed of dag id, task id, and execution
> date, so all retries of a task share the same key.
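> As a small illustration (the identifiers below are invented; only the shape of
> the key comes from jobs.py), the first attempt and its retry produce the same
> event-buffer key:
> 
> from datetime import datetime
> 
> # Key shape used by the event buffer: no try number, so every retry of the
> # same task instance collapses onto a single entry.
> key_try_1 = ("example_dag", "example_task", datetime(2018, 9, 28))
> key_try_2 = ("example_dag", "example_task", datetime(2018, 9, 28))
> assert key_try_1 == key_try_2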
> If the task retry interval is very small, e.g. 30 seconds, the scheduler might
> schedule the next retry run while the result of the previous run is still
> sitting in the executor event queue.
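> For example, a DAG along the following lines (illustrative only, not taken
> from the report) uses a 30-second retry_delay and a task that always fails,
> which is enough to open the window described above:
> 
> from datetime import datetime, timedelta
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> 
> default_args = {
>     "owner": "airflow",
>     "start_date": datetime(2018, 9, 1),
>     "retries": 3,
>     "retry_delay": timedelta(seconds=30),  # small retry interval, as in the report
> }
> 
> dag = DAG("short_retry_example", default_args=default_args,
>           schedule_interval="@daily")
> 
> failing_task = BashOperator(
>     task_id="example_task",
>     bash_command="exit 1",  # forces a retry on every run
>     dag=dag,
> )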
> The current task run might be in the QUEUED state while the scheduler is still
> processing the executor's earlier events, which can make the scheduler fail the
> current run because of the following code in jobs.py:
> def _process_executor_events(self, simple_dag_bag, session=None):
>     """
>     Respond to executor events.
>     """
>     # TODO: this shares quite a lot of code with _manage_executor_state
>     TI = models.TaskInstance
>     for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)
>                            .items()):
>         dag_id, task_id, execution_date = key
>         self.log.info(
>             "Executor reports %s.%s execution_date=%s as %s",
>             dag_id, task_id, execution_date, state
>         )
>         if state == State.FAILED or state == State.SUCCESS:
>             qry = session.query(TI).filter(TI.dag_id == dag_id,
>                                            TI.task_id == task_id,
>                                            TI.execution_date == execution_date)
>             ti = qry.first()
>             if not ti:
>                 self.log.warning("TaskInstance %s went missing from the database", ti)
>                 continue
> 
>             # TODO: should we fail RUNNING as well, as we do in Backfills?
>             if ti.state == State.QUEUED:
>                 msg = ("Executor reports task instance %s finished (%s) "
>                        "although the task says its %s. Was the task "
>                        "killed externally?".format(ti, state, ti.state))
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
