mkhalaf2 opened a new issue #20047:
URL: https://github.com/apache/airflow/issues/20047


   ### Apache Airflow version
   
   2.2.2 (latest released)
   
   ### Operating System
   
   Red Hat Enterprise Linux Server 7.6 (Maipo)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-celery==2.1.0
   apache-airflow-providers-ftp==2.0.1
   apache-airflow-providers-http==2.0.1
   apache-airflow-providers-imap==2.0.1
   apache-airflow-providers-postgres==2.3.0
   apache-airflow-providers-sqlite==2.0.1
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   * ExternalTaskSensor tasks remain in the `queued` state and never run. The logs show the error:
   `base_executor.py:85 ERROR - could not queue task 
TaskInstanceKey(dag_id='test_dag_1_dag', task_id='feed_sensor', 
run_id='manual__2021-12-01T22:50:55+00:00', try_number=1)`
   
   In our test we trigger ~80 DAGs, and each DAG has an ExternalTaskSensor that waits on a task in a different DAG.
   Investigating the logs, we noticed that the ExternalTaskSensor executed once, moved to `[up_for_reschedule]`, and then we got the error above when it moved back to `[queued]`.
   
   Two hours later we restarted the scheduler; it identified the `queued` tasks as coming from a "dead executor" and killed them:
   ```
   INFO - Adopted the following 30 tasks from a dead executor
           <TaskInstance: test_dag_1_dag.feed_sensor 
manual__2021-12-01T22:50:55+00:00 [queued]> in state PENDING
           <TaskInstance: test_dag_4_dag.feed_sensor 
manual__2021-12-01T23:37:26+00:00 [queued]> in state PENDING
           
   ERROR - Adopted tasks were still pending after 0:10:00, assuming they never 
made it to celery and clearing:
           TaskInstanceKey(dag_id='test_dag_1_dag', task_id='feed_sensor', 
run_id='manual__2021-12-01T22:50:55+00:00', try_number=1)
           TaskInstanceKey(dag_id='test_dag_4_dag', task_id='feed_sensor', 
run_id='manual__2021-12-01T23:37:26+00:00', try_number=1)
           
           
   INFO - Executor reports execution of test_dag_1_dag.feed_sensor 
run_id=manual__2021-12-01T22:50:55+00:00 exited with status failed for 
try_number 1
   [2021-12-02 07:27:39,527] {scheduler_job.py:504} INFO - Executor reports 
execution of test_dag_4_dag.feed_sensor 
run_id=manual__2021-12-01T23:37:26+00:00 exited with status failed for 
try_number 1
   
   ERROR - Executor reports task instance <TaskInstance: 
test_dag_1_dag.feed_sensor manual__2021-12-01T22:50:55+00:00 [queued]> finished 
(failed) although the task says its queued. (Info: None) Was the task killed 
externally?
   [2021-12-02 07:27:39,577] {taskinstance.py:1705} ERROR - Executor reports 
task instance <TaskInstance: test_dag_4_dag.feed_sensor 
manual__2021-12-01T23:37:26+00:00 [queued]> finished (failed) although the task 
says its queued. (Info: None) Was the task killed externally?
   
   ```
   
   ### What you expected to happen
   
   Tasks should be queued and run successfully.
   
   ### How to reproduce
   
   In my test I have 80 DAGs with the same definition as below, called A1, A2, ..., A80 respectively:
   ```python
   from airflow import DAG
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator
   from airflow.sensors.external_task import ExternalTaskSensor
   from airflow.utils.state import State

   # dag_id and default_args are set per DAG (A1, A2, ..., A80) elsewhere in the file.
   with DAG(
       dag_id=dag_id,
       default_args=default_args,
       schedule_interval=None,
       max_active_runs=1,
       render_template_as_native_obj=True,
   ) as dag:
       feed_trigger = TriggerDagRunOperator(
           task_id="feed_trigger",
           trigger_dag_id="B",
           execution_date="{{ execution_date }}",
           conf="{{ dag_run.conf }}",
       )

       feed_sensor = ExternalTaskSensor(
           task_id="feed_sensor",
           external_dag_id="feeds_dag_test",
           external_task_id="generic_feed_loader",
           mode="reschedule",
           allowed_states=[State.SUCCESS, State.SKIPPED],
           failed_states=[State.FAILED, State.UPSTREAM_FAILED],
           poke_interval=182,
       )
   ```
   
   DAGs A[1-80] all trigger DAG B, which is a simple DAG with a single task that sleeps for a random 1-2 minutes.
   In my test I triggered DAGs A1-A80 sequentially in a for loop; a rough sketch of DAG B and the trigger loop is below.
   We can reproduce this error consistently, but a different number of DAGs fails each time.
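
   For illustration only, a minimal sketch of that setup, assuming DAG B is the `feeds_dag_test` DAG the sensor waits on and that the loop goes through the `airflow dags trigger` CLI; the sleep range, `start_date`, and the `trigger_all` helper are placeholders, not our exact code.

   ```python
   import random
   import subprocess
   import time
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.python import PythonOperator

   # Illustrative stand-in for DAG B: a single task that sleeps for a random
   # 1-2 minutes. The dag_id/task_id assume B is the "feeds_dag_test" DAG whose
   # "generic_feed_loader" task the ExternalTaskSensor above waits on.
   with DAG(
       dag_id="feeds_dag_test",
       start_date=datetime(2021, 12, 1),
       schedule_interval=None,
   ) as dag_b:
       generic_feed_loader = PythonOperator(
           task_id="generic_feed_loader",
           python_callable=lambda: time.sleep(random.randint(60, 120)),
       )

   # Simplified version of the sequential trigger loop: fire the 80 A-DAGs one
   # after another via the CLI (the dag_id pattern matches the "test_dag_<n>_dag"
   # ids in the scheduler logs above).
   def trigger_all():
       for i in range(1, 81):
           subprocess.run(["airflow", "dags", "trigger", f"test_dag_{i}_dag"], check=True)
   ```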
   
   We use the CeleryExecutor, with a PostgreSQL metadata database and a RabbitMQ broker.
   
   We've run Airflow with DEBUG logging, and it didn't show any extra warnings/errors. When grepping for the task IDs in the worker logs, we couldn't find any entries for the failed DAGs; for the succeeded DAGs the worker log entries are there.
   
   ### Anything else
   
   We have tested the same flow in Airflow 1.10.15 and it ran successfully.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

