andreychernih opened a new issue #20460:
URL: https://github.com/apache/airflow/issues/20460


   ### Apache Airflow version
   
   2.2.2
   
   ### What happened
   
   I have a simple TimeDeltaSensorAsync task that is the very first task in the 
DAG:
   
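   ```
   from datetime import timedelta

   from airflow.sensors.time_delta import TimeDeltaSensorAsync

   # Wait 6 hours before the rest of the DAG runs.
   delay_sensor = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(hours=6))
   
   # ...
   
   delay_sensor.set_downstream(download_task)
   download_task.set_downstream(process_task)
   ```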
   
   The intention of this task is simply to wait 6 hours before proceeding with 
the rest of the DAG. I expect this task to always be in either the "waiting" or 
"success" state. In almost 50% of cases across my production DAGs, this task 
ends up in the "up_for_retry" state instead. All my components, scheduler and 
triggerer, are up and running correctly: they are healthy and not restarting. 
This is a production system where all other DAGs and tasks are working 
correctly.
   
   The only suspicious lines that I found in the log related to the task are:
   
   ```
   [2021-12-22 13:41:06,921] {base_executor.py:85} ERROR 
- could not queue task 
TaskInstanceKey(dag_id='integration-choice-advantage-810', task_id='wait', 
run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
   ...
   [2021-12-22 13:41:06,932] {scheduler_job.py:572} ERROR 
- Executor reports task instance <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> finished (success) although the task says its queued. (Info: None) 
Was the task killed externally?
   [2021-12-22 13:41:06,936] {taskinstance.py:1705} ERROR 
- Executor reports task instance <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> finished (success) although the task says its queued. (Info: None) 
Was the task killed externally?
   ```
   
   
![image](https://user-images.githubusercontent.com/131281/147102191-29b22e70-bda6-41c9-a7a9-7e0cacdd8de2.png)
   
   
   Scheduler log:
   
   ```
   [2021-12-22 13:41:06,913] {scheduler_job.py:288} INFO 
- 1 tasks up for execution:
        <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [scheduled]>
   [2021-12-22 13:41:06,916] {scheduler_job.py:317} INFO 
- Figuring out tasks to run in Pool(name=default_pool) with 1280 open slots and 
1 task instances ready to be queued
   [2021-12-22 13:41:06,916] {scheduler_job.py:345} INFO 
- DAG integration-choice-advantage-810 has 0/2 running and queued tasks
   [2021-12-22 13:41:06,918] {scheduler_job.py:410} INFO 
- Setting the following tasks to queued state:
        <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [scheduled]>
   [2021-12-22 13:41:06,921] {scheduler_job.py:450} INFO 
- Sending TaskInstanceKey(dag_id='integration-choice-advantage-810', 
task_id='wait', run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2) to 
executor with priority 3 and queue default
   [2021-12-22 13:41:06,921] {base_executor.py:85} ERROR 
- could not queue task 
TaskInstanceKey(dag_id='integration-choice-advantage-810', task_id='wait', 
run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
   [2021-12-22 13:41:06,923] {base_executor.py:150} DEBUG 
- 1 running task instances
   [2021-12-22 13:41:06,923] {base_executor.py:151} DEBUG 
- 0 in queue
   [2021-12-22 13:41:06,923] {base_executor.py:152} DEBUG 
- 19 open slots
   [2021-12-22 13:41:06,923] {base_executor.py:161} DEBUG 
- Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync 
method
   [2021-12-22 13:41:06,924] {base_executor.py:198} DEBUG 
- Changing state: TaskInstanceKey(dag_id='integration-choice-advantage-810', 
task_id='wait', run_id='scheduled__2021-12-20T08:00:00+00:00', try_number=2)
   [2021-12-22 13:41:06,924] {scheduler_job.py:504} INFO 
- Executor reports execution of integration-choice-advantage-810.wait 
run_id=scheduled__2021-12-20T08:00:00+00:00 exited with status success for 
try_number 2
   [2021-12-22 13:41:06,932] {scheduler_job.py:547} INFO 
- TaskInstance Finished: dag_id=integration-choice-advantage-810, task_id=wait, 
run_id=scheduled__2021-12-20T08:00:00+00:00, run_start_date=2021-12-22 
13:41:01.836808+00:00, run_end_date=None, run_duration=0.328748, state=queued, 
executor_state=success, try_number=2, max_tries=4, job_id=4517, 
pool=default_pool, queue=default, priority_weight=3, 
operator=TimeDeltaSensorAsync
   [2021-12-22 13:41:06,932] {scheduler_job.py:572} ERROR 
- Executor reports task instance <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> finished (success) although the task says its queued. (Info: None) 
Was the task killed externally?
   [2021-12-22 13:41:06,936] {taskinstance.py:1705} ERROR 
- Executor reports task instance <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> finished (success) although the task says its queued. (Info: None) 
Was the task killed externally?
   [2021-12-22 13:41:06,936] {taskinstance.py:720} DEBUG 
- Refreshing TaskInstance <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [queued]> from DB
   [2021-12-22 13:41:06,943] {taskinstance.py:761} DEBUG 
- Refreshed TaskInstance <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [queued]>
   [2021-12-22 13:41:06,943] {taskinstance.py:2224} DEBUG 
- Task Duration set to 5.106632
   [2021-12-22 13:41:06,943] {taskinstance.py:1286} DEBUG 
- Clearing next_method and next_kwargs.
   [2021-12-22 13:41:06,944] {taskinstance.py:1270} INFO 
- Marking task as UP_FOR_RETRY. dag_id=integration-choice-advantage-810, 
task_id=wait, execution_date=20211220T080000, start_date=20211222T134101, 
end_date=20211222T134106
   ```
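
One possible reading of the scheduler log above, sketched as a toy model. This is not Airflow code: `ToyExecutor` and `check_event` are hypothetical names, and the assumption is that the executor still tracks the task key as running (from the run that ended in deferral) when the scheduler tries to queue it again. Under that assumption the queue request is refused, the stale "success" result is then reported, and the scheduler sees a finished task whose database state is still "queued".

```python
from dataclasses import dataclass, field


@dataclass
class ToyExecutor:
    """Hypothetical stand-in for an executor's bookkeeping; not Airflow code."""

    running: set = field(default_factory=set)
    queued: dict = field(default_factory=dict)
    event_buffer: dict = field(default_factory=dict)

    def queue_command(self, key, command):
        # Refuse keys that are still tracked as queued or running.
        if key in self.queued or key in self.running:
            return False  # corresponds to "could not queue task" in the log
        self.queued[key] = command
        return True

    def change_state(self, key, state):
        # The worker process for `key` has exited; record its result.
        self.running.discard(key)
        self.event_buffer[key] = state


def check_event(ti_state, executor_state):
    """Return the mismatch message if a finished task is still 'queued'."""
    if ti_state == "queued" and executor_state in ("success", "failed"):
        return f"finished ({executor_state}) although the task says its {ti_state}"
    return None


key = (
    "integration-choice-advantage-810",
    "wait",
    "scheduled__2021-12-20T08:00:00+00:00",
    2,
)

# Assumption: the run that deferred the task has not been reported yet.
executor = ToyExecutor(running={key})

accepted = executor.queue_command(key, ["airflow", "tasks", "run"])
executor.change_state(key, "success")  # stale result of the pre-deferral run
mismatch = check_event("queued", executor.event_buffer[key])

print(accepted)
print(mismatch)
```

In this model the order of events matches the log timestamps: the refused queue request at 13:41:06,921, the state change at 13:41:06,924, and the mismatch error at 13:41:06,932. Whether this is what actually happens inside the scheduler is not confirmed by the logs alone.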
   
   Triggerer log:
   
   ```
   [2021-12-22 13:41:02,998] {triggerer_job.py:356} INFO - Trigger 
<airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00> 
(ID 499) starting
   [2021-12-22 13:41:02,998] {triggerer_job.py:359} INFO - Trigger 
<airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00> 
(ID 499) fired: TriggerEvent
   [2021-12-22 13:41:04,003] {triggerer_job.py:356} INFO - Trigger 
<airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00> 
(ID 499) starting
   [2021-12-22 13:41:04,003] {triggerer_job.py:359} INFO - Trigger 
<airflow.triggers.temporal.DateTimeTrigger moment=2021-12-21T14:00:00+00:00> 
(ID 499) fired: TriggerEvent
   ```
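
The `moment` in the triggerer log can be cross-checked with a little date arithmetic. The sketch below assumes a daily schedule (not stated in this report) and assumes the sensor targets the end of the data interval plus `delta`; both are assumptions, chosen because they reproduce the logged value.

```python
from datetime import datetime, timedelta, timezone

# Values taken from the logs.
logical_date = datetime(2021, 12, 20, 8, 0, tzinfo=timezone.utc)
resumed_at = datetime(2021, 12, 22, 13, 41, 2, tzinfo=timezone.utc)

# Assumptions: daily schedule, target = data interval end + delta.
schedule_interval = timedelta(days=1)
delta = timedelta(hours=6)

data_interval_end = logical_date + schedule_interval
moment = data_interval_end + delta

already_due = moment <= resumed_at
overdue_by = resumed_at - moment

print(moment.isoformat())
print(already_due, overdue_by)
```

Under these assumptions the target moment had already passed when attempt 2 started, which would be consistent with the trigger firing about one second after the task was deferred.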
   
   Task instance log:
   
   ```
   [2021-12-22, 13:41:01 UTC] {base_task_runner.py:63} DEBUG - Planning to run 
as the  user
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [queued]> from DB
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [queued]>
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Task Instance State' PASSED: True, Task state queued was 
valid.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in 
running state.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not 
have depends_on_past set.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not 
have any upstream tasks.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was 
not marked for retrying.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1035} INFO - Dependencies all 
met for <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [queued]>
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not 
set.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Pool Slots Available' PASSED: True, ('There are enough 
open slots in %s to execute the task', 'default_pool')
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not 
have depends_on_past set.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Trigger Rule' PASSED: True, The task instance did not 
have any upstream tasks.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.wait scheduled__2021-12-20T08:00:00+00:00 
[queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was 
not marked for retrying.
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1035} INFO - Dependencies all 
met for <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [queued]>
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1241} INFO - 
   
--------------------------------------------------------------------------------
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1242} INFO - Starting attempt 2 
of 5
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1243} INFO - 
   
--------------------------------------------------------------------------------
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:1262} INFO - Executing 
<Task(TimeDeltaSensorAsync): wait> on 2021-12-20 08:00:00+00:00
   [2021-12-22, 13:41:01 UTC] {standard_task_runner.py:52} INFO - Started 
process 12810 to run task
   [2021-12-22, 13:41:01 UTC] {standard_task_runner.py:76} INFO - Running: 
['airflow', 'tasks', 'run', 'integration-choice-advantage-810', 'wait', 
'scheduled__2021-12-20T08:00:00+00:00', '--job-id', '4517', '--raw', 
'--subdir', 'DAGS_FOLDER/airflow-dag.py', '--cfg-path', '/tmp/tmpxq9fz1pw', 
'--error-file', '/tmp/tmpc0cq8o26']
   [2021-12-22, 13:41:01 UTC] {standard_task_runner.py:77} INFO - Job 4517: 
Subtask wait
   [2021-12-22, 13:41:01 UTC] {cli_action_loggers.py:66} DEBUG - Calling 
callbacks: [<function default_action_log at 0x7ff646e4b670>]
   [2021-12-22, 13:41:01 UTC] {settings.py:210} DEBUG - Setting up DB 
connection pool (PID 12810)
   [2021-12-22, 13:41:01 UTC] {settings.py:267} DEBUG - 
settings.prepare_engine_args(): Using NullPool
   [2021-12-22, 13:41:01 UTC] {logging_mixin.py:109} INFO - Running 
<TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [running]> on host 
airflow-scheduler-589549ff66-vnps5
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [running]> from DB
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [running]>
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:790} DEBUG - Clearing XCom data
   [2021-12-22, 13:41:01 UTC] {taskinstance.py:797} DEBUG - XCom data cleared
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1427} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=integration-choice-advantage-810
   AIRFLOW_CTX_TASK_ID=wait
   AIRFLOW_CTX_EXECUTION_DATE=2021-12-20T08:00:00+00:00
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-20T08:00:00+00:00
   [2021-12-22, 13:41:02 UTC] {__init__.py:146} DEBUG - Preparing lineage 
inlets and outlets
   [2021-12-22, 13:41:02 UTC] {__init__.py:190} DEBUG - inlets: [], outlets: []
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1340} INFO - Pausing task as 
DEFERRED. dag_id=integration-choice-advantage-810, task_id=wait, 
execution_date=20211220T080000, start_date=20211222T134101
   [2021-12-22, 13:41:02 UTC] {cli_action_loggers.py:84} DEBUG - Calling 
callbacks: []
   [2021-12-22, 13:41:02 UTC] {local_task_job.py:154} INFO - Task exited with 
return code 0
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:720} DEBUG - Refreshing 
TaskInstance <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [running]> from DB
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:761} DEBUG - Refreshed 
TaskInstance <TaskInstance: integration-choice-advantage-810.wait 
scheduled__2021-12-20T08:00:00+00:00 [deferred]>
   [2021-12-22, 13:41:02 UTC] {dagrun.py:619} DEBUG - number of tis tasks for 
<DagRun integration-choice-advantage-810 @ 2021-12-20 08:00:00+00:00: 
scheduled__2021-12-20T08:00:00+00:00, externally triggered: False>: 3 task(s)
   [2021-12-22, 13:41:02 UTC] {dagrun.py:634} DEBUG - number of scheduleable 
tasks for <DagRun integration-choice-advantage-810 @ 2021-12-20 08:00:00+00:00: 
scheduled__2021-12-20T08:00:00+00:00, externally triggered: False>: 2 task(s)
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.download_data 
scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Previous Dagrun State' 
PASSED: True, The task did not have depends_on_past set.
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.download_data 
scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Trigger Rule' PASSED: 
False, Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'wait'}
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: integration-choice-advantage-810.download_data 
scheduled__2021-12-20T08:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'wait'}
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.download_data 
scheduled__2021-12-20T08:00:00+00:00 [None]> dependency 'Not In Retry Period' 
PASSED: True, The task instance was not marked for retrying.
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00 
[None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have 
depends_on_past set.
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00 
[None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 
'all_success' requires all upstream tasks to have succeeded, but found 1 
non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 
0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'download_data'}
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1025} DEBUG - Dependencies not 
met for <TaskInstance: integration-choice-advantage-810.process 
scheduled__2021-12-20T08:00:00+00:00 [None]>, dependency 'Trigger Rule' FAILED: 
Task's trigger rule 'all_success' requires all upstream tasks to have 
succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 
'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, 
upstream_task_ids={'download_data'}
   [2021-12-22, 13:41:02 UTC] {taskinstance.py:1045} DEBUG - <TaskInstance: 
integration-choice-advantage-810.process scheduled__2021-12-20T08:00:00+00:00 
[None]> dependency 'Not In Retry Period' PASSED: True, The task instance was 
not marked for retrying.
   [2021-12-22, 13:41:02 UTC] {local_task_job.py:264} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
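
To make the state transitions in the task instance log easier to follow, here is a small helper that pulls the bracketed state out of each `<TaskInstance: ...>` entry. It is a sketch for reading these logs only; `state_sequence` is a hypothetical helper and the regular expression assumes the line format shown above, including the wrapped lines.

```python
import re

# Matches "<TaskInstance: dag_id.task_id run_id [state]>".
STATE_RE = re.compile(r"<TaskInstance: (?P<task>\S+) \S+ \[(?P<state>\w+)\]>")


def state_sequence(log_text, task):
    """Return the distinct consecutive states logged for `task`."""
    # Undo the line wrapping so each entry sits on one logical line.
    text = " ".join(log_text.split())
    states = []
    for match in STATE_RE.finditer(text):
        if match.group("task") != task:
            continue
        state = match.group("state")
        if not states or states[-1] != state:
            states.append(state)
    return states


sample = """
[2021-12-22, 13:41:01 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [queued]>
[2021-12-22, 13:41:01 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [running]>
[2021-12-22, 13:41:02 UTC] {taskinstance.py:761} DEBUG - Refreshed
TaskInstance <TaskInstance: integration-choice-advantage-810.wait
scheduled__2021-12-20T08:00:00+00:00 [deferred]>
"""

states = state_sequence(sample, "integration-choice-advantage-810.wait")
print(states)
```

For the excerpt above this yields queued, running, deferred, so the task process itself never reports a failure; the failure only shows up on the scheduler side.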
   
   ### What you expected to happen
   
   A TimeDeltaSensorAsync task should never fail.
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   Airflow Docker image
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


