rafalkozik commented on issue #10790:
URL: https://github.com/apache/airflow/issues/10790#issuecomment-702598519


   We have just introduced _ExternalTaskSensor_ into our pipeline and faced the 
same issue. When initially tested on our dev instance (~200 DAGs) it worked 
fine; after moving it to our prod environment (~400 DAGs) it always failed 
after a reschedule.
   
   After digging into the code, it looks like this is simply a race condition in 
the scheduler.
   
   We have a _child_dag.parent_dag_completed_ task that waits for a business 
process to finish its calculations in _parent_dag_. Task execution logs:
   
   ```
   [2020-10-01 11:48:03,038] {taskinstance.py:669} INFO - Dependencies all met 
for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 
[queued]>
   [2020-10-01 11:48:03,065] {taskinstance.py:669} INFO - Dependencies all met 
for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 
[queued]>
   [2020-10-01 11:48:03,066] {taskinstance.py:879} INFO - 
   
--------------------------------------------------------------------------------
   [2020-10-01 11:48:03,066] {taskinstance.py:880} INFO - Starting attempt 1 of 
1
   [2020-10-01 11:48:03,066] {taskinstance.py:881} INFO - 
   
--------------------------------------------------------------------------------
   [2020-10-01 11:48:03,095] {taskinstance.py:900} INFO - Executing 
<Task(ExternalTaskSensor): parent_dag_completed> on 2020-09-30T11:45:00+00:00
   [2020-10-01 11:48:03,100] {standard_task_runner.py:53} INFO - Started 
process 26131 to run task
   [2020-10-01 11:48:03,235] {logging_mixin.py:112} INFO - Running %s on host 
%s <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 
[running]> ip-10-200-100-113.eu-west-1.compute.internal
   [2020-10-01 11:48:03,318] {external_task_sensor.py:117} INFO - Poking for 
parent_dag on 2020-09-30T11:45:00+00:00 ... 
   [2020-10-01 11:48:03,397] {taskinstance.py:1136} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2020-10-01 11:48:12,994] {logging_mixin.py:112} INFO - [2020-10-01 
11:48:12,993] {local_task_job.py:103} INFO - Task exited with return code 0
   [2020-10-01 11:50:53,744] {taskinstance.py:663} INFO - Dependencies not met 
for <TaskInstance: child_dag.parent_dag_completed 2020-09-30T11:45:00+00:00 
[failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' 
state which is not a valid state for execution. The task must be cleared in 
order to be run.
   [2020-10-01 11:50:53,747] {logging_mixin.py:112} INFO - [2020-10-01 
11:50:53,747] {local_task_job.py:91} INFO - Task is not able to be run
   ```
   
   Scheduler logs:
   
   ```
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 
[scheduled]>
   [2020-10-01 11:47:59,428] {scheduler_job.py:1010} INFO - DAG child_dag has 
0/16 running and queued tasks
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 
[scheduled]>
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 
[queued]>
   [2020-10-01 11:47:59,565] {scheduler_job.py:1170} INFO - Sending 
('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45, 
tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
   [2020-10-01 11:47:59,565] {base_executor.py:58} INFO - Adding to queue: 
['airflow', 'run', 'child_dag', 'parent_dag_completed', 
'2020-09-30T11:45:00+00:00', '--local', '--pool', 'default_pool', '-sd', 
'/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 
[scheduled]>
   [2020-10-01 11:50:50,118] {scheduler_job.py:1010} INFO - DAG child_dag has 
0/16 running and queued tasks
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 
[scheduled]>
   <TaskInstance: child_dag.parent_dag_completed 2020-09-30 11:45:00+00:00 
[queued]>
   [2020-10-01 11:50:50,148] {scheduler_job.py:1170} INFO - Sending 
('child_dag', 'parent_dag_completed', datetime.datetime(2020, 9, 30, 11, 45, 
tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
   [2020-10-01 11:50:50,148] {base_executor.py:58} INFO - Adding to queue: 
['airflow', 'run', 'child_dag', 'parent_dag_completed', 
'2020-09-30T11:45:00+00:00', '--local', '--pool', 'default_pool', '-sd', 
'/usr/local/airflow/dags/291a327d-5d46-4cf5-87cf-4bad036f56fa_1.py']
   [2020-10-01 11:50:50,595] {scheduler_job.py:1313} INFO - Executor reports 
execution of child_dag.parent_dag_completed execution_date=2020-09-30 
11:45:00+00:00 exited with status success for try_number 1
   [2020-10-01 11:50:50,599] {scheduler_job.py:1330} ERROR - Executor reports 
task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30 
11:45:00+00:00 [queued]> finished (success) although the task says its queued. 
Was the task killed externally?
   [2020-10-01 11:50:50,803] {taskinstance.py:1145} ERROR - Executor reports 
task instance <TaskInstance: child_dag.parent_dag_completed 2020-09-30 
11:45:00+00:00 [queued]> finished (success) although the task says its queued. 
Was the task killed externally?
   [2020-10-01 11:50:50,804] {taskinstance.py:1202} INFO - Marking task as 
FAILED.dag_id=child_dag, task_id=parent_dag_completed, 
execution_date=20200930T114500, start_date=20201001T114803, 
end_date=20201001T115050
   ```
   
   From the scheduler log it is visible that the event from the executor is 
processed after the task has already been queued for the second time.
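
   To make the interleaving concrete, here is a minimal standalone simulation of 
the sequence above (my own sketch, not Airflow code; `TaskInstance` and 
`process_executor_event` are simplified stand-ins for the real classes):

   ```python
   from enum import Enum

   class State(Enum):
       QUEUED = "queued"
       UP_FOR_RESCHEDULE = "up_for_reschedule"
       FAILED = "failed"

   class TaskInstance:
       def __init__(self):
           self.state = State.QUEUED
           self.try_number = 1  # a reschedule does NOT increment this

   def process_executor_event(ti, event_try_number):
       # Mirrors the check in _process_executor_events: a "success" event whose
       # try_number matches while the TI is QUEUED is treated as killed externally.
       if ti.try_number == event_try_number and ti.state == State.QUEUED:
           ti.state = State.FAILED

   ti = TaskInstance()

   # 11:48:03 - sensor runs once, pokes, and reschedules itself;
   # the executor will later report "success" for try 1
   ti.state = State.UP_FOR_RESCHEDULE

   # 11:50:50 - scheduler re-queues the SAME try number for the next poke ...
   ti.state = State.QUEUED

   # ... and only then drains the stale "success" event from the first poke
   process_executor_event(ti, event_try_number=1)

   print(ti.state)  # State.FAILED - exactly the failure seen in the logs
   ```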
   
   The logic related to those logs is here:
   
   ```python
       def _validate_and_run_task_instances(self, simple_dag_bag):
           if len(simple_dag_bag.simple_dags) > 0:
               try:
                    self._process_and_execute_tasks(simple_dag_bag) # <-- task state is changed to queued here
               except Exception as e:
                   self.log.error("Error queuing tasks")
                   self.log.exception(e)
                   return False
   
           # Call heartbeats
           self.log.debug("Heartbeating the executor")
           self.executor.heartbeat()
   
           self._change_state_for_tasks_failed_to_execute()
   
           # Process events from the executor
            self._process_executor_events(simple_dag_bag) # <-- notification of previous execution is processed and there is state mismatch
           return True
   ```
           
   This is the place where the task state is changed:
   
   ```python
       def _process_executor_events(self, simple_dag_bag, session=None):

           # ...

                   if ti.try_number == try_number and ti.state == State.QUEUED:
                       msg = ("Executor reports task instance {} finished ({}) "
                              "although the task says its {}. Was the task "
                              "killed externally?".format(ti, state, ti.state))
                       Stats.incr('scheduler.tasks.killed_externally')
                       self.log.error(msg)
                       try:
                           simple_dag = simple_dag_bag.get_dag(dag_id)
                           dagbag = models.DagBag(simple_dag.full_filepath)
                           dag = dagbag.get_dag(dag_id)
                           ti.task = dag.get_task(task_id)
                           ti.handle_failure(msg)
                       except Exception:
                           self.log.error("Cannot load the dag bag to handle failure for %s"
                                          ". Setting task to FAILED without callbacks or "
                                          "retries. Do you have enough resources?", ti)
                           ti.state = State.FAILED
                           session.merge(ti)
                           session.commit()
   ```
                  
   Unfortunately, I think that moving `_process_executor_events` before 
`_process_and_execute_tasks` would not solve the issue, as an event might 
arrive from the executor while `_process_and_execute_tasks` is executing. 
Increasing the sensor's `poke_interval` reduces the chance of this race 
condition occurring when the scheduler is under heavy load.
   
   I'm not too familiar with the Airflow code base, but it seems that the root 
cause is the way reschedule works and the fact that `try_number` does not 
change between reschedules. Because of that, the scheduler thinks that an event 
for a past execution belongs to the ongoing one.
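
   As a toy sketch of why that matters (my own illustration, not a proposed 
Airflow patch): if re-queuing after a reschedule bumped some per-attempt 
counter, the stale executor event would no longer match the current attempt 
and could be recognized and discarded:

   ```python
   class TaskInstance:
       def __init__(self):
           self.state = "queued"
           self.try_number = 1

   ti = TaskInstance()

   # first poke finishes; the executor will later report success for this attempt
   stale_event_try = ti.try_number

   # hypothetical re-queue that increments the counter for the next poke
   ti.try_number += 1
   ti.state = "queued"

   # the stale event no longer matches the current attempt, so it could be ignored
   is_stale = stale_event_try != ti.try_number
   print(is_stale)  # True
   ```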


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

