This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a10965fed6 Re-serialize all DAGs on 'airflow db upgrade' (#24518)
a10965fed6 is described below
commit a10965fed6946caa954bbffa57a19e33d07d6ff2
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Mon Jun 20 06:42:08 2022 +0800
Re-serialize all DAGs on 'airflow db upgrade' (#24518)
---
airflow/cli/commands/dag_command.py | 5 +++--
airflow/utils/db.py | 15 ++++++++++++---
2 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index a219c94da9..b193ad729a 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -26,6 +26,7 @@ import sys
from typing import Optional
from graphviz.dot import Dot
+from sqlalchemy.orm import Session
from sqlalchemy.sql.functions import func
from airflow import settings
@@ -479,10 +480,10 @@ def dag_test(args, session=None):
@provide_session
@cli_utils.action_cli
-def dag_reserialize(args, session=None):
+def dag_reserialize(args, session: Session = NEW_SESSION):
session.query(SerializedDagModel).delete(synchronize_session=False)
if not args.clear_only:
dagbag = DagBag()
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
- dagbag.sync_to_db()
+ dagbag.sync_to_db(session=session)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index b4ef33043b..bb2c2a6bae 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -59,7 +59,7 @@ from airflow.models import ( # noqa: F401
)
# We need to add this model manually to get reset working well
-from airflow.models.serialized_dag import SerializedDagModel # noqa: F401
+from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.tasklog import LogTemplate
from airflow.utils import helpers
@@ -788,6 +788,14 @@ def check_and_run_migrations():
sys.exit(1)
+@provide_session
+def reserialize_dags(*, session: Session = NEW_SESSION) -> None:
+ session.query(SerializedDagModel).delete(synchronize_session=False)
+ dagbag = DagBag()
+ dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+ dagbag.sync_to_db(session=session)
+
+
@provide_session
def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
"""Synchronize log template configs with table.
@@ -1473,8 +1481,9 @@ def upgradedb(
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
log.info("Creating tables")
command.upgrade(config, revision=to_revision or 'heads')
- add_default_pool_if_not_exists()
- synchronize_log_template()
+ reserialize_dags(session=session)
+ add_default_pool_if_not_exists(session=session)
+ synchronize_log_template(session=session)
@provide_session