broholens opened a new issue, #24313:
URL: https://github.com/apache/airflow/issues/24313

   ### Apache Airflow version
   
   2.3.2 (latest released)
   
   ### What happened
   
   There is a DAG whose task runs on pool1. When I update the task's pool in the DAG to pool2, the task instances of this DAG still run on pool1.
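   
   For context, here is a minimal sketch of the kind of DAG involved (the DAG id, task id, and command are made up for illustration):
   
    ```python
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG("example_pool_dag", start_date=datetime(2022, 1, 1), schedule_interval=None):
        BashOperator(
            task_id="t1",
            bash_command="echo hi",
            pool="pool1",  # edited to "pool2" in the DAG file, but tasks keep running on pool1
        )
    ```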
   
   I checked the `serialized_dag` table for this DAG, and the data there had already been updated to pool2.
   
   So I read the code, and I found what may be a bug in the function `_add_dag_from_db` of `dagbag.py`. It occurs with very low probability.
    ```python
        def _add_dag_from_db(self, dag_id: str, session: Session):
            """Add DAG to DagBag from DB"""
            from airflow.models.serialized_dag import SerializedDagModel

            row = SerializedDagModel.get(dag_id, session)  # (1) the serialized DAG is read here
            if not row:
                return None

            row.load_op_links = self.load_op_links
            dag = row.dag
            for subdag in dag.subdags:
                self.dags[subdag.dag_id] = subdag
            self.dags[dag.dag_id] = dag
            self.dags_last_fetched[dag.dag_id] = timezone.utcnow()  # (2) the timestamp is taken only now
            self.dags_hash[dag.dag_id] = row.dag_hash
    ```
   Consider the following situation: after `row = SerializedDagModel.get(dag_id, session)` at (1) has executed, but before `self.dags_last_fetched[dag.dag_id] = timezone.utcnow()` at (2) has run, another process updates this DAG's row in the `serialized_dag` table. `self.dags` then stores the old DAG, while `self.dags_last_fetched` is *later* than the row's update time, so the refresh check below will never execute:
   ```python
   if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
       self._add_dag_from_db(dag_id=dag_id, session=session)
   ```
   #### So the DAG in memory will never be updated.
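   
   To make the window concrete, here is a minimal timeline sketch (plain Python, not Airflow code; the timestamps are made up):
   
    ```python
    from datetime import datetime, timedelta

    t0 = datetime(2022, 6, 1, 12, 0, 0)

    # (1) SerializedDagModel.get() returns the old row at t0
    read_time = t0
    # another process updates this DAG's serialized_dag row one second later
    sd_last_updated = t0 + timedelta(seconds=1)
    # (2) timezone.utcnow() is only called after that
    fetch_time = t0 + timedelta(seconds=2)

    # the refresh check compares the row's update time with dags_last_fetched
    needs_refresh = sd_last_updated > fetch_time
    print(needs_refresh)  # False -> the stale DAG cached at t0 is kept forever
    ```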
   
   
   And I think this change will fix the bug, by taking the timestamp *before* reading from the DB:
    ```python
        def _add_dag_from_db(self, dag_id: str, session: Session):
            """Add DAG to DagBag from DB"""
            from airflow.models.serialized_dag import SerializedDagModel

            # take the timestamp before the read: a concurrent update can then
            # only make the row look newer, never older, than dags_last_fetched
            last_fetched_time = timezone.utcnow()
            row = SerializedDagModel.get(dag_id, session)
            if not row:
                return None

            row.load_op_links = self.load_op_links
            dag = row.dag
            for subdag in dag.subdags:
                self.dags[subdag.dag_id] = subdag
            self.dags[dag.dag_id] = dag
            self.dags_last_fetched[dag.dag_id] = last_fetched_time
            self.dags_hash[dag.dag_id] = row.dag_hash
    ```
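   
   With the same illustrative timeline as above, the fix makes the refresh check fire (again a sketch, not Airflow code):
   
    ```python
    from datetime import datetime, timedelta

    t0 = datetime(2022, 6, 1, 12, 0, 0)

    # last_fetched_time = timezone.utcnow() now runs first
    fetch_time = t0
    # SerializedDagModel.get() returns the old row
    read_time = t0 + timedelta(seconds=1)
    # the concurrent update lands after the read
    sd_last_updated = t0 + timedelta(seconds=2)

    needs_refresh = sd_last_updated > fetch_time
    print(needs_refresh)  # True -> the next check re-fetches the DAG
    ```
   
   The worst case is an occasional unnecessary re-fetch (if the update lands between the `utcnow()` call and the read, the read already returns the new row but the check still fires once more), which seems harmless compared to caching a stale DAG forever.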
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

