ciancolo opened a new issue, #54939:
URL: https://github.com/apache/airflow/issues/54939

   ### Apache Airflow version
   
   3.0.5
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   We are running Airflow 3.0.4 (latest version available on PyPI) with LocalExecutor, using a PostgreSQL 16 database as the metastore; the database is hosted on another machine in the same network.
   
   Since upgrading from Airflow 2.11 to Airflow 3, sensors have been killed before reaching their configured timeout. This only happens during periods of high load, when multiple DAGs and sensors are running in parallel.
   
   The sensors are configured as follows:
   
   ```
   ExternalTaskSensor(task_id='my_sensor_Task',
                      external_dag_id='external_dag',
                      external_task_id='external_task',
                      allowed_states=['success'],
                      execution_delta=timedelta(seconds=0),
                      poke_interval=60,
                      mode='reschedule',
                      timeout=60 * 60 * 3,
                      dag=dag)
   ```
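With `mode='reschedule'`, a sensor releases its worker slot after each unsuccessful poke and goes back through the scheduler before the next one, so with the settings above a single sensor can make up to `timeout // poke_interval = 180` round trips in three hours. Below is a minimal sketch of that bound, plus a check of whether a failure happened before the configured timeout. The helper names are hypothetical (not Airflow APIs), and we assume the timeout is counted from the start of the sensor's first try, which is our understanding of reschedule-mode behavior:

```python
from datetime import datetime, timedelta

# Values copied from the ExternalTaskSensor configuration above.
POKE_INTERVAL_S = 60
TIMEOUT_S = 60 * 60 * 3


def max_reschedule_cycles(timeout_s: int, poke_interval_s: int) -> int:
    """Upper bound on poke/reschedule cycles before the sensor times out.

    Hypothetical helper: it ignores the time spent inside each poke and any
    scheduling latency, both of which reduce the real number of cycles.
    """
    return timeout_s // poke_interval_s


def failed_before_timeout(first_try_start: datetime, failed_at: datetime,
                          timeout_s: int) -> bool:
    """True if the failure happened earlier than first_try_start + timeout.

    Assumption: a reschedule-mode sensor measures its timeout from the start
    of its first try, not from the most recent reschedule.
    """
    return failed_at < first_try_start + timedelta(seconds=timeout_s)


if __name__ == "__main__":
    print(max_reschedule_cycles(TIMEOUT_S, POKE_INTERVAL_S))  # → 180
```

A failure for which `failed_before_timeout(...)` is `True` cannot have been caused by the sensor's own timeout, which is the situation described above.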
   
   Here is our Airflow configuration:
   ```
   [core]
   parallelism = 32
   max_active_tasks_per_dag = 16
   max_active_runs_per_dag = 16
   
   [database]
   sql_alchemy_pool_size = 20
   sql_alchemy_max_overflow = 40
   sql_alchemy_pool_recycle = 1800
   ```
   Yesterday we increased the database pool size and max overflow parameters. This improved things, but the issue is still not fully resolved.
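For reference: assuming Airflow passes these settings through to SQLAlchemy's `QueuePool` (the pool SQLAlchemy uses by default when pooling is enabled), the most connections a single engine will hold at once is `pool_size + max_overflow`, which for the `[database]` values above is 60. Each process that creates its own engine gets its own pool, so the total number of connections to PostgreSQL depends on how many such processes are running. A minimal stdlib-only sketch that reads the configuration shown above (the function name `pool_limits` is ours, not Airflow's):

```python
import configparser

# The [core] and [database] settings quoted in this issue.
AIRFLOW_CFG = """
[core]
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16

[database]
sql_alchemy_pool_size = 20
sql_alchemy_max_overflow = 40
sql_alchemy_pool_recycle = 1800
"""


def pool_limits(cfg_text: str) -> dict:
    """Summarize the connection-pool settings for a single SQLAlchemy engine."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    db = parser["database"]
    pool_size = db.getint("sql_alchemy_pool_size")
    max_overflow = db.getint("sql_alchemy_max_overflow")
    return {
        # Up to this many idle connections are kept open in the pool.
        "persistent": pool_size,
        # Ceiling on simultaneously checked-out connections per engine.
        "max_simultaneous": pool_size + max_overflow,
        # Connections older than this many seconds are replaced on checkout.
        "recycle_after_s": db.getint("sql_alchemy_pool_recycle"),
    }


print(pool_limits(AIRFLOW_CFG))
# → {'persistent': 20, 'max_simultaneous': 60, 'recycle_after_s': 1800}
```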
   
   We also checked whether the database was overloaded or unable to handle all the requests, but it looks fine.
   
   In the scheduler log, we found the following errors:
   
   ```
   {base_executor.py:512} DEBUG - Changing state: TaskInstanceKey(dag_id='sensor_race_test', task_id='test_sensor_38', run_id='scheduled__2024-01-10T00:00:00+00:00', try_number=2, map_index=-1)
   {base_executor.py:517} DEBUG - Could not find key: TaskInstanceKey(dag_id='sensor_race_test', task_id='test_sensor_38', run_id='scheduled__2024-01-10T00:00:00+00:00', try_number=2, map_index=-1)
   {scheduler_job_runner.py:811} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='sensor_race_test', task_id='test_sensor_38', run_id='scheduled__2024-01-10T00:00:00+00:00', try_number=2, map_index=-1)
   {scheduler_job_runner.py:853} INFO - TaskInstance Finished: dag_id=sensor_race_test, task_id=test_sensor_38, run_id=scheduled__2024-01-10T00:00:00+00:00, map_index=-1, run_start_date=2025-08-26 09:33:41.915710+00:00, run_end_date=2025-08-26 09:33:48.797037+00:00, run_duration=6.881327, state=scheduled, executor=LocalExecutor(parallelism=32), executor_state=success, try_number=2, max_tries=1, pool=default_pool, queue=default, priority_weight=1, operator=PythonSensor, queued_dttm=2025-08-26 09:33:41.700085+00:00, scheduled_dttm=2025-08-26 09:33:54.249313+00:00,queued_by_job_id=10439623, pid=1969715
   {scheduler_job_runner.py:926} ERROR - DAG 'sensor_race_test' for task instance <TaskInstance: sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00 [scheduled]> not found in serialized_dag table
   {taskinstance.py:1882} ERROR - Executor LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00 [scheduled]> finished with state success, but the task instance's state attribute is scheduled. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
   {listener.py:37} DEBUG - Calling 'on_task_instance_failed' with {'previous_state': <TaskInstanceState.RUNNING: 'running'>, 'task_instance': <TaskInstance: sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00 [failed]>, 'error': "Executor LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00 [scheduled]> finished with state success, but the task instance's state attribute is scheduled. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"}
   {taskinstance.py:2005} INFO - Marking task as FAILED. dag_id=sensor_race_test, task_id=test_sensor_38, run_id=scheduled__2024-01-10T00:00:00+00:00, logical_date=20240110T000000, start_date=20250826T093341, end_date=20250826T093354
   ```
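The `TaskInstance Finished` line above carries enough timestamps to lay out what happened to `test_sensor_38`. The sketch below is a hypothetical, stdlib-only helper (not part of Airflow) that extracts those timestamps from the line, shortened here to the relevant fields, and computes the gaps between them. It shows that the try ran for about 6.9 s (matching `run_duration=6.881327`) and that `scheduled_dttm` is about 5.45 s after `run_end_date`, which is consistent with the task instance already being back in `scheduled` when the executor's `success` event was processed. That gap is close to the reproduction DAG's `poke_interval=5`; this is only a reading of the timestamps, not a diagnosis.

```python
import re
from datetime import datetime

# Fields copied from the "TaskInstance Finished" scheduler log line above
# (other fields omitted; the missing space before queued_by_job_id is as logged).
FINISHED_LINE = (
    "run_start_date=2025-08-26 09:33:41.915710+00:00, "
    "run_end_date=2025-08-26 09:33:48.797037+00:00, run_duration=6.881327, "
    "queued_dttm=2025-08-26 09:33:41.700085+00:00, "
    "scheduled_dttm=2025-08-26 09:33:54.249313+00:00,queued_by_job_id=10439623"
)

# Matches name=YYYY-MM-DD HH:MM:SS.ffffff+00:00
TIMESTAMP_FIELD = re.compile(
    r"(\w+)=(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}\+00:00)"
)


def parse_timestamps(line: str) -> dict:
    """Map each timestamp-valued field in a log line to an aware datetime."""
    return {name: datetime.fromisoformat(value)
            for name, value in TIMESTAMP_FIELD.findall(line)}


def timeline(line: str) -> dict:
    """Seconds between the key events recorded for one task-instance try."""
    ts = parse_timestamps(line)
    return {
        "queued_to_start": (ts["run_start_date"] - ts["queued_dttm"]).total_seconds(),
        "run_duration": (ts["run_end_date"] - ts["run_start_date"]).total_seconds(),
        "end_to_scheduled": (ts["scheduled_dttm"] - ts["run_end_date"]).total_seconds(),
    }


for event, seconds in timeline(FINISHED_LINE).items():
    print(f"{event:>18}: {seconds:.3f} s")
```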
   
   No errors appear in the task logs or any other logs.
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   We can reproduce the issue with this minimal DAG, which creates 50 Python sensors:
   
   ```
   from airflow import DAG
   from airflow.sensors.base import PokeReturnValue
   from airflow.sensors.python import PythonSensor
   from datetime import timedelta, datetime
   import random
   import time
   
   # Dummy function that "simulates" an external condition.
   # It returns False almost always, so the sensor stays in reschedule
   # and generates many requests to the scheduler.
   
   def fake_condition():
       # Simulates a low probability of "success" (1%)
       return random.random() < 0.01
   
   
   def sensor_fn():
       return PokeReturnValue(is_done=fake_condition())
   
   
   def make_sensor(task_id):
       return PythonSensor(
           task_id=task_id,
           python_callable=sensor_fn,
           poke_interval=5,        # very short, to add stress
           mode="reschedule",      # essential for generating load on the scheduler
           timeout=600,            # gives up after 10 minutes
       )
   
   
   def make_dag(dag_id):
       with DAG(
           dag_id=dag_id,
           start_date=datetime(2024, 1, 10),
           schedule="@once",
           catchup=False,
           default_args={"owner": "airflow", "retries": 0},
           tags=["test", "sensor"],
       ) as dag:
   
           # Create many sensors that run in parallel
           for i in range(50):  # can be increased to 50-100 for more stress
               make_sensor(f"test_sensor_{i}")
   
           return dag
   
   
   globals()["sensor_race_test"] = make_dag("sensor_race_test")
   ```
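For a sense of the load this DAG generates, a back-of-the-envelope calculation from its parameters: 50 sensors poking every 5 s means up to about 10 poke/reschedule cycles per second through the scheduler. With a 1% success chance per poke and at most 120 pokes within `timeout=600`, roughly 30% of the sensors (about 15 of 50) are expected to reach the timeout legitimately; since real cycles take longer than `poke_interval`, the actual share can only be higher. Those legitimate timeouts are separate from the early failures in the scheduler log above (`test_sensor_38` was marked failed about 13 s after it started). A minimal sketch of the arithmetic, with hypothetical helper names and the simplifying assumption that each cycle takes exactly `poke_interval` seconds:

```python
from dataclasses import dataclass

# Parameters taken from the reproduction DAG above.
NUM_SENSORS = 50
POKE_INTERVAL_S = 5
TIMEOUT_S = 600
P_SUCCESS = 0.01  # fake_condition() returns True with probability 0.01


@dataclass
class LoadEstimate:
    cycles_per_second: float   # poke/reschedule cycles hitting the scheduler
    max_pokes_per_sensor: int  # upper bound before the timeout is reached
    p_timeout: float           # chance one sensor never succeeds in time
    expected_timeouts: float   # expected number of sensors that time out


def estimate(num_sensors: int, poke_interval_s: int, timeout_s: int,
             p_success: float) -> LoadEstimate:
    """Idealized estimate: each cycle lasts exactly poke_interval_s seconds."""
    max_pokes = timeout_s // poke_interval_s
    # Pokes are independent Bernoulli trials, so a sensor times out only if
    # every one of its max_pokes attempts fails.
    p_timeout = (1 - p_success) ** max_pokes
    return LoadEstimate(
        cycles_per_second=num_sensors / poke_interval_s,
        max_pokes_per_sensor=max_pokes,
        p_timeout=p_timeout,
        expected_timeouts=num_sensors * p_timeout,
    )


est = estimate(NUM_SENSORS, POKE_INTERVAL_S, TIMEOUT_S, P_SUCCESS)
print(f"{est.cycles_per_second:.1f} cycles/s, "
      f"{est.max_pokes_per_sensor} pokes max per sensor, "
      f"P(timeout) ~ {est.p_timeout:.2f}, "
      f"~{est.expected_timeouts:.0f} of {NUM_SENSORS} sensors time out")
```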
   
   ### Operating System
   
   Ubuntu 22.04.5 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


