This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7737ece0af3 Add `select_from` to the min serialize check query (#48572)
7737ece0af3 is described below

commit 7737ece0af340314e72391320721dde4d8a527a5
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Apr 1 12:32:27 2025 +0100

    Add `select_from` to the min serialize check query (#48572)
    
    * Add `select_from` to the min serialize check query
    
    This clarifies the model we are running the query on and also updates
    the query to use last_updated, which is the right field to compare with
    
    * Add test for min_update_interval
    
    * Update airflow-core/src/airflow/config_templates/config.yml
---
 airflow-core/src/airflow/models/serialized_dag.py    |  6 ++++--
 .../tests/unit/models/test_serialized_dag.py         | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index b1defa8f9dd..a00a5c424f2 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -380,10 +380,12 @@ class SerializedDagModel(Base):
         # If No or the DAG does not exists, updates / writes Serialized DAG to DB
         if min_update_interval is not None:
             if session.scalar(
-                select(literal(True)).where(
+                select(literal(True))
+                .where(
                     cls.dag_id == dag.dag_id,
-                    (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.created_at,
+                    (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated,
                 )
+                .select_from(cls)
             ):
                 return False
 
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index c68be9eea05..57060a34db3 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -391,3 +391,23 @@ class TestSerializedDagModel:
         session.execute(update(DagModel).where(DagModel.dag_id == dag_id).values(is_active=False))
         dependencies = SDM.get_dag_dependencies(session=session)
         assert dag_id not in dependencies
+
+    @pytest.mark.parametrize("min_update_interval", [0, 10])
+    @mock.patch.object(DagVersion, "get_latest_version")
+    def test_min_update_interval_is_respected(
+        self, mock_dv_get_latest_version, min_update_interval, dag_maker
+    ):
+        mock_dv_get_latest_version.return_value = None
+        with dag_maker("dag1") as dag:
+            PythonOperator(task_id="task1", python_callable=lambda: None)
+        dag.sync_to_db()
+        SDM.write_dag(dag, bundle_name="testing")
+        # new task
+        PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)
+        SDM.write_dag(dag, bundle_name="testing", min_update_interval=min_update_interval)
+        if min_update_interval:
+            # Because min_update_interval is 10, DagVersion.get_latest_version would
+            # be called only once:
+            mock_dv_get_latest_version.assert_called_once()
+        else:
+            assert mock_dv_get_latest_version.call_count == 2

Reply via email to