laurentpellegrino opened a new issue, #65044:
URL: https://github.com/apache/airflow/issues/65044

   ### Under which category would you file this issue?
   
   Airflow Core
   
   ### Apache Airflow version
   
   3.2.0
   
   ### What happened and how to reproduce it?
   
TaskInstance.get_task_instance accepts a required `dag_id` argument but does not include it in the query's `filter_by`. The query filters only by `(run_id, task_id, map_index)`. Any caller that passes `dag_id` expecting disambiguation silently gets whatever row matches the triple (the right one only by accident) when the triple is unique across DAGs, and gets `sqlalchemy.exc.MultipleResultsFound` when it isn't.
   
   `airflow-core/src/airflow/models/taskinstance.py` on main, L784–L807:
     
   ```python
   @classmethod
   @provide_session
   def get_task_instance(
       cls,
       dag_id: str,
       run_id: str,
       task_id: str,
       map_index: int,
       lock_for_update: bool = False,
       session: Session = NEW_SESSION,
   ) -> TaskInstance | None:
       query = (
           select(TaskInstance)
           .options(lazyload(TaskInstance.dag_run))
           .filter_by(
               run_id=run_id,
               task_id=task_id,
               map_index=map_index,
           )
       )

       if lock_for_update:
           for attempt in run_with_db_retries(logger=cls.logger()):
               with attempt:
                   return session.execute(query.with_for_update()).scalar_one_or_none()
       else:
           return session.execute(query).scalar_one_or_none()
   ```
     dag_id is unused.
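   The failure mode is easy to reproduce in isolation with plain SQLAlchemy. A self-contained sketch (in-memory SQLite, a toy `TI` model standing in for the real `task_instance` table; the model and names here are illustrative, not Airflow's actual schema):

   ```python
   from sqlalchemy import Column, Integer, String, create_engine, select
   from sqlalchemy.exc import MultipleResultsFound
   from sqlalchemy.orm import Session, declarative_base

   Base = declarative_base()

   class TI(Base):
       """Toy stand-in for Airflow's task_instance table."""
       __tablename__ = "ti"
       id = Column(Integer, primary_key=True)
       dag_id = Column(String)
       run_id = Column(String)
       task_id = Column(String)
       map_index = Column(Integer)

   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)

   with Session(engine) as s:
       # Two DAGs sharing a (run_id, task_id, map_index) triple: the collision.
       s.add_all([
           TI(dag_id="dag_a", run_id="scheduled__x", task_id="done", map_index=-1),
           TI(dag_id="dag_b", run_id="scheduled__x", task_id="done", map_index=-1),
       ])
       s.commit()

       # The buggy filter: dag_id is accepted by the method but never applied.
       buggy = select(TI).filter_by(run_id="scheduled__x", task_id="done", map_index=-1)
       try:
           s.execute(buggy).scalar_one_or_none()
           outcome = "single row"
       except MultipleResultsFound:
           outcome = "MultipleResultsFound"

       # Adding dag_id to the filter resolves exactly one row.
       fixed_dag = s.execute(buggy.filter_by(dag_id="dag_a")).scalar_one_or_none().dag_id

   print(outcome)    # MultipleResultsFound
   print(fixed_dag)  # dag_a
   ```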
                                                                                
                                                                                
                                 
   Real-world impact: permanent EdgeExecutor scheduler crash loop

   `EdgeExecutor._update_orphaned_jobs` (`providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py` on main, L189, crashing call at L203) calls this on every scheduler sync for every lifeless `EdgeJobModel` row:

   ```python
   for job in lifeless_jobs:
       ti = TaskInstance.get_task_instance(
           dag_id=job.dag_id,
           run_id=job.run_id,
           task_id=job.task_id,
           map_index=job.map_index,
           session=session,
       )
       job.state = ti.state if ti and ti.state else TaskInstanceState.REMOVED
       ...
   ```
   If any orphaned `edge_job` row has a `(run_id, task_id, map_index)` triple shared with a TaskInstance in a different DAG, this query raises `MultipleResultsFound`. The exception propagates up `_update_orphaned_jobs` → `EdgeExecutor.sync` → `executor.heartbeat()` → `_run_scheduler_loop`, the scheduler process exits, its supervisor (Kubernetes, systemd, etc.) restarts it, and the exact same stale `edge_job` row is picked up on the next sync, so the scheduler crashes again: a permanent crash loop. The scheduler never dispatches another queued task to the edge worker until the offending row is manually removed from the metadata DB.
   
   Triggering the collision is easy in practice:

   - Multiple DAGs on the same cron (e.g. `@daily`) → identical `scheduled__<logical_date>` run_ids
   - Shared generic task_ids across those DAGs (`done`, `cleanup`, `notify`, `end`, `start`, …)
   - Same `map_index` (typically `-1` for non-mapped tasks)
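   The first bullet is worth making concrete. Assuming the `scheduled__<logical_date>` run_id pattern visible in the run_ids quoted in this report (the exact generation logic lives in Airflow core), two DAGs on the same cron get byte-identical lookup keys:

   ```python
   from datetime import datetime, timezone

   # Two DAGs on @daily share every logical date, hence every run_id
   # (pattern assumed from the run_ids shown elsewhere in this report).
   logical_date = datetime(2026, 4, 11, 1, 0, tzinfo=timezone.utc)
   run_id = f"scheduled__{logical_date.isoformat()}"
   print(run_id)  # scheduled__2026-04-11T01:00:00+00:00

   # With a shared generic task_id and the default map_index for non-mapped
   # tasks, the lookup triple cannot tell the two DAGs apart:
   triple_a = (run_id, "done", -1)  # dag_a's TaskInstance key
   triple_b = (run_id, "done", -1)  # dag_b's TaskInstance key
   print(triple_a == triple_b)      # True
   ```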
                                                                                
                                 
                                                                                
                                                                                
                                 
   Any edge worker event that leaves a row stuck in `state=RUNNING` past `[scheduler] task_instance_heartbeat_timeout` (default 300s), such as a worker OOM, a SIGKILL, a network blip on the edge API, or a missed heartbeat while the edge_job completion callback was in flight, is enough to arm the landmine. The scheduler then crashes on its next sync.
                                 
                                                                                
                                                                                
                                 
   Traceback

   ```
   Traceback (most recent call last):
     File ".../airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File ".../airflow/jobs/job.py", line 355, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File ".../airflow/jobs/job.py", line 384, in execute_job
       ret = execute_callable()
     File ".../airflow/jobs/scheduler_job_runner.py", line 1463, in _execute
       self._run_scheduler_loop()
     File ".../airflow/jobs/scheduler_job_runner.py", line 1610, in _run_scheduler_loop
       executor.heartbeat()
     File ".../airflow/executors/base_executor.py", line 310, in heartbeat
       self.sync()
     File ".../airflow/providers/edge3/executors/edge_executor.py", line 312, in sync
       orphaned = self._update_orphaned_jobs(session)
     File ".../airflow/providers/edge3/executors/edge_executor.py", line 203, in _update_orphaned_jobs
       ti = TaskInstance.get_task_instance(
           dag_id=job.dag_id,
           run_id=job.run_id,
           task_id=job.task_id,
           map_index=job.map_index,
           session=session,
       )
     File ".../airflow/models/taskinstance.py", line 806, in get_task_instance
       return session.execute(query).scalar_one_or_none()
     File ".../sqlalchemy/engine/result.py", line 1504, in scalar_one_or_none
       return self._only_one_row(raise_for_second_row=True, raise_for_none=False, scalar=True)
     File ".../sqlalchemy/engine/result.py", line 825, in _only_one_row
       raise exc.MultipleResultsFound(...)
   sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one or none was required
   ```
                     
   Reproducer

   Two DAGs on the same schedule sharing a task_id:

   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.empty import EmptyOperator

   for name in ("dag_a", "dag_b"):
       with DAG(name, start_date=datetime(2026, 1, 1), schedule="@daily", catchup=False):
           EmptyOperator(task_id="done")
   ```
                                 
   Steps:

   1. Configure the EdgeExecutor and run an edge worker.
   2. Trigger both DAGs so each has a TaskInstance for `(run_id='scheduled__<logical_date>', task_id='done', map_index=-1)`.
   3. Leave an edge_job row orphaned: `state=running`, `last_update` older than `[scheduler] task_instance_heartbeat_timeout` (default 300s). Easiest way: `kill -9` the edge worker mid-task, or drop the network between worker and API long enough to miss heartbeats.
   4. On the next scheduler sync, `_update_orphaned_jobs` picks up that row and calls `get_task_instance`. Because `dag_id` is ignored in the filter, the query returns both TIs → `MultipleResultsFound` → scheduler process exits.
   5. The supervisor restarts the scheduler; the same stale row is still in the DB; go to step 4.
                                                                                
                                                                                
                                 
   Direct DB proof

   I ran the exact call the scheduler makes, with the exact arguments, against the live metadata DB:

   ```python
   >>> from airflow.models.taskinstance import TaskInstance
   >>> from airflow.utils.session import create_session
   >>> with create_session() as s:
   ...     TaskInstance.get_task_instance(
   ...         dag_id='dag_a',
   ...         run_id='scheduled__2026-04-11T01:00:00+00:00',
   ...         task_id='done',
   ...         map_index=-1,
   ...         session=s,
   ...     )
   sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when one or none was required
   ```
      
   The two matching rows:

   | dag_id | task_id | run_id                               | state   |
   |--------|---------|--------------------------------------|---------|
   | dag_a  | done    | scheduled__2026-04-11T01:00:00+00:00 | success |
   | dag_b  | done    | scheduled__2026-04-11T01:00:00+00:00 | success |

   Both TaskInstances had already finished successfully hours earlier. The edge_job rows were stale leftovers whose completion callback never updated `edge_job.state` away from `running`, which is what kept them eligible for orphan handling and re-triggered the crash on every sync cycle.
                                                                                
                                                                                
                                 
   Workaround

   1. Manually delete the stale edge_job rows whose `(run_id, task_id, map_index)` triples collide across DAGs.
   2. Restart the scheduler.
   3. Rename colliding task_ids in DAGs on identical schedules to be DAG-unique (`done` → `<dag_name>_done`, etc.) so the collision cannot reoccur while the core bug is unfixed.
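   For step 1, the colliding triples can be identified with a GROUP BY before deleting anything. A hedged sketch against an in-memory SQLite stand-in (toy schema, not Airflow's DDL; adapt the table and column names to your actual metadata DB, and target the `edge_job` table for the delete itself):

   ```python
   import sqlite3

   # Toy stand-in for the metadata DB (illustrative schema only).
   con = sqlite3.connect(":memory:")
   con.executescript("""
   CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, map_index INT);
   INSERT INTO task_instance VALUES
     ('dag_a', 'done',  'scheduled__2026-04-11T01:00:00+00:00', -1),
     ('dag_b', 'done',  'scheduled__2026-04-11T01:00:00+00:00', -1),
     ('dag_a', 'start', 'scheduled__2026-04-11T01:00:00+00:00', -1);
   """)

   # Triples matched by more than one DAG: these are the ambiguous keys whose
   # stale edge_job leftovers can crash the scheduler.
   rows = con.execute("""
       SELECT run_id, task_id, map_index, COUNT(DISTINCT dag_id) AS n_dags
       FROM task_instance
       GROUP BY run_id, task_id, map_index
       HAVING COUNT(DISTINCT dag_id) > 1
   """).fetchall()
   print(rows)
   ```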
   
   ### What you think should happen instead?
   
TaskInstance.get_task_instance should include `dag_id` in its filter. The parameter is part of the method signature, it's required (not optional), and every call site passes it expecting disambiguation. The current behavior is silently wrong: for non-colliding data it returns the "right" row by accident, and for colliding data it raises `MultipleResultsFound`, which surfaces as an exception in whichever caller happens to trigger it first.

   Minimal fix in `airflow-core/src/airflow/models/taskinstance.py`:

   ```python
   query = (
       select(TaskInstance)
       .options(lazyload(TaskInstance.dag_run))
       .filter_by(
           dag_id=dag_id,
           run_id=run_id,
           task_id=task_id,
           map_index=map_index,
       )
   )
   ```
   
   Other call sites of `get_task_instance` that pass `dag_id` under the same assumption should be audited: any place that pre-filters a set of candidate jobs by dag_id and then looks up their TIs is latently exposed to the same class of bug (silent wrong row in the happy path, crash in the collision path).

   Independent defensive hardening in providers/edge3

   Even with the core fix merged, the scheduler loop should not be killable by a single poisoned edge_job row. A stray database inconsistency, a future provider regression, or an unrelated IntegrityError/DataError on one row shouldn't take down the whole scheduler; the loss of the entire dispatch pipeline is wildly disproportionate to the blast radius of one stale job.
   Wrap the per-row lookup in `EdgeExecutor._update_orphaned_jobs` (`providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py` on main, L203) in a try/except, log and skip on failure, and mark the offending row as REMOVED so it doesn't come back on the next sync:

   ```python
   from sqlalchemy.exc import MultipleResultsFound, SQLAlchemyError

   for job in lifeless_jobs:
       try:
           ti = TaskInstance.get_task_instance(
               dag_id=job.dag_id,
               run_id=job.run_id,
               task_id=job.task_id,
               map_index=job.map_index,
               session=session,
           )
       except (MultipleResultsFound, SQLAlchemyError):
           self.log.exception(
               "Failed to resolve TaskInstance for orphaned edge_job "
               "(dag_id=%s task_id=%s run_id=%s map_index=%s); marking as REMOVED",
               job.dag_id, job.task_id, job.run_id, job.map_index,
           )
           job.state = TaskInstanceState.REMOVED
           continue

       job.state = ti.state if ti and ti.state else TaskInstanceState.REMOVED
       ...
   ```
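   The skip semantics of that loop can be sanity-checked in isolation. A toy sketch (plain Python, no Airflow; `lookup` and `PoisonedRow` are hypothetical stand-ins simulating the per-row TI resolution, with one row raising):

   ```python
   import logging

   logging.basicConfig(format="%(levelname)s %(message)s")
   log = logging.getLogger("edge_executor_sketch")

   REMOVED = "removed"  # stand-in for TaskInstanceState.REMOVED

   class PoisonedRow(RuntimeError):
       """Stand-in for MultipleResultsFound / SQLAlchemyError."""

   def lookup(job):
       # Simulated per-row TI resolution: job 2 is the poisoned row.
       if job["id"] == 2:
           raise PoisonedRow("ambiguous triple")
       return {"state": "success"}

   jobs = [{"id": 1}, {"id": 2}, {"id": 3}]
   resolved = []
   for job in jobs:
       try:
           ti = lookup(job)
       except PoisonedRow:
           log.exception("could not resolve TI for edge_job %s; marking REMOVED", job["id"])
           job["state"] = REMOVED
           continue  # the loop survives instead of killing the scheduler
       job["state"] = ti["state"]
       resolved.append(job["id"])

   print(resolved)           # [1, 3]
   print(jobs[1]["state"])   # removed
   ```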
                                                                                
                                                                                
                                 
   Both changes are small and independently mergeable. The core fix closes the root cause; the edge3 hardening is the backstop that prevents a future variant of the same class of bug from causing another outage.
   
   ### Operating System
   
   Talos Linux v1.11.6 (Kubernetes 1.33.10)
   
   ### Deployment
   
   Other
   
   ### Apache Airflow Provider(s)
   
   edge3
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-edge3==3.3.0
   
   ### Official Helm Chart version
   
   Not Applicable
   
   ### Kubernetes Version
   
   _No response_
   
   ### Helm Chart configuration
   
   Not Applicable
   
   ### Docker Image customizations
   
   Base image: apache/airflow:3.2.0
   Added packages: elaunira-airflow, elaunira-airflow-providers-r2index, elaunira-r2index, openplanetdata-airflow

   No modifications to Airflow core or the edge3 provider.
   
   ### Anything else?
   
   Frequency: deterministic once the DB holds a poisoned `edge_job` row. Our scheduler accumulated 75 restarts in ~9 hours and never recovered on its own. Workaround was to delete the 4 stale `edge_job` rows whose `(run_id, task_id, map_index)` triples collided across DAGs, then restart the scheduler pod. Renaming shared task_ids (`done`, `cleanup`) to DAG-unique names prevents recurrence.

   Collision surface is broader than it looks: any two DAGs on the same cron schedule that share a task_id will produce colliding TIs on every run. Generic names like `done`, `cleanup`, `notify`, `start`, `end` are common in template DAGs.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

