This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a32c45  Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
1a32c45 is described below

commit 1a32c45126f1086a52eeee52d4c19427af06274b
Author: Kaxil Naik <[email protected]>
AuthorDate: Mon Jul 20 12:31:05 2020 +0100

    Don't Update Serialized DAGs in DB if DAG didn't change (#9850)
    
    We should not update the "last_updated" column unnecessarily. This is the first
    of a few optimizations to DAG Serialization that would also aid in DAG Versioning.
---
 airflow/models/serialized_dag.py    | 38 ++++++++++++++++++++++++++-----------
 tests/models/test_serialized_dag.py | 32 ++++++++++++++++++++++++++++++-
 2 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 5592127..0e3b732 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Serialzed DAG table in database."""
+"""Serialized DAG table in database."""
 
 import logging
 from datetime import timedelta
@@ -24,6 +24,7 @@ from typing import Any, Dict, List, Optional
 
 import sqlalchemy_jsonfield
 from sqlalchemy import BigInteger, Column, Index, String, and_
+from sqlalchemy.orm import Session
 from sqlalchemy.sql import exists
 
 from airflow.models.base import ID_LEN, Base
@@ -81,8 +82,10 @@ class SerializedDagModel(Base):
 
     @classmethod
     @provide_session
-    def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, 
session=None):
+    def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, 
session: Session = None):
         """Serializes a DAG and writes it into database.
+        If the record already exists, it checks if the Serialized DAG changed 
or not. If it is
+        changed, it updates the record, ignores otherwise.
 
         :param dag: a DAG to be written into database
         :param min_update_interval: minimal interval in seconds to update 
serialized DAG
@@ -97,13 +100,21 @@ class SerializedDagModel(Base):
                      (timezone.utcnow() - 
timedelta(seconds=min_update_interval)) < cls.last_updated))
             ).scalar():
                 return
-        log.debug("Writing DAG: %s to the DB", dag.dag_id)
-        session.merge(cls(dag))
+
+        log.debug("Checking if DAG (%s) changed", dag.dag_id)
+        serialized_dag_from_db: SerializedDagModel = 
session.query(cls).get(dag.dag_id)
+        new_serialized_dag = cls(dag)
+        if serialized_dag_from_db and (serialized_dag_from_db.data == 
new_serialized_dag.data):
+            log.debug("Serialized DAG (%s) is unchanged. Skipping writing to 
DB", dag.dag_id)
+            return
+
+        log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
+        session.merge(new_serialized_dag)
         log.debug("DAG: %s written to the DB", dag.dag_id)
 
     @classmethod
     @provide_session
-    def read_all_dags(cls, session=None) -> Dict[str, 'SerializedDAG']:
+    def read_all_dags(cls, session: Session = None) -> Dict[str, 
'SerializedDAG']:
         """Reads all DAGs in serialized_dag table.
 
         :param session: ORM Session
@@ -137,7 +148,7 @@ class SerializedDagModel(Base):
 
     @classmethod
     @provide_session
-    def remove_dag(cls, dag_id: str, session=None):
+    def remove_dag(cls, dag_id: str, session: Session = None):
         """Deletes a DAG with given dag_id.
 
         :param dag_id: dag_id to be deleted
@@ -148,7 +159,7 @@ class SerializedDagModel(Base):
 
     @classmethod
     @provide_session
-    def remove_stale_dags(cls, expiration_date, session=None):
+    def remove_stale_dags(cls, expiration_date, session: Session = None):
         """
         Deletes Serialized DAGs that were last touched by the scheduler before
         the expiration date. These DAGs were likely deleted.
@@ -156,6 +167,8 @@ class SerializedDagModel(Base):
         :param expiration_date: set inactive DAGs that were touched before this
             time
         :type expiration_date: datetime
+        :param session: ORM Session
+        :type session: Session
         :return: None
         """
         log.debug("Deleting Serialized DAGs that haven't been touched by the "
@@ -168,7 +181,7 @@ class SerializedDagModel(Base):
 
     @classmethod
     @provide_session
-    def has_dag(cls, dag_id: str, session=None) -> bool:
+    def has_dag(cls, dag_id: str, session: Session = None) -> bool:
         """Checks a DAG exist in serialized_dag table.
 
         :param dag_id: the DAG to check
@@ -178,7 +191,7 @@ class SerializedDagModel(Base):
 
     @classmethod
     @provide_session
-    def get(cls, dag_id: str, session=None) -> Optional['SerializedDagModel']:
+    def get(cls, dag_id: str, session: Session = None) -> 
Optional['SerializedDagModel']:
         """
         Get the SerializedDAG for the given dag ID.
         It will cope with being passed the ID of a subdag by looking up the
@@ -200,12 +213,15 @@ class SerializedDagModel(Base):
 
     @staticmethod
     @provide_session
-    def bulk_sync_to_db(dags: List[DAG], session=None):
+    def bulk_sync_to_db(dags: List[DAG], session: Session = None):
         """
-        Saves DAGs as Seralized DAG objects in the database. Each DAG is saved 
in a separate database query.
+        Saves DAGs as Serialized DAG objects in the database. Each
+        DAG is saved in a separate database query.
 
         :param dags: the DAG objects to save to the DB
         :type dags: List[airflow.models.dag.DAG]
+        :param session: ORM Session
+        :type session: Session
         :return: None
         """
         for dag in dags:
diff --git a/tests/models/test_serialized_dag.py 
b/tests/models/test_serialized_dag.py
index fe96d73..79e71a8 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -76,6 +76,36 @@ class SerializedDagModelTest(unittest.TestCase):
                 # Verifies JSON schema.
                 SerializedDAG.validate_schema(result.data)
 
+    def test_serialized_dag_is_updated_only_if_dag_is_changed(self):
+        """Test Serialized DAG is updated if DAG is changed"""
+
+        example_dags = make_example_dags(example_dags_module)
+        example_bash_op_dag = example_dags.get("example_bash_operator")
+        SDM.write_dag(dag=example_bash_op_dag)
+
+        with create_session() as session:
+            last_updated = session.query(
+                SDM.last_updated).filter(SDM.dag_id == 
example_bash_op_dag.dag_id).one_or_none()
+
+            # Test that if DAG is not changed, Serialized DAG is not 
re-written and last_updated
+            # column is not updated
+            SDM.write_dag(dag=example_bash_op_dag)
+            last_updated_1 = session.query(
+                SDM.last_updated).filter(SDM.dag_id == 
example_bash_op_dag.dag_id).one_or_none()
+
+            self.assertEqual(last_updated, last_updated_1)
+
+            # Update DAG
+            example_bash_op_dag.tags += ["new_tag"]
+            self.assertCountEqual(example_bash_op_dag.tags, ["example", 
"new_tag"])
+
+            SDM.write_dag(dag=example_bash_op_dag)
+            new_s_dag = session.query(SDM.last_updated, SDM.data).filter(
+                SDM.dag_id == example_bash_op_dag.dag_id).one_or_none()
+
+            self.assertNotEqual(last_updated, new_s_dag.last_updated)
+            self.assertEqual(new_s_dag.data["dag"]["tags"], ["example", 
"new_tag"])
+
     def test_read_dags(self):
         """DAGs can be read from database."""
         example_dags = self._write_example_dags()
@@ -120,5 +150,5 @@ class SerializedDagModelTest(unittest.TestCase):
         dags = [
             DAG("dag_1"), DAG("dag_2"), DAG("dag_3"),
         ]
-        with assert_queries_count(7):
+        with assert_queries_count(10):
             SDM.bulk_sync_to_db(dags)

Reply via email to