This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a10965fed6 Re-serialize all DAGs on 'airflow db upgrade' (#24518)
a10965fed6 is described below

commit a10965fed6946caa954bbffa57a19e33d07d6ff2
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon Jun 20 06:42:08 2022 +0800

    Re-serialize all DAGs on 'airflow db upgrade' (#24518)
---
 airflow/cli/commands/dag_command.py |  5 +++--
 airflow/utils/db.py                 | 15 ++++++++++++---
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index a219c94da9..b193ad729a 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -26,6 +26,7 @@ import sys
 from typing import Optional
 
 from graphviz.dot import Dot
+from sqlalchemy.orm import Session
 from sqlalchemy.sql.functions import func
 
 from airflow import settings
@@ -479,10 +480,10 @@ def dag_test(args, session=None):
 
 @provide_session
 @cli_utils.action_cli
-def dag_reserialize(args, session=None):
+def dag_reserialize(args, session: Session = NEW_SESSION):
     session.query(SerializedDagModel).delete(synchronize_session=False)
 
     if not args.clear_only:
         dagbag = DagBag()
         dagbag.collect_dags(only_if_updated=False, safe_mode=False)
-        dagbag.sync_to_db()
+        dagbag.sync_to_db(session=session)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index b4ef33043b..bb2c2a6bae 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -59,7 +59,7 @@ from airflow.models import (  # noqa: F401
 )
 
 # We need to add this model manually to get reset working well
-from airflow.models.serialized_dag import SerializedDagModel  # noqa: F401
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.tasklog import LogTemplate
 from airflow.utils import helpers
 
@@ -788,6 +788,14 @@ def check_and_run_migrations():
         sys.exit(1)
 
 
+@provide_session
+def reserialize_dags(*, session: Session = NEW_SESSION) -> None:
+    session.query(SerializedDagModel).delete(synchronize_session=False)
+    dagbag = DagBag()
+    dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+    dagbag.sync_to_db(session=session)
+
+
 @provide_session
 def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
     """Synchronize log template configs with table.
@@ -1473,8 +1481,9 @@ def upgradedb(
     with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
         log.info("Creating tables")
         command.upgrade(config, revision=to_revision or 'heads')
-    add_default_pool_if_not_exists()
-    synchronize_log_template()
+    reserialize_dags(session=session)
+    add_default_pool_if_not_exists(session=session)
+    synchronize_log_template(session=session)
 
 
 @provide_session

Reply via email to