This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b9d2aaf4f901f738f9df22628a6ed8ba4208107c Author: Kaxil Naik <[email protected]> AuthorDate: Mon Jul 20 12:31:05 2020 +0100 Don't Update Serialized DAGs in DB if DAG didn't change (#9850) We should not update the "last_updated" column unnecessarily. This is the first of a few optimizations to DAG Serialization that would also aid in DAG Versioning (cherry picked from commit 1a32c45126f1086a52eeee52d4c19427af06274b) --- airflow/models/serialized_dag.py | 22 ++++++++++++++++++---- tests/models/test_serialized_dag.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index c655e34..1313cac 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -"""Serialzed DAG table in database.""" +"""Serialized DAG table in database.""" import logging from datetime import timedelta @@ -25,6 +25,7 @@ from typing import Any, Optional import sqlalchemy_jsonfield from sqlalchemy import BigInteger, Column, Index, String, and_ +from sqlalchemy.orm import Session # noqa: F401 from sqlalchemy.sql import exists from airflow.models.base import ID_LEN, Base @@ -86,12 +87,13 @@ class SerializedDagModel(Base): min_update_interval=None, # type: Optional[int] session=None): """Serializes a DAG and writes it into database. + If the record already exists, it checks if the Serialized DAG changed or not. If it is + changed, it updates the record, ignores otherwise. 
:param dag: a DAG to be written into database :param min_update_interval: minimal interval in seconds to update serialized DAG :param session: ORM Session """ - log.debug("Writing DAG: %s to the DB", dag) # Checks if (Current Time - Time when the DAG was written to DB) < min_update_interval # If Yes, does nothing # If No or the DAG does not exists, updates / writes Serialized DAG to DB @@ -102,8 +104,15 @@ class SerializedDagModel(Base): ).scalar(): return - log.debug("Writing DAG: %s to the DB", dag.dag_id) - session.merge(cls(dag)) + log.debug("Checking if DAG (%s) changed", dag.dag_id) + serialized_dag_from_db = session.query(cls).get(dag.dag_id) # type: SerializedDagModel + new_serialized_dag = cls(dag) + if serialized_dag_from_db and (serialized_dag_from_db.data == new_serialized_dag.data): + log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id) + return + + log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id) + session.merge(new_serialized_dag) log.debug("DAG: %s written to the DB", dag.dag_id) @classmethod @@ -112,6 +121,7 @@ class SerializedDagModel(Base): """Reads all DAGs in serialized_dag table. 
:param session: ORM Session + :type session: Session :returns: a dict of DAGs read from database """ serialized_dags = session.query(cls) @@ -148,6 +158,7 @@ class SerializedDagModel(Base): :param dag_id: dag_id to be deleted :type dag_id: str :param session: ORM Session + :type session: Session """ session.execute(cls.__table__.delete().where(cls.dag_id == dag_id)) @@ -159,6 +170,7 @@ class SerializedDagModel(Base): :param alive_dag_filelocs: file paths of alive DAGs :type alive_dag_filelocs: list :param session: ORM Session + :type session: Session """ alive_fileloc_hashes = [ DagCode.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs] @@ -179,6 +191,7 @@ class SerializedDagModel(Base): :param dag_id: the DAG to check :type dag_id: str :param session: ORM Session + :type session: Session :rtype: bool """ return session.query(exists().where(cls.dag_id == dag_id)).scalar() @@ -193,6 +206,7 @@ class SerializedDagModel(Base): :param dag_id: the DAG to fetch :param session: ORM Session + :type session: Session """ from airflow.models.dag import DagModel row = session.query(cls).filter(cls.dag_id == dag_id).one_or_none() diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index c70db78..4e0e8ea 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -19,6 +19,7 @@ """Unit tests for SerializedDagModel.""" +import six import unittest from airflow import example_dags as example_dags_module @@ -75,6 +76,36 @@ class SerializedDagModelTest(unittest.TestCase): # Verifies JSON schema. 
SerializedDAG.validate_schema(result.data) + def test_serialized_dag_is_updated_only_if_dag_is_changed(self): + """Test Serialized DAG is updated if DAG is changed""" + + example_dags = make_example_dags(example_dags_module) + example_bash_op_dag = example_dags.get("example_bash_operator") + SDM.write_dag(dag=example_bash_op_dag) + + with db.create_session() as session: + last_updated = session.query( + SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none() + + # Test that if DAG is not changed, Serialized DAG is not re-written and last_updated + # column is not updated + SDM.write_dag(dag=example_bash_op_dag) + last_updated_1 = session.query( + SDM.last_updated).filter(SDM.dag_id == example_bash_op_dag.dag_id).one_or_none() + + self.assertEqual(last_updated, last_updated_1) + + # Update DAG + example_bash_op_dag.tags += ["new_tag"] + six.assertCountEqual(self, example_bash_op_dag.tags, ["example", "new_tag"]) + + SDM.write_dag(dag=example_bash_op_dag) + new_s_dag = session.query(SDM.last_updated, SDM.data).filter( + SDM.dag_id == example_bash_op_dag.dag_id).one_or_none() + + self.assertNotEqual(last_updated, new_s_dag.last_updated) + self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", "new_tag"]) + def test_read_dags(self): """DAGs can be read from database.""" example_dags = self._write_example_dags()
