mkhalaf2 opened a new issue #20047:
URL: https://github.com/apache/airflow/issues/20047
### Apache Airflow version
2.2.2 (latest released)
### Operating System
Red Hat Enterprise Linux Server 7.6 (Maipo)
### Versions of Apache Airflow Providers
apache-airflow-providers-celery==2.1.0
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-postgres==2.3.0
apache-airflow-providers-sqlite==2.0.1
### Deployment
Virtualenv installation
### Deployment details
_No response_
### What happened
* ExternalTaskSensor tasks remain in the queued state and never run. The logs show this error:
`base_executor.py:85 ERROR - could not queue task TaskInstanceKey(dag_id='test_dag_1_dag', task_id='feed_sensor', run_id='manual__2021-12-01T22:50:55+00:00', try_number=1)`
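For context, this error comes from `BaseExecutor.queue_command`, which refuses to queue a task whose key is already present in the executor's `queued_tasks` or `running` sets. The sketch below is paraphrased from the Airflow 2.2.x source and may differ slightly from the exact release:
```python
# Paraphrased from airflow/executors/base_executor.py (Airflow 2.2.x).
def queue_command(self, task_instance, command, priority=1, queue=None):
    """Queues command to task"""
    if task_instance.key not in self.queued_tasks and task_instance.key not in self.running:
        self.log.info("Adding to queue: %s", command)
        self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
    else:
        self.log.error("could not queue task %s", task_instance.key)
```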
In our test we trigger ~80 DAGs, and each DAG has an ExternalTaskSensor that waits on a task in a different DAG.
Investigating the logs, we noticed that the ExternalTaskSensor executed once, moved to `[up_for_reschedule]`, and then hit the error above when moving back to `[queued]`.
Two hours later we restarted the scheduler; it identified the "queued" tasks as coming from a dead executor and killed them:
```
INFO - Adopted the following 30 tasks from a dead executor
<TaskInstance: test_dag_1_dag.feed_sensor
manual__2021-12-01T22:50:55+00:00 [queued]> in state PENDING
<TaskInstance: test_dag_4_dag.feed_sensor
manual__2021-12-01T23:37:26+00:00 [queued]> in state PENDING
ERROR - Adopted tasks were still pending after 0:10:00, assuming they never
made it to celery and clearing:
TaskInstanceKey(dag_id='test_dag_1_dag', task_id='feed_sensor',
run_id='manual__2021-12-01T22:50:55+00:00', try_number=1)
TaskInstanceKey(dag_id='test_dag_4_dag', task_id='feed_sensor',
run_id='manual__2021-12-01T23:37:26+00:00', try_number=1)
INFO - Executor reports execution of test_dag_1_dag.feed_sensor
run_id=manual__2021-12-01T22:50:55+00:00 exited with status failed for
try_number 1
[2021-12-02 07:27:39,527] {scheduler_job.py:504} INFO - Executor reports
execution of test_dag_4_dag.feed_sensor
run_id=manual__2021-12-01T23:37:26+00:00 exited with status failed for
try_number 1
ERROR - Executor reports task instance <TaskInstance:
test_dag_1_dag.feed_sensor manual__2021-12-01T22:50:55+00:00 [queued]> finished
(failed) although the task says its queued. (Info: None) Was the task killed
externally?
[2021-12-02 07:27:39,577] {taskinstance.py:1705} ERROR - Executor reports
task instance <TaskInstance: test_dag_4_dag.feed_sensor
manual__2021-12-01T23:37:26+00:00 [queued]> finished (failed) although the task
says its queued. (Info: None) Was the task killed externally?
```
### What you expected to happen
Tasks should be queued and run successfully.
### How to reproduce
In my test I have 80 DAGs with the same definition as below, named A1, A2, ..., A80 respectively:
```python
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State

# dag_id and default_args are set per DAG (A1..A80).
with DAG(
    dag_id=dag_id,
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=1,
    render_template_as_native_obj=True,
) as dag:
    # Kick off the downstream feed DAG for the same execution date.
    feed_trigger = TriggerDagRunOperator(
        task_id="feed_trigger",
        trigger_dag_id="B",
        execution_date="{{ execution_date }}",
        conf="{{ dag_run.conf }}",
    )
    # Wait (in reschedule mode) for the feed loader task to finish.
    feed_sensor = ExternalTaskSensor(
        task_id="feed_sensor",
        external_dag_id="feeds_dag_test",
        external_task_id="generic_feed_loader",
        mode='reschedule',
        allowed_states=[State.SUCCESS, State.SKIPPED],
        failed_states=[State.FAILED, State.UPSTREAM_FAILED],
        poke_interval=182,
    )
```
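The 80 DAG objects are registered from this shared definition; the `make_dag` helper below is a hypothetical sketch of one way to do that in a single DAG file, not our exact code:
```python
# Hypothetical sketch: register A1..A80 from the shared definition above.
def make_dag(dag_id: str) -> DAG:
    with DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=None,
        max_active_runs=1,
        render_template_as_native_obj=True,
    ) as dag:
        ...  # feed_trigger and feed_sensor exactly as shown above
    return dag

for i in range(1, 81):
    globals()[f"A{i}"] = make_dag(f"A{i}")
```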
DAGs A[1-80] all trigger DAG B, which is a simple DAG with one task that sleeps for a random 1-2 minutes.
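A minimal stand-in for DAG B might look like the following; the `dag_id` and `task_id` are taken from the sensor arguments above, and the random sleep mirrors our test (an illustrative sketch, not our exact DAG):
```python
import random
import time

from airflow import DAG
from airflow.operators.python import PythonOperator

# Stand-in for DAG B: one task that sleeps 1-2 minutes.
with DAG(dag_id="feeds_dag_test", schedule_interval=None) as dag_b:
    generic_feed_loader = PythonOperator(
        task_id="generic_feed_loader",
        python_callable=lambda: time.sleep(random.uniform(60, 120)),
    )
```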
In my test I've triggered DAGs A1-A80 sequentially in a for loop (sketched below).
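The loop shells out to the `airflow dags trigger` CLI; the exact mechanism we used may differ, but this is equivalent:
```python
import subprocess

# Trigger DAGs A1..A80 sequentially, one `airflow dags trigger` call each.
for i in range(1, 81):
    subprocess.run(["airflow", "dags", "trigger", f"A{i}"], check=True)
```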
We can consistently reproduce this error, but a different number of DAGs fails each time.
We use the CeleryExecutor with a PostgreSQL backend and a RabbitMQ queue.
We've run Airflow with DEBUG logging; it didn't show any extra warnings or errors.
When grepping for the task IDs in the worker logs, we couldn't find any entries for the failed DAGs; the succeeded DAGs' worker logs were all there.
### Anything else
We have tested the same flow in Airflow 1.10.15 and it ran successfully.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)