This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow-mssql-migration.git
commit d6dbdb123ffe5ce2385463740425f0c63ef72938
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Dec 29 21:18:57 2023 +0100

    initial commit
---
 README.md         |   5 ++
 migrate_script.py | 254 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 259 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..97abbb3
--- /dev/null
+++ b/README.md
@@ -0,0 +1,5 @@
+# airflow-mssql-migration - Script to migrate Apache Airflow off MS SQL Server
+
+This repository holds a script to migrate the Apache Airflow metadata database
+off Microsoft SQL Server to another database engine, as support for SQL Server
+ended in Airflow 2.8.0.

diff --git a/migrate_script.py b/migrate_script.py
new file mode 100644
index 0000000..6e073ae
--- /dev/null
+++ b/migrate_script.py
@@ -0,0 +1,254 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Migration script to port an Apache Airflow metadata DB to another DB engine.
+
+Run `python migrate_script.py --extract` on the source environment to dump the
+configured Airflow metadata database to a SQLite file called `migration.db`.
+Then copy this file to the target environment (or re-configure your database
+backend) and run `python migrate_script.py --restore`.
+
+This script assumes that schedulers, workers, triggerers and the webserver are
+stopped while it runs. It is also advised to halt all running jobs and DAG runs.
+The database schema versions of the source and target database must match.
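+
+An illustrative end-to-end run (the copy command and connection string are
+examples only, adjust them to your environment):
+
+    # on the source environment, still configured for MS SQL Server
+    python migrate_script.py --extract
+    # transfer the dump to the target environment, e.g.
+    scp migration.db target-host:/opt/airflow/
+    # on the target environment, configured for the new database engine, e.g.
+    export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://airflow:***@db-host/airflow"
+    python migrate_script.py --restore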
+""" + +# TODO test 2.7.3 +# TODO docs +from __future__ import annotations + +import argparse +import logging +import os +import subprocess + +from sqlalchemy import create_engine, delete, select, text +from sqlalchemy.orm import Session + +from airflow.auth.managers.fab.models import Action, Permission, RegisterUser, Resource, Role, User +from airflow.jobs.job import Job +from airflow.models.connection import Connection +from airflow.models.dag import DagModel, DagOwnerAttributes, DagTag +from airflow.models.dagcode import DagCode +from airflow.models.dagpickle import DagPickle +from airflow.models.dagrun import DagRun, DagRunNote +from airflow.models.dagwarning import DagWarning +from airflow.models.dataset import ( + DagScheduleDatasetReference, + DatasetDagRunQueue, + DatasetEvent, + DatasetModel, + TaskOutletDatasetReference, +) +from airflow.models.db_callback_request import DbCallbackRequest +from airflow.models.errors import ImportError +from airflow.models.log import Log +from airflow.models.pool import Pool +from airflow.models.renderedtifields import RenderedTaskInstanceFields +from airflow.models.serialized_dag import SerializedDagModel +from airflow.models.slamiss import SlaMiss +from airflow.models.taskfail import TaskFail +from airflow.models.taskinstance import TaskInstance, TaskInstanceNote +from airflow.models.tasklog import LogTemplate +from airflow.models.taskmap import TaskMap +from airflow.models.taskreschedule import TaskReschedule +from airflow.models.trigger import Trigger +from airflow.models.variable import Variable +from airflow.models.xcom import BaseXCom, XCom +from airflow.settings import engine + +# configuration variables +airflow_db_url = engine.url +temp_db_url = "sqlite:///migration.db" +supported_db_versions = [ + # see https://airflow.apache.org/docs/apache-airflow/stable/migrations-ref.html + "405de8318b3a" # Airflow 2.7.3 +] + +# initialise logging +logging.basicConfig(filename="migration.log", level=logging.DEBUG) + + +def copy_airflow_tables(source_engine, target_engine): + objects_to_migrate = [ + Action, + Resource, + Role, + User, + Permission, + RegisterUser, + Connection, + DagModel, + DagCode, + DagOwnerAttributes, + DagPickle, + DagTag, + Job, + LogTemplate, + DagRun, + DagRunNote, + DagWarning, + DatasetModel, + DagScheduleDatasetReference, + TaskOutletDatasetReference, + DatasetDagRunQueue, + DatasetEvent, + DbCallbackRequest, + ImportError, + Log, + Pool, + Trigger, + RenderedTaskInstanceFields, + SerializedDagModel, + SlaMiss, + TaskInstance, + TaskInstanceNote, + TaskFail, + TaskMap, + TaskReschedule, + Variable, + BaseXCom, + "ab_user_role", # besides the ORM objects some table which demand cleaning are listed + "ab_permission_view", + "ab_permission_view_role", + ] + source_session = Session(bind=source_engine) + target_session = Session(bind=target_engine) + target_session.autoflush = False + dialect_name = target_session.bind.dialect.name + quote = "`" if dialect_name == "mysql" else '"' + + # check that source DB is a supported version + db_version = target_session.scalar("SELECT * FROM alembic_version") + if db_version not in supported_db_versions: + raise ValueError(f"Unsupported Airflow Schema version {db_version}") + source_version = source_session.scalar("SELECT * FROM alembic_version") + if source_version != db_version: + raise ValueError( + f"Database schema must match. Source is {source_version}, destination is {db_version}." 
+
+# initialise logging
+logging.basicConfig(filename="migration.log", level=logging.DEBUG)
+
+
+def copy_airflow_tables(source_engine, target_engine):
+    objects_to_migrate = [
+        Action,
+        Resource,
+        Role,
+        User,
+        Permission,
+        RegisterUser,
+        Connection,
+        DagModel,
+        DagCode,
+        DagOwnerAttributes,
+        DagPickle,
+        DagTag,
+        Job,
+        LogTemplate,
+        DagRun,
+        DagRunNote,
+        DagWarning,
+        DatasetModel,
+        DagScheduleDatasetReference,
+        TaskOutletDatasetReference,
+        DatasetDagRunQueue,
+        DatasetEvent,
+        DbCallbackRequest,
+        ImportError,
+        Log,
+        Pool,
+        Trigger,
+        RenderedTaskInstanceFields,
+        SerializedDagModel,
+        SlaMiss,
+        TaskInstance,
+        TaskInstanceNote,
+        TaskFail,
+        TaskMap,
+        TaskReschedule,
+        Variable,
+        BaseXCom,
+        # besides the ORM models, some tables that only demand cleaning are listed by name
+        "ab_user_role",
+        "ab_permission_view",
+        "ab_permission_view_role",
+    ]
+    source_session = Session(bind=source_engine)
+    target_session = Session(bind=target_engine)
+    target_session.autoflush = False
+    dialect_name = target_session.bind.dialect.name
+    quote = "`" if dialect_name == "mysql" else '"'
+
+    # check that both databases are at a supported schema version
+    db_version = target_session.scalar(text("SELECT * FROM alembic_version"))
+    if db_version not in supported_db_versions:
+        raise ValueError(f"Unsupported Airflow schema version {db_version}")
+    source_version = source_session.scalar(text("SELECT * FROM alembic_version"))
+    if source_version != db_version:
+        raise ValueError(
+            f"Database schema must match. Source is {source_version}, destination is {db_version}."
+        )
+
+    # Deserialization can fail, and we want to transfer the XCom blob unchanged
+    # anyway, so mock the deserialization away
+    def deserialize_mock(self: XCom):
+        return self.value
+
+    BaseXCom.orm_deserialize_value = deserialize_mock
+
+    # Step 1 - delete any leftovers, ensure all tables to be migrated are empty - use reverse order
+    for clz in reversed(objects_to_migrate):
+        if isinstance(clz, str):
+            logging.info("Cleaning table %s", clz)
+            target_session.execute(text(f"DELETE FROM {quote}{clz}{quote}"))
+        else:
+            logging.info("Cleaning table %s", clz.__tablename__)
+            if clz == User:
+                # ab_user references itself via created_by_fk/changed_by_fk, so delete in
+                # batches: each pass removes only users that no remaining row points to
+                continue_delete = True
+                while continue_delete:
+                    filter_uids = set()
+                    for uid in target_session.execute(
+                        text("SELECT changed_by_fk FROM ab_user WHERE changed_by_fk IS NOT NULL")
+                    ).fetchall():
+                        filter_uids.add(uid)
+                    for uid in target_session.execute(
+                        text("SELECT created_by_fk FROM ab_user WHERE created_by_fk IS NOT NULL")
+                    ).fetchall():
+                        filter_uids.add(uid)
+                    uid_list = ",".join(str(uid[0]) for uid in filter_uids)
+                    if uid_list:
+                        continue_delete = (
+                            target_session.execute(
+                                text(f"DELETE FROM ab_user WHERE id NOT IN ({uid_list})")
+                            ).rowcount
+                            > 0
+                        )
+                    else:
+                        continue_delete = target_session.execute(delete(clz)).rowcount > 0
+            else:
+                target_session.execute(delete(clz))
+    # Step 2 - copy all data over, use only ORM mapped tables
+    for clz in objects_to_migrate:
+        count = 0
+        if not isinstance(clz, str):
+            logging.info("Migration of %s started", clz.__tablename__)
+            for item in source_session.scalars(select(clz)).unique():
+                target_session.merge(item)
+                target_session.flush()
+                count += 1
+                if count % 100 == 0:
+                    logging.info("Migration of chunk finished, %i migrated", count)
+            logging.info("Migration of %s finished with %i rows", clz.__tablename__, count)
+            target_session.commit()
+    # Step 3 - update sequences to ensure new records continue with valid auto-generated IDs
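+    # For example, if dag_run ends up with MAX(id) = 41 after the copy, the
+    # PostgreSQL sequence must be moved past it ("dag_run_id_seq" RESTART WITH 42),
+    # otherwise the next INSERT would collide with a migrated row. Values are
+    # illustrative; the loop below derives the sequence name from the table name.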
+    for clz in objects_to_migrate:
+        if not isinstance(clz, str) and "id" in clz.__dict__:
+            logging.info("Resetting sequence value for %s", clz.__tablename__)
+            max_id = target_session.scalar(text(f"SELECT MAX(id) FROM {quote}{clz.__tablename__}{quote}"))
+            if max_id:
+                if dialect_name == "postgresql":
+                    target_session.execute(
+                        text(f"ALTER SEQUENCE {quote}{clz.__tablename__}_id_seq{quote} RESTART WITH {max_id + 1}")
+                    )
+                elif dialect_name == "sqlite":
+                    pass  # nothing to be done for sqlite
+                elif dialect_name == "mysql":
+                    target_session.execute(
+                        text(f"ALTER TABLE {quote}{clz.__tablename__}{quote} AUTO_INCREMENT = {max_id + 1}")
+                    )
+                else:  # e.g. "mssql"
+                    raise Exception(f"Database type {dialect_name} not supported")
+    target_session.commit()
+
+
+def main(extract: bool, restore: bool):
+    if extract == restore:
+        raise ValueError("Please specify either --extract or --restore (see --help)!")
+
+    if extract:
+        logging.info("Creating migration database %s", temp_db_url)
+        envs = os.environ.copy()
+        envs["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = temp_db_url
+        subprocess.check_call(args=["airflow", "db", "reset", "--yes"], env=envs)
+
+    # source and target database
+    airflow_engine = create_engine(airflow_db_url)
+    temp_engine = create_engine(temp_db_url)
+    logging.info("Connection to databases established")
+
+    if extract:
+        copy_airflow_tables(airflow_engine, temp_engine)
+    else:
+        copy_airflow_tables(temp_engine, airflow_engine)
+    logging.info("Migration completed!")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--extract", help="Extracts the current Airflow database to a SQLite file", action="store_true"
+    )
+    parser.add_argument(
+        "--restore", help="Restores from a SQLite file to the current Airflow database", action="store_true"
+    )
+    args = parser.parse_args()
+    main(args.extract, args.restore)
