[ https://issues.apache.org/jira/browse/AIRFLOW-3136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vardan Gupta reassigned AIRFLOW-3136:
-------------------------------------
Assignee: Vardan Gupta
> Scheduler Failing the Task retries run while processing Executor Events
> -----------------------------------------------------------------------
>
> Key: AIRFLOW-3136
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3136
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.9.0
> Reporter: raman
> Assignee: Vardan Gupta
> Priority: Major
> Fix For: 2.0.0, 1.10.1
>
>
> The following behaviour is observed with Airflow 1.9 in LocalExecutor mode:
>
> The Airflow scheduler processes executor events in the
> "_process_executor_events(self, simple_dag_bag, session=None)" function of
> jobs.py.
> Events are identified by a key composed of dag id, task id and execution date,
> so all retries of a task share the same key, as sketched below.
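> For illustration, a minimal sketch (the dag/task names and date are made up)
> of why retries collide on the same event-buffer entry:
>
>     from datetime import datetime
>
>     # Hypothetical identifiers, purely for illustration.
>     execution_date = datetime(2018, 9, 28)
>
>     key_first_try = ("example_dag", "example_task", execution_date)  # initial attempt
>     key_retry = ("example_dag", "example_task", execution_date)      # retry attempt
>
>     # The key carries no try number, so both attempts map to the same
>     # entry in the executor event buffer.
>     assert key_first_try == key_retry
>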
> If the task retry interval is very small, e.g. 30 seconds, the scheduler might
> schedule the next retry run while the result of the previous run is still in
> the executor event queue.
> The current run might then be in the QUEUED state while the scheduler is
> processing the executor's earlier events, which can make the scheduler fail
> the current run because of the following code in jobs.py:
> def _process_executor_events(self, simple_dag_bag, session=None):
>     """
>     Respond to executor events.
>     """
>     # TODO: this shares quite a lot of code with _manage_executor_state
>     TI = models.TaskInstance
>     for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)
>                            .items()):
>         dag_id, task_id, execution_date = key
>         self.log.info(
>             "Executor reports %s.%s execution_date=%s as %s",
>             dag_id, task_id, execution_date, state
>         )
>         if state == State.FAILED or state == State.SUCCESS:
>             qry = session.query(TI).filter(TI.dag_id == dag_id,
>                                            TI.task_id == task_id,
>                                            TI.execution_date == execution_date)
>             ti = qry.first()
>             if not ti:
>                 self.log.warning("TaskInstance %s went missing from the database", ti)
>                 continue
>
>             # TODO: should we fail RUNNING as well, as we do in Backfills?
>             if ti.state == State.QUEUED:
>                 msg = ("Executor reports task instance %s finished (%s) "
>                        "although the task says its %s. Was the task "
>                        "killed externally?".format(ti, state, ti.state))
>
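> A rough reproduction sketch (DAG and task names are illustrative, not from the
> affected deployment): a task that always fails and retries after 30 seconds,
> which is short enough for the next attempt to be QUEUED while the previous
> attempt's FAILED event is still sitting in the executor event buffer:
>
>     from datetime import datetime, timedelta
>
>     from airflow import DAG
>     from airflow.operators.python_operator import PythonOperator
>
>
>     def always_fail():
>         # Force a failure so a retry is scheduled retry_delay later.
>         raise RuntimeError("simulated failure")
>
>
>     dag = DAG(
>         dag_id="retry_race_repro",
>         start_date=datetime(2018, 9, 1),
>         schedule_interval=None,
>     )
>
>     PythonOperator(
>         task_id="flaky_task",
>         python_callable=always_fail,
>         retries=3,
>         retry_delay=timedelta(seconds=30),  # small interval that can trigger the race
>         dag=dag,
>     )
>
> With LocalExecutor and a retry delay this small, the scheduler may process the
> FAILED event of the first attempt while the retry is already QUEUED, log the
> "Was the task killed externally?" message and fail the retried run.
>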
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)