andreychernih opened a new issue #20460:
URL: https://github.com/apache/airflow/issues/20460
### Apache Airflow version
2.2.2
### What happened
I have a simple TimeDeltaSensorAsync task that is the very first task in the
DAG:
```
from datetime import timedelta
from airflow.sensors.time_delta import TimeDeltaSensorAsync

delay_sensor = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(hours=6))
# ...
delay_sensor.set_downstream(download_task)
download_task.set_downstream(process_task)
```
The intention of this task is simply to wait 6 hours before proceeding with
the rest of the DAG. I expect this task to always be in either the "waiting" or
the "success" state. In almost 50% of the runs across my DAGs in production,
this task instead ends up in the "up_for_retry" state. All my components (the
scheduler and the triggerer) are up and running correctly: they are healthy and
not restarting. This is a production system where all other DAGs and tasks are
working correctly.
The only suspicious lines that I found in the log related to the task are:
```
[2021-12-22 13:41:06,921] {base_executor.py:85} ERROR - could not queue task
TaskInstanceKey(dag_id='integration-choice-advantage-810', task_id='wait',
run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
...
[2021-12-22 13:41:06,932] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> finished (success) although the task says its queued. (Info: None)
Was the task killed externally?
[2021-12-22 13:41:06,936] {taskinstance.py:1705} ERROR - Executor reports task instance <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> finished (success) although the task says its queued. (Info: None)
Was the task killed externally?
```

Scheduler log:
```
[2021-12-22 13:41:06,913] {scheduler_job.py:288} INFO - 1 tasks up for execution:
<TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [scheduled]>
[2021-12-22 13:41:06,916] {scheduler_job.py:317} INFO - Figuring out tasks to run in Pool(name=default_pool) with 1280 open slots and
1 task instances ready to be queued
[2021-12-22 13:41:06,916] {scheduler_job.py:345} INFO - DAG integration-choice-advantage-810 has 0/2 running and queued tasks
[2021-12-22 13:41:06,918] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
<TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [scheduled]>
[2021-12-22 13:41:06,921] {scheduler_job.py:450} INFO - Sending TaskInstanceKey(dag_id='integration-choice-advantage-810',
task_id='wait', run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2) to
executor with priority 3 and queue default
[2021-12-22 13:41:06,921] {base_executor.py:85} ERROR - could not queue task
TaskInstanceKey(dag_id='integration-choice-advantage-810', task_id='wait',
run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
[2021-12-22 13:41:06,923] {base_executor.py:150} DEBUG - 1 running task instances
[2021-12-22 13:41:06,923] {base_executor.py:151} DEBUG - 0 in queue
[2021-12-22 13:41:06,923] {base_executor.py:152} DEBUG - 19 open slots
[2021-12-22 13:41:06,923] {base_executor.py:161} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync
method
[2021-12-22 13:41:06,924] {base_executor.py:198} DEBUG - Changing state: TaskInstanceKey(dag_id='integration-choice-advantage-810',
task_id='wait', run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
[2021-12-22 13:41:06,924] {scheduler_job.py:504} INFO - Executor reports execution of integration-choice-advantage-810.wait
run_id=scheduled__2021-12-20T08:00:00+00:00 exited with status success for
try_number 2
[2021-12-22 13:41:06,932] {scheduler_job.py:547} INFO - TaskInstance Finished: dag_id=integration-choice-advantage-810, task_id=wait,
run_id=scheduled__2021-12-20T08:00:00+00:00, run_start_date=2021-12-22
13:41:01.836808+00:00, run_end_date=None, run_duration=0.328748, state=queued,
executor_state=success, try_number=2, max_tries=4, job_id=4517,
pool=default_pool, queue=default, priority_weight=3,
operator=TimeDeltaSensorAsync
[2021-12-22 13:41:06,932] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> finished (success) although the task says its queued. (Info: None)
Was the task killed externally?
[2021-12-22 13:41:06,936] {taskinstance.py:1705} ERROR - Executor reports task instance <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> finished (success) although the task says its queued. (Info: None)
Was the task killed externally?
[2021-12-22 13:41:06,936] {taskinstance.py:720} DEBUG - Refreshing TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [queued]> from DB
[2021-12-22 13:41:06,943] {taskinstance.py:761} DEBUG - Refreshed TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [queued]>
[2021-12-22 13:41:06,943] {taskinstance.py:2224} DEBUG - Task Duration set to 5.106632
[2021-12-22 13:41:06,943] {taskinstance.py:1286} DEBUG - Clearing next_method and next_kwargs.
[2021-12-22 13:41:06,944] {taskinstance.py:1270} INFO - Marking task as UP_FOR_RETRY. dag_id=integration-choice-advantage-810,
task_id=wait, execution_date=20211220T080000, start_date=20211222T134101,
end_date=20211222T134106
```
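One possible reading of the scheduler log above: as far as I can tell, the executor logs `could not queue task` when it is asked to queue a key that it still tracks as queued or running, and the debug lines right after the error report `1 running task instances`. The sketch below is a simplified stand-in for that bookkeeping, not Airflow's actual executor code; `TaskKey` and `ToyExecutor` are hypothetical names used only for illustration. It shows how a resumed deferred task, which reuses the same key (`try_number=2`), could be rejected if the executor has not yet cleared the first run.

```python
from typing import NamedTuple, Set


class TaskKey(NamedTuple):
    """Hypothetical stand-in for the TaskInstanceKey shown in the log."""
    dag_id: str
    task_id: str
    run_id: str
    try_number: int


class ToyExecutor:
    """Toy model of an executor that tracks queued and running keys."""

    def __init__(self) -> None:
        self.queued: Set[TaskKey] = set()
        self.running: Set[TaskKey] = set()

    def queue(self, key: TaskKey) -> bool:
        """Accept the key unless it is already queued or running."""
        if key in self.queued or key in self.running:
            # Corresponds to the "could not queue task" error in the log.
            return False
        self.queued.add(key)
        return True

    def start(self, key: TaskKey) -> None:
        """Move a queued key to the running set."""
        self.queued.discard(key)
        self.running.add(key)

    def finish(self, key: TaskKey) -> None:
        """Forget a key once its process has been reaped."""
        self.running.discard(key)


key = TaskKey(
    "integration-choice-advantage-810",
    "wait",
    "scheduled__2021-12-20T08:00:00+00:00",
    2,
)
executor = ToyExecutor()

first_run_accepted = executor.queue(key)
executor.start(key)

# The task defers and its trigger fires before the executor clears the key,
# so the scheduler's attempt to queue the resumed task is rejected.
resume_accepted = executor.queue(key)

# Once the first run is cleared, the same key can be queued again.
executor.finish(key)
queued_after_cleanup = executor.queue(key)

print(first_run_accepted, resume_accepted, queued_after_cleanup)
```

If this reading is right, the failure depends on timing: it would only show up when the trigger fires before the executor has finished cleaning up the run that deferred.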
Triggerer log:
```
[2021-12-22 13:41:02,998] {triggerer_job.py:356} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00>
(ID 499) starting
[2021-12-22 13:41:02,998] {triggerer_job.py:359} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00>
(ID 499) fired: TriggerEvent
[2021-12-22 13:41:04,003] {triggerer_job.py:356} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00>
(ID 499) starting
[2021-12-22 13:41:04,003] {triggerer_job.py:359} INFO - Trigger
<airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00>
(ID 499) fired: TriggerEvent
```
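The trigger moment in the triggerer log, `2021-12-21T14:00:00+00:00`, is consistent with the end of the run's schedule interval plus the 6-hour delta, if the DAG runs daily. The schedule is not shown above, so the one-day interval is an assumption. The arithmetic below uses only timestamps from the logs and suggests the moment was already almost a day in the past when this attempt started, which would explain why the trigger fired about a second after the task deferred.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc

# Values taken from the logs in this report.
execution_date = datetime(2021, 12, 20, 8, 0, tzinfo=UTC)
attempt_started = datetime(2021, 12, 22, 13, 41, 1, tzinfo=UTC)
delta = timedelta(hours=6)

# Assumption: the DAG is scheduled daily (not shown in the report).
schedule_interval = timedelta(days=1)

# End of the schedule interval plus the sensor's delta.
target_moment = execution_date + schedule_interval + delta

# How far in the past the target already was when attempt 2 started.
overdue_by = attempt_started - target_moment
already_elapsed = target_moment <= attempt_started

print(target_moment.isoformat())
print(overdue_by)
print(already_elapsed)
```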
Task instance log:
```
[2021-12-22, 13:41:01 UTC] {base_task_runner.py:63} DEBUG - Planning to run
as the user
[2021-12-22, 13:41:01 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [queued]> from DB
[2021-12-22, 13:41:01 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [queued]>
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Task Instance State' PASSED: True, Task state queued was
valid.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in
running state.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not
have depends_on_past set.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not
have any upstream tasks.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was
not marked for retrying.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1035} INFO - Dependencies all
met for <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [queued]>
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not
set.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough
open slots in %s to execute the task', 'default_pool')
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not
have depends_on_past set.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not
have any upstream tasks.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00
[queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was
not marked for retrying.
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1035} INFO - Dependencies all
met for <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [queued]>
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1241} INFO -
--------------------------------------------------------------------------------
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1242} INFO - Starting attempt 2
of 5
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1243} INFO -
--------------------------------------------------------------------------------
[2021-12-22, 13:41:01 UTC] {taskinstance.py:1262} INFO - Executing
<Task(TimeDeltaSensorAsync): wait> on 2021-12-20 08:00:00+00:00
[2021-12-22, 13:41:01 UTC] {standard_task_runner.py:52} INFO - Started
process 12810 to run task
[2021-12-22, 13:41:01 UTC] {standard_task_runner.py:76} INFO - Running:
['airflow', 'tasks', 'run', 'integration-choice-advantage-810', 'wait',
'scheduled__2021-12-20T08:00:00+00:00', '--job-id', '4517', '--raw',
'--subdir', 'DAGS_FOLDER/airflow-dag.py', '--cfg-path', '/tmp/tmpxq9fz1pw',
'--error-file', '/tmp/tmpc0cq8o26']
[2021-12-22, 13:41:01 UTC] {standard_task_runner.py:77} INFO - Job 4517:
Subtask wait
[2021-12-22, 13:41:01 UTC] {cli_action_loggers.py:66} DEBUG - Calling
callbacks: [<function default_action_log at 0x7ff646e4b670>]
[2021-12-22, 13:41:01 UTC] {settings.py:210} DEBUG - Setting up DB
connection pool (PID 12810)
[2021-12-22, 13:41:01 UTC] {settings.py:267} DEBUG -
settings.prepare_engine_args(): Using NullPool
[2021-12-22, 13:41:01 UTC] {logging_mixin.py:109} INFO - Running
<TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [running]> on host
airflow-scheduler-589549ff66-vnps5
[2021-12-22, 13:41:01 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [running]> from DB
[2021-12-22, 13:41:01 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [running]>
[2021-12-22, 13:41:01 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
[2021-12-22, 13:41:01 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1427} INFO - Exporting the
following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=integration-choice-advantage-810
AIRFLOW_CTX_TASK_ID=wait
AIRFLOW_CTX_EXECUTION_DATE=2021-12-20T08:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-20T08:00:00+00:00
[2021-12-22, 13:41:02 UTC] {__init__.py:146} DEBUG - Preparing lineage
inlets and outlets
[2021-12-22, 13:41:02 UTC] {__init__.py:190} DEBUG - inlets: [], outlets: []
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1340} INFO - Pausing task as
DEFERRED. dag_id=integration-choice-advantage-810, task_id=wait,
execution_date=20211220T080000, start_date=20211222T134101
[2021-12-22, 13:41:02 UTC] {cli_action_loggers.py:84} DEBUG - Calling
callbacks: []
[2021-12-22, 13:41:02 UTC] {local_task_job.py:154} INFO - Task exited with
return code 0
[2021-12-22, 13:41:02 UTC] {taskinstance.py:720} DEBUG - Refreshing
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [running]> from DB
[2021-12-22, 13:41:02 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [deferred]>
[2021-12-22, 13:41:02 UTC] {dagrun.py:619} DEBUG - number of tis tasks for
<DagRun integration-choice-advantage-810 @ 2021-12-20 08:00:00+00:00:
scheduled__2021-12-20T08:00:00+00:00, externally triggered: False>: 3 task(s)
[2021-12-22, 13:41:02 UTC] {dagrun.py:634} DEBUG - number of scheduleable
tasks for <DagRun integration-choice-advantage-810 @ 2021-12-20 08:00:00+00:00:
scheduled__2021-12-20T08:00:00+00:00, externally triggered: False>: 2 task(s)
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.download_data
scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Previous Dagrun State'
PASSED: True, The task did not have depends_on_past set.
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.download_data
scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED:
False, Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'wait'}
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: integration-choice-advantage-810.download_data
scheduled__2021-12-20T08:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'wait'}
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.download_data
scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Not In Retry Period'
PASSED: True, The task instance was not marked for retrying.
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have
depends_on_past set.
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule
'all_success' requires all upstream tasks to have succeeded, but found 1
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped':
0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'download_data'}
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1025} DEBUG - Dependencies not
met for <TaskInstance: integration-choice-advantage-810.process
scheduled__2021-12-20T08:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED:
Task's trigger rule 'all_success' requires all upstream tasks to have
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0},
upstream_task_ids={'download_data'}
[2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance:
integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was
not marked for retrying.
[2021-12-22, 13:41:02 UTC] {local_task_job.py:264} INFO - 0 downstream tasks
scheduled from follow-on schedule check
```
### What you expected to happen
A TimeDeltaSensorAsync task should never fail.
### How to reproduce
_No response_
### Operating System
Airflow Docker image
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other Docker-based deployment
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)