romainfage commented on issue #19329:
URL: https://github.com/apache/airflow/issues/19329#issuecomment-968737040


   Hello all!
   
   Looking at the issue and the logs, we think we may have understood why downstream tasks are set to `upstream_failed` even though the upstream task is being retried (we are using Airflow 2.1.3).
   
   We see these logs in our scheduler:
   
   ```bash
   {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: POD_ID_1; state: failed; annotations: {'dag_id': 'DAG1', 'task_id': 'TASK_ID', 'execution_date': '2021-10-29T13:05:00+00:00', 'try_number': '1'}
   {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='DAG1', task_id='TASK_ID', execution_date=datetime.datetime(2021, 10, 29, 13, 5, tzinfo=tzlocal()), try_number=1), <TaskInstanceState.FAILED: 'failed'>, 'POD_ID_1', 'airflow', '164023816') to failed
   {scheduler_job.py:611} INFO - Executor reports execution of DAG1.TASK_ID execution_date=2021-10-29 13:05:00+00:00 exited with status failed for try_number 1
   {scheduler_job.py:647} ERROR - Executor reports task instance <TaskInstance: DAG1.TASK_ID 2021-10-29 13:05:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   {scheduler_job.py:654} INFO - Setting task instance <TaskInstance: DAG1.TASK_ID 2021-10-29 13:05:00+00:00 [queued]> state to failed as reported by executor
   ```
   
   Our understanding (as @sweco demonstrated) is that the K8s executor tries to finish a pod that has not been launched yet (perhaps because of some kind of timeout; I cannot currently pinpoint where or why the executor does this). That is why we see these logs:
   
   ```bash
   {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: POD_ID_1; state: failed; annotations: {'dag_id': 'DAG1', 'task_id': 'TASK_ID', 'execution_date': '2021-10-29T13:05:00+00:00', 'try_number': '1'}
   {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='DAG1', task_id='TASK_ID', execution_date=datetime.datetime(2021, 10, 29, 13, 5, tzinfo=tzlocal()), try_number=1), <TaskInstanceState.FAILED: 'failed'>, 'POD_ID_1', 'airflow', '164023816') to failed
   ```
   
   Afterward, the scheduler receives an event from the executor reporting that the task failed. We can see the event being received here:
   
   ```bash
   {scheduler_job.py:611} INFO - Executor reports execution of DAG1.TASK_ID execution_date=2021-10-29 13:05:00+00:00 exited with status failed for try_number 1
   ```
   
   The scheduler then handles the event by looking up the task's current state in the metadata database. However, since the pod has not started yet, the task's state is still `queued`. Because the stored state is `queued` while the reported state is `failed`, the scheduler treats this as abnormal behavior and assumes something external must have acted on the task. That is why we see this log:
   
   ```bash
   {scheduler_job.py:647} ERROR - Executor reports task instance <TaskInstance: DAG1.TASK_ID 2021-10-29 13:05:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   ```
   Note the scheduler's own question, `Was the task killed externally?`: it suggests that the scheduler is not sure it is handling this case correctly.
   
   Looking at the code, we can pinpoint where this happens [here](https://github.com/apache/airflow/blob/2.1.3/airflow/jobs/scheduler_job.py#L641-L656).
   Preview:
   ```python
       @provide_session
       def _process_executor_events(self, session: Session = None) -> int:
               [...]
               if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
                   Stats.incr('scheduler.tasks.killed_externally')
                   msg = (
                       "Executor reports task instance %s finished (%s) although the "
                       "task says its %s. (Info: %s) Was the task killed externally?"
                   )
                   self.log.error(msg, ti, state, ti.state, info)
   
                   request = TaskCallbackRequest(
                       full_filepath=ti.dag_model.fileloc,
                       simple_task_instance=SimpleTaskInstance(ti),
                       msg=msg % (ti, state, ti.state, info),
                   )
                   self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
                   ti.set_state(state)
                   self.processor_agent.send_callback_to_execute(request)
   ```
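
To make the branch above easier to reason about, here is a minimal, standalone model of it. It does not use Airflow at all: `TaskKey`, `TI` and `process_executor_event` are hypothetical stand-ins, states are plain strings, the lookup of the task instance by key is omitted, and everything outside the `try_number`/`queued` check is simplified away. It replays the log sequence from this comment.

```python
from dataclasses import dataclass
from typing import NamedTuple, Optional


class TaskKey(NamedTuple):
    """Hypothetical stand-in for the key the executor reports events under."""
    dag_id: str
    task_id: str
    try_number: int


@dataclass
class TI:
    """Hypothetical stand-in for the task instance row in the metadata DB."""
    dag_id: str
    task_id: str
    try_number: int
    state: Optional[str]


def process_executor_event(ti: TI, key: TaskKey, reported_state: str) -> bool:
    """Apply one executor event to `ti` (looked up by `key` in the real code).

    Returns True when the event takes the "killed externally" branch: the
    executor says this try finished while the DB still says 'queued'.
    Otherwise `ti` is left untouched; the other branches are not modelled.
    """
    # Simplification: only terminal states reported by the executor are considered.
    if reported_state not in ("failed", "success"):
        return False
    if ti.try_number == key.try_number and ti.state == "queued":
        # Same try and the task never left 'queued': nothing here distinguishes
        # an executor-side cleanup of a pod that never started from an external
        # kill, so the reported state simply overwrites the DB state.
        ti.state = reported_state
        return True
    return False


# Replay of the logs above: try 1 is reported as failed while still queued.
ti = TI("DAG1", "TASK_ID", try_number=1, state="queued")
killed_externally = process_executor_event(ti, TaskKey("DAG1", "TASK_ID", 1), "failed")
print(killed_externally, ti.state)  # True failed
```

The point of the model is that, from the scheduler's side, an executor cleaning up a pod that never started and a genuine external kill produce exactly the same inputs.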
   
   Consequently, because the scheduler thinks something external marked the task as `failed`, it marks the downstream tasks as `upstream_failed`. The task is then retried, finishes successfully and shows green in Airflow, but none of the downstream tasks are launched because they are already in the `upstream_failed` state.
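
The cascade can be reproduced with a toy dependency model. This is a deliberate simplification, not Airflow's trigger-rule code: `UPSTREAMS` and `evaluate_downstream` are hypothetical, only an `all_success`-like rule is modelled, and the intermediate `up_for_retry` state is skipped. In this model only tasks with no state yet are evaluated, so once a downstream task reaches `upstream_failed`, a later success of its upstream does not re-evaluate it.

```python
from typing import Dict, List, Optional

# Hypothetical toy DAG: TASK_ID -> DOWNSTREAM (names are illustrative only).
UPSTREAMS: Dict[str, List[str]] = {"TASK_ID": [], "DOWNSTREAM": ["TASK_ID"]}


def evaluate_downstream(states: Dict[str, Optional[str]]) -> None:
    """One toy scheduling pass: propagate upstream failures, queue ready tasks.

    Only tasks with no state yet (None) are evaluated, which is what makes
    'upstream_failed' sticky in this model.
    """
    for task, ups in UPSTREAMS.items():
        if states[task] is not None or not ups:
            continue
        if any(states[u] == "failed" for u in ups):
            states[task] = "upstream_failed"
        elif all(states[u] == "success" for u in ups):
            states[task] = "queued"


states: Dict[str, Optional[str]] = {"TASK_ID": "queued", "DOWNSTREAM": None}

# 1. The executor reports try 1 as failed while the DB still says 'queued'.
states["TASK_ID"] = "failed"
evaluate_downstream(states)
print(states)  # {'TASK_ID': 'failed', 'DOWNSTREAM': 'upstream_failed'}

# 2. The retry succeeds, but DOWNSTREAM already has a terminal state.
states["TASK_ID"] = "success"
evaluate_downstream(states)
print(states)  # {'TASK_ID': 'success', 'DOWNSTREAM': 'upstream_failed'}
```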
   
   I don't currently have enough insight into how the Airflow scheduler code really works to propose a proper fix, but could we do one of the following?
   
   * Provide another failure state to distinguish a task killed externally from a task killed by Airflow itself?
   * Include more information about the nature of the kill in the event the executor sends to the scheduler?
   * When the task is retried, clear the state of its downstream tasks so that they are no longer in the `upstream_failed` state?
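
The third suggestion can be sketched as a toy model (not a patch against Airflow; `DOWNSTREAMS` and `on_retry` are hypothetical names): when a task is put up for retry, walk its downstream tasks and reset any that are `upstream_failed` back to no state, so the scheduler evaluates them again.

```python
from collections import deque
from typing import Dict, List, Optional

# Hypothetical toy DAG: TASK_ID -> A -> B, and TASK_ID -> C.
DOWNSTREAMS: Dict[str, List[str]] = {
    "TASK_ID": ["A", "C"],
    "A": ["B"],
    "B": [],
    "C": [],
}


def on_retry(task_id: str, states: Dict[str, Optional[str]]) -> List[str]:
    """Put `task_id` up for retry and un-fail its transitive downstream tasks.

    Only tasks currently in 'upstream_failed' are reset (to None, i.e. "not yet
    evaluated"); tasks in any other state are left untouched, and the walk does
    not continue past them. Returns the reset task ids in BFS order.
    """
    states[task_id] = "up_for_retry"
    reset: List[str] = []
    queue = deque(DOWNSTREAMS[task_id])
    seen = set()
    while queue:
        task = queue.popleft()
        if task in seen:
            continue
        seen.add(task)
        if states[task] == "upstream_failed":
            states[task] = None
            reset.append(task)
            queue.extend(DOWNSTREAMS[task])
    return reset


# C is assumed to have an all_done trigger rule, so it already ran successfully.
states = {"TASK_ID": "failed", "A": "upstream_failed", "B": "upstream_failed", "C": "success"}
print(on_retry("TASK_ID", states))  # ['A', 'B']
print(states)  # {'TASK_ID': 'up_for_retry', 'A': None, 'B': None, 'C': 'success'}
```

Resetting only `upstream_failed` tasks, and stopping the walk at any task in another state, avoids re-running work that already completed for other reasons.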
   
   Thank you for reading!
   

