rafalkozik commented on issue #10790:
URL: https://github.com/apache/airflow/issues/10790#issuecomment-702598519
We have just introduced _ExternalTaskSensor_ into our pipeline and faced the
same issue. When we initially tested it on our dev instance (~200 DAGs) it
worked fine, but after running it on our prod environment (~400 DAGs) it was
always failing after a reschedule.
After digging into the code, it looks like this is simply a race condition in
the scheduler.
We have a _child_dag.parent_dag_completed_ task that waits for a business
process to complete its calculations in _parent_dag_. Task execution logs:
```
[2020-10-01 11:48:03,038] {taskinstance.py:669} INFO - Dependencies all met
for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00
[queued]>
[2020-10-01 11:48:03,065] {taskinstance.py:669} INFO - Dependencies all met
for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00
[queued]>
[2020-10-01 11:48:03,066] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-10-01 11:48:03,066] {taskinstance.py:880} INFO - Starting attempt 1 of
1
[2020-10-01 11:48:03,066] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-10-01 11:48:03,095] {taskinstance.py:900} INFO - Executing
<Task(ExternalTaskSensor): parent_dag_completed> on 2020-09-30T11:45:00+00:00
[2020-10-01 11:48:03,100] {standard_task_runner.py:53} INFO - Started
process 26131 to run task
[2020-10-01 11:48:03,235] {logging_mixin.py:112} INFO - Running %s on host
%s <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00
[running]> ip-10-200-100-113.eu-west-1.compute.internal
[2020-10-01 11:48:03,318] {external_task_sensor.py:117} INFO - Poking for
parent_dag on 2020-09-30T11:45:00+00:00 ...
[2020-10-01 11:48:03,397] {taskinstance.py:1136} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2020-10-01 11:48:12,994] {logging_mixin.py:112} INFO - [2020-10-01
11:48:12,993] {local_task_job.py:103} INFO - Task exited with return code 0
[2020-10-01 11:50:53,744] {taskinstance.py:663} INFO - Dependencies not met
for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00
[failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed'
state which is not a valid state for execution. The task must be cleared in
order to be run.
[2020-10-01 11:50:53,747] {logging_mixin.py:112} INFO - [2020-10-01
11:50:53,747] {local_task_job.py:91} INFO - Task is not able to be run
```
Scheduler logs:
```
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00
[scheduled]>
[2020-10-01 11:47:59,428] {scheduler_job.py:1010} INFO - DAG child_dag has
0/16 running and queued tasks
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00
[scheduled]>
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00
[queued]>
[2020-10-01 11:47:59,565] {scheduler_job.py:1170} INFO - Sending
('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45,
tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2020-10-01 11:47:59,565] {base_executor.py:58} INFO - Adding to queue:
['airflow', 'run', 'child_dag', 'parent_dag_completed',
'2020-09-30T11:45:00+00:00', '--local', '--pool', 'default_pool', '-sd',
'/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00
[scheduled]>
[2020-10-01 11:50:50,118] {scheduler_job.py:1010} INFO - DAG child_dag has
0/16 running and queued tasks
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00
[scheduled]>
<TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00
[queued]>
[2020-10-01 11:50:50,148] {scheduler_job.py:1170} INFO - Sending
('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45,
tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2020-10-01 11:50:50,148] {base_executor.py:58} INFO - Adding to queue:
['airflow', 'run', 'child_dag', 'parent_dag_completed',
'2020-09-30T11:45:00+00:00', '--local', '--pool', 'default_pool', '-sd',
'/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
[2020-10-01 11:50:50,595] {scheduler_job.py:1313} INFO - Executor reports
execution of child_dag.parent_dag_completed execution_date=2020-09-30
11:45:00+00:00 exited with status success for try_number 1
[2020-10-01 11:50:50,599] {scheduler_job.py:1330} ERROR - Executor reports
task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30
11:45:00+00:00 [queued]> finished (success) although the task says its queued.
Was the task killed externally?
[2020-10-01 11:50:50,803] {taskinstance.py:1145} ERROR - Executor reports
task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30
11:45:00+00:00 [queued]> finished (success) although the task says its queued.
Was the task killed externally?
[2020-10-01 11:50:50,804] {taskinstance.py:1202} INFO - Marking task as
FAILED.dag_id=child_dag, task_id=parent_dag_completed,
execution_date=20200930T114500, start_date=20201001T114803,
end_date=20201001T115050
```
From the scheduler log it's visible that the event from the executor is
processed after the task has already been queued for the second time.
The logic related to those logs is here:
```python
def _validate_and_run_task_instances(self, simple_dag_bag):
    if len(simple_dag_bag.simple_dags) > 0:
        try:
            self._process_and_execute_tasks(simple_dag_bag)  # <-- task state is changed to queued here
        except Exception as e:
            self.log.error("Error queuing tasks")
            self.log.exception(e)
            return False

    # Call heartbeats
    self.log.debug("Heartbeating the executor")
    self.executor.heartbeat()

    self._change_state_for_tasks_failed_to_execute()

    # Process events from the executor
    self._process_executor_events(simple_dag_bag)  # <-- notification of the previous execution is processed and there is a state mismatch

    return True
```
This is the place where the task state is changed:
```python
def _process_executor_events(self, simple_dag_bag, session=None):
    # ...
    if ti.try_number == try_number and ti.state == State.QUEUED:
        msg = ("Executor reports task instance {} finished ({}) "
               "although the task says its {}. Was the task "
               "killed externally?".format(ti, state, ti.state))
        Stats.incr('scheduler.tasks.killed_externally')
        self.log.error(msg)
        try:
            simple_dag = simple_dag_bag.get_dag(dag_id)
            dagbag = models.DagBag(simple_dag.full_filepath)
            dag = dagbag.get_dag(dag_id)
            ti.task = dag.get_task(task_id)
            ti.handle_failure(msg)
        except Exception:
            self.log.error("Cannot load the dag bag to handle failure for %s"
                           ". Setting task to FAILED without callbacks or "
                           "retries. Do you have enough resources?", ti)
            ti.state = State.FAILED
            session.merge(ti)
            session.commit()
```
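To make the interleaving concrete, here is a toy reduction of the two methods above. This is not Airflow code, just the state transitions that matter, but it shows how a late event from the previous poke collides with the freshly queued attempt:

```python
from collections import deque

# Toy reduction of the scheduler loop above -- not Airflow code.
executor_events = deque()                      # events reported back by the executor
ti = {"state": "scheduled", "try_number": 1}   # the sensor's task instance

def process_and_execute_tasks():
    # the scheduler picks up the SCHEDULED sensor and queues it
    ti["state"] = "queued"

def process_executor_events():
    while executor_events:
        try_number, state = executor_events.popleft()
        # same guard as in _process_executor_events above
        if ti["try_number"] == try_number and ti["state"] == "queued":
            print("Executor reports task finished (%s) although the task says its %s"
                  % (state, ti["state"]))
            ti["state"] = "failed"

# Loop iteration N: the sensor is queued, pokes, reschedules itself, and the
# executor reports "success for try_number 1" -- but the event is read late.
process_and_execute_tasks()
executor_events.append((1, "success"))
ti["state"] = "scheduled"   # the reschedule puts the task back to SCHEDULED

# Loop iteration N+1: the sensor is queued again *before* the stale event is read.
process_and_execute_tasks()
process_executor_events()
print(ti)  # {'state': 'failed', 'try_number': 1} -- the fresh attempt is killed
```

If the stale event were read while the task was still in SCHEDULED or UP_FOR_RESCHEDULE, the `ti.state == State.QUEUED` guard would not match and nothing bad would happen; the failure only occurs when the event is read after the next attempt has already been queued.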
Unfortunately, I think that moving `_process_executor_events` before
`_process_and_execute_tasks` would not solve the issue, as the event might
arrive from the executor while `_process_and_execute_tasks` is executing.
Increasing _poke_interval_ reduces the chance of this race condition happening
when the scheduler is under heavy load.
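For context, this is roughly how such a sensor is configured (the DAG names, schedule, and intervals below are illustrative, not our production values); a larger _poke_interval_ simply means fewer reschedules and therefore fewer chances to hit the race:

```python
from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Illustrative configuration only -- names, schedule and intervals are made up.
with DAG('child_dag',
         start_date=datetime(2020, 9, 1),
         schedule_interval='*/15 * * * *') as dag:
    parent_dag_completed = ExternalTaskSensor(
        task_id='parent_dag_completed',
        external_dag_id='parent_dag',
        external_task_id=None,     # wait for the whole parent DAG run
        mode='reschedule',         # free the worker slot between pokes
        poke_interval=5 * 60,      # larger interval -> fewer reschedules -> smaller race window
        timeout=6 * 60 * 60,
    )
```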
I'm not too familiar with the Airflow code base, but it seems that the root
cause is the way rescheduling works and the fact that _try_number_ does not
change. Because of that, the scheduler thinks that an event for a past
execution belongs to the ongoing one.