broholens opened a new issue, #24313:
URL: https://github.com/apache/airflow/issues/24313
### Apache Airflow version
2.3.2 (latest released)
### What happened
There is a DAG that uses pool1, and the task instances of this DAG (in the `task_instance` table) run in pool1. When I change the DAG's pool to pool2, the task instances of this DAG still run in pool1.
I checked the `serialized_dag` table for this DAG, and the data has already been updated to pool2.
So I read the code, and I think there may be a bug in the function `_add_dag_from_db` in `dagbag.py`. It only happens with very low probability.
```python
def _add_dag_from_db(self, dag_id: str, session: Session):
    """Add DAG to DagBag from DB"""
    from airflow.models.serialized_dag import SerializedDagModel

    row = SerializedDagModel.get(dag_id, session)
    if not row:
        return None

    row.load_op_links = self.load_op_links

    dag = row.dag
    for subdag in dag.subdags:
        self.dags[subdag.dag_id] = subdag
    self.dags[dag.dag_id] = dag
    self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
    self.dags_hash[dag.dag_id] = row.dag_hash
```
Consider the following interleaving: `row = SerializedDagModel.get(dag_id, session)` has executed, but `self.dags_last_fetched[dag.dag_id] = timezone.utcnow()` has not yet run, and in that window another process updates this DAG's row in the `serialized_dag` table. Then `self.dags` stores the old DAG, while `self.dags_last_fetched` is later than the `serialized_dag` update time, so the following check in `get_dag` never triggers a reload:
```python
if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
    self._add_dag_from_db(dag_id=dag_id, session=session)
```
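To make the interleaving concrete, here is a minimal, self-contained sketch of the race. It is a toy model, not Airflow code: `Row`, `ToyDagBag`, `concurrent_update` and the integer logical clock are hypothetical stand-ins for the `serialized_dag` row, `DagBag`, the other process, and `timezone.utcnow()`. The competing write is injected deterministically between reading the row and recording the fetch time, and a single shared clock is assumed, so clock skew between hosts is ignored.

```python
import itertools
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional

# Hypothetical toy model, not Airflow code: one logical clock shared by
# every "process", so the interleaving below is fully deterministic.
clock = itertools.count(1)


@dataclass
class Row:
    """Stand-in for a serialized_dag row (only the fields we need)."""
    pool: str
    last_updated: int


@dataclass
class ToyDagBag:
    table: Dict[str, Row]
    dags: Dict[str, str] = field(default_factory=dict)
    dags_last_fetched: Dict[str, int] = field(default_factory=dict)

    def add_dag_from_db(self, dag_id: str,
                        race: Optional[Callable[[], None]] = None) -> None:
        # Same ordering as the current _add_dag_from_db:
        # read the row first, record the fetch time afterwards.
        row = self.table[dag_id]
        if race is not None:
            race()  # another process writes inside the window
        self.dags[dag_id] = row.pool
        self.dags_last_fetched[dag_id] = next(clock)

    def get_dag(self, dag_id: str) -> str:
        # Same freshness check as the snippet quoted above.
        if self.table[dag_id].last_updated > self.dags_last_fetched[dag_id]:
            self.add_dag_from_db(dag_id)
        return self.dags[dag_id]


table = {"my_dag": Row(pool="pool1", last_updated=next(clock))}
bag = ToyDagBag(table)


def concurrent_update() -> None:
    # The DAG is changed to pool2 and re-serialized by another process.
    table["my_dag"] = Row(pool="pool2", last_updated=next(clock))


bag.add_dag_from_db("my_dag", race=concurrent_update)
for _ in range(3):
    print(bag.get_dag("my_dag"))  # stays "pool1": the refresh never fires
```

Every later lookup returns `pool1`: the recorded fetch time (3) is newer than the row's `last_updated` (2), even though the cached copy was read before that write.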
#### So the DAG in memory will never be updated.
I think recording the fetch time before reading the row will fix the bug:
```python
def _add_dag_from_db(self, dag_id: str, session: Session):
    """Add DAG to DagBag from DB"""
    from airflow.models.serialized_dag import SerializedDagModel

    # Take the fetch timestamp *before* reading the row, so that any update
    # committed while we read is newer than dags_last_fetched and will be
    # picked up by the freshness check on the next lookup.
    last_fetched_time = timezone.utcnow()
    row = SerializedDagModel.get(dag_id, session)
    if not row:
        return None

    row.load_op_links = self.load_op_links

    dag = row.dag
    for subdag in dag.subdags:
        self.dags[subdag.dag_id] = subdag
    self.dags[dag.dag_id] = dag
    self.dags_last_fetched[dag.dag_id] = last_fetched_time
    self.dags_hash[dag.dag_id] = row.dag_hash
```
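A toy model with the proposed ordering shows why taking the timestamp first helps. As before, this is a hypothetical sketch, not Airflow code: `Row`, `ToyDagBag`, `concurrent_update` and the integer logical clock stand in for the real row, `DagBag`, the other process and `timezone.utcnow()`, and one shared clock is assumed. A write that lands during the read gets a later `last_updated` than the recorded fetch time, so the next `get_dag` call re-fetches.

```python
import itertools
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional

# Hypothetical toy model, not Airflow code (single logical clock).
clock = itertools.count(1)


@dataclass
class Row:
    """Stand-in for a serialized_dag row."""
    pool: str
    last_updated: int


@dataclass
class ToyDagBag:
    table: Dict[str, Row]
    dags: Dict[str, str] = field(default_factory=dict)
    dags_last_fetched: Dict[str, int] = field(default_factory=dict)

    def add_dag_from_db(self, dag_id: str,
                        race: Optional[Callable[[], None]] = None) -> None:
        # Proposed ordering: record the fetch time BEFORE reading the row.
        last_fetched_time = next(clock)
        row = self.table[dag_id]
        if race is not None:
            race()  # another process writes inside the window
        self.dags[dag_id] = row.pool
        self.dags_last_fetched[dag_id] = last_fetched_time

    def get_dag(self, dag_id: str) -> str:
        # Same freshness check as in get_dag.
        if self.table[dag_id].last_updated > self.dags_last_fetched[dag_id]:
            self.add_dag_from_db(dag_id)
        return self.dags[dag_id]


table = {"my_dag": Row(pool="pool1", last_updated=next(clock))}
bag = ToyDagBag(table)


def concurrent_update() -> None:
    table["my_dag"] = Row(pool="pool2", last_updated=next(clock))


bag.add_dag_from_db("my_dag", race=concurrent_update)
print(bag.dags["my_dag"])     # pool1: the stale copy is cached for now...
print(bag.get_dag("my_dag"))  # pool2: ...but the next lookup refreshes it
```

The cost of this ordering is an occasional redundant re-fetch, when a write lands after the timestamp is taken but the read already sees it; that seems cheaper than serving a stale DAG indefinitely. With real processes on different hosts, clock skew between them could still affect the comparison.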
### What you think should happen instead
_No response_
### How to reproduce
_No response_
### Operating System
Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)