This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7737ece0af3 Add `select_from` to the min serialize check query (#48572)
7737ece0af3 is described below
commit 7737ece0af340314e72391320721dde4d8a527a5
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Apr 1 12:32:27 2025 +0100
Add `select_from` to the min serialize check query (#48572)
* Add `select_from` to the min serialize check query
This clarifies the model we are running the query on and also updates
the query to use last_updated, which is the right field to compare with.
* Add test for min_update_interval
* Update airflow-core/src/airflow/config_templates/config.yml
---
airflow-core/src/airflow/models/serialized_dag.py | 6 ++++--
.../tests/unit/models/test_serialized_dag.py | 20 ++++++++++++++++++++
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index b1defa8f9dd..a00a5c424f2 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -380,10 +380,12 @@ class SerializedDagModel(Base):
# If No or the DAG does not exists, updates / writes Serialized DAG to
DB
if min_update_interval is not None:
if session.scalar(
- select(literal(True)).where(
+ select(literal(True))
+ .where(
cls.dag_id == dag.dag_id,
- (timezone.utcnow() -
timedelta(seconds=min_update_interval)) < cls.created_at,
+ (timezone.utcnow() -
timedelta(seconds=min_update_interval)) < cls.last_updated,
)
+ .select_from(cls)
):
return False
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index c68be9eea05..57060a34db3 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -391,3 +391,23 @@ class TestSerializedDagModel:
session.execute(update(DagModel).where(DagModel.dag_id ==
dag_id).values(is_active=False))
dependencies = SDM.get_dag_dependencies(session=session)
assert dag_id not in dependencies
+
+ @pytest.mark.parametrize("min_update_interval", [0, 10])
+ @mock.patch.object(DagVersion, "get_latest_version")
+ def test_min_update_interval_is_respected(
+ self, mock_dv_get_latest_version, min_update_interval, dag_maker
+ ):
+ mock_dv_get_latest_version.return_value = None
+ with dag_maker("dag1") as dag:
+ PythonOperator(task_id="task1", python_callable=lambda: None)
+ dag.sync_to_db()
+ SDM.write_dag(dag, bundle_name="testing")
+ # new task
+ PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)
+ SDM.write_dag(dag, bundle_name="testing",
min_update_interval=min_update_interval)
+ if min_update_interval:
+ # Because min_update_interval is 10, DagVersion.get_latest_version
would
+ # be called only once:
+ mock_dv_get_latest_version.assert_called_once()
+ else:
+ assert mock_dv_get_latest_version.call_count == 2