freedom1989 commented on issue #10790:
URL: https://github.com/apache/airflow/issues/10790#issuecomment-727719402
The cause is clear, as @rafalkozik mentioned. The scheduler queues the
task a second time and only then starts processing the executor events
from the task's first try. This happens when the scheduling loop time >
sensor task reschedule interval.
Either reducing the scheduler loop time (DAG processing time, etc.) or
increasing the sensor task's reschedule interval will work around it.
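To make the timing condition concrete, here is a toy model in plain Python. It is not Airflow code: it assumes a simplified loop in which each iteration first queues eligible tasks and then processes executor events, and the function name and state strings are made up for illustration.

```python
QUEUED = "queued"
UP_FOR_RESCHEDULE = "up_for_reschedule"


def state_seen_with_first_try_event(loop_seconds, reschedule_interval_seconds):
    """Toy model: state of the sensor task when the first try's event is processed.

    Assumptions (hypothetical simplification, not Airflow's real loop):
    - at t = 0 the first try exits and the task becomes up_for_reschedule,
      eligible to run again at t = reschedule_interval_seconds;
    - the scheduler reaches its next "queue tasks, then process executor
      events" step at t = loop_seconds.
    """
    state = UP_FOR_RESCHEDULE
    # Step 1 of the iteration: queue the task if its reschedule time has passed.
    if loop_seconds > reschedule_interval_seconds:
        state = QUEUED
    # Step 2 of the iteration: the first try's event is processed against this state.
    return state


fast_loop = state_seen_with_first_try_event(loop_seconds=5, reschedule_interval_seconds=60)
slow_loop = state_seen_with_first_try_event(loop_seconds=90, reschedule_interval_seconds=60)
print(fast_loop, slow_loop)
```

With a 5 second loop the event is handled while the task is still waiting to be rescheduled. With a 90 second loop the task is already queued again, which is the situation that triggers the false "killed externally" failure.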
The bug could also be fixed by having the rescheduled task instance use a
different try number, but that would produce a lot of log files.
```
def _process_executor_events(self, simple_dag_bag, session=None):
    # ...
    if ti.try_number == try_number and ti.state == State.QUEUED:  # <-- try number for a sensor task is always the same
        msg = ("Executor reports task instance {} finished ({}) "
               "although the task says its {}. Was the task "
               "killed externally?".format(ti, state, ti.state))
        Stats.incr('scheduler.tasks.killed_externally')
        self.log.error(msg)
        try:
            simple_dag = simple_dag_bag.get_dag(dag_id)
            dagbag = models.DagBag(simple_dag.full_filepath)
            dag = dagbag.get_dag(dag_id)
            ti.task = dag.get_task(task_id)
            ti.handle_failure(msg)
        except Exception:
            self.log.error("Cannot load the dag bag to handle failure for %s"
                           ". Setting task to FAILED without callbacks or "
                           "retries. Do you have enough resources?", ti)
            ti.state = State.FAILED
            session.merge(ti)
            session.commit()
```
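The condition in the excerpt can be isolated in a small stand-alone sketch. The `TaskInstance` tuple and the `looks_killed_externally` helper below are hypothetical stand-ins, not Airflow classes; they only mirror the `try_number` and `state` comparison to show why a stale first-try event matches a re-queued sensor, and why a different try number would avoid the match.

```python
from collections import namedtuple

# Hypothetical stand-in for the two fields the check reads; not Airflow's model.
TaskInstance = namedtuple("TaskInstance", ["try_number", "state"])

QUEUED = "queued"


def looks_killed_externally(ti, event_try_number):
    """Mirror of the comparison in the excerpt: same try number and state QUEUED."""
    return ti.try_number == event_try_number and ti.state == QUEUED


# The executor event left over from the sensor's first run carries try number 1.
stale_event_try_number = 1

# Current behaviour as described above: the re-queued sensor keeps its try number.
requeued_same_try = TaskInstance(try_number=1, state=QUEUED)

# Suggested alternative: the rescheduled run gets a new try number.
requeued_new_try = TaskInstance(try_number=2, state=QUEUED)

print(looks_killed_externally(requeued_same_try, stale_event_try_number))
print(looks_killed_externally(requeued_new_try, stale_event_try_number))
```

The first call matches, so the stale event would be treated as an external kill. The second does not, at the cost of a separate log file per try, as noted above.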