raman created AIRFLOW-3136:
------------------------------
Summary: Scheduler fails task retry runs while processing executor
events
Key: AIRFLOW-3136
URL: https://issues.apache.org/jira/browse/AIRFLOW-3136
Project: Apache Airflow
Issue Type: Bug
Components: scheduler
Affects Versions: 1.9.0
Reporter: raman
The following behaviour is observed with Airflow 1.9 in LocalExecutor mode.
The Airflow scheduler processes executor events in the
"_process_executor_events(self, simple_dag_bag, session=None)" function of
jobs.py.
The events are identified by a key composed of dag id, task id, and
execution date, so all retries of a task share the same key.
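For illustration, here is a minimal sketch (hypothetical dag and task names,
not Airflow code) of why two retries collide on the same event key:

from datetime import datetime

# try_number is absent from the key, so every retry of the same task
# instance maps to the same event-buffer entry
execution_date = datetime(2018, 9, 28)
key_for_try_1 = ("example_dag", "example_task", execution_date)
key_for_try_2 = ("example_dag", "example_task", execution_date)

assert key_for_try_1 == key_for_try_2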
If the task retry interval is very small, e.g. 30 seconds, the scheduler
might schedule the next retry run while the result of the previous run is
still sitting in the executor event buffer.
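As a toy reproduction of the timing (plain Python, assuming nothing about
Airflow internals beyond the key shape above):

event_buffer = {}
key = ("example_dag", "example_task", "2018-09-28T00:00:00")

# try 1 finishes and its result sits in the buffer
event_buffer[key] = "failed"
# ~30 seconds later the scheduler queues try 2 under the same key
ti_state = "queued"

# when the scheduler drains the buffer it sees a finished event whose
# task instance row is QUEUED, which is the condition the code below
# reacts to
state = event_buffer.pop(key)
assert state == "failed" and ti_state == "queued"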
The current task run might then be in the QUEUED state while the scheduler
is processing the executor's earlier events, which can make the scheduler
fail the current run because of the following code in jobs.py:
def _process_executor_events(self, simple_dag_bag, session=None):
    """
    Respond to executor events.
    """
    # TODO: this shares quite a lot of code with _manage_executor_state
    TI = models.TaskInstance
    for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)
                           .items()):
        dag_id, task_id, execution_date = key
        self.log.info(
            "Executor reports %s.%s execution_date=%s as %s",
            dag_id, task_id, execution_date, state
        )
        if state == State.FAILED or state == State.SUCCESS:
            # Note: the lookup filters only on dag_id, task_id and
            # execution_date, not try_number, so the row returned may
            # already belong to the next retry of the task.
            qry = session.query(TI).filter(TI.dag_id == dag_id,
                                           TI.task_id == task_id,
                                           TI.execution_date == execution_date)
            ti = qry.first()
            if not ti:
                self.log.warning("TaskInstance %s went missing from the database", ti)
                continue

            # TODO: should we fail RUNNING as well, as we do in Backfills?
            if ti.state == State.QUEUED:
                msg = ("Executor reports task instance {} finished ({}) "
                       "although the task says its {}. Was the task "
                       "killed externally?".format(ti, state, ti.state))
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)