This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch safe-migrate-path in repository
https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 571d87ce068ac6d1e28c5c6c43338ea5afe9c9b7 Author: Sean B. Palmer <[email protected]> AuthorDate: Fri Jan 16 16:50:41 2026 +0000 Migrate state files with extensive checks --- atr/server.py | 300 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 161 insertions(+), 139 deletions(-) diff --git a/atr/server.py b/atr/server.py index 9dde3d0..bf56af3 100644 --- a/atr/server.py +++ b/atr/server.py @@ -21,9 +21,11 @@ import asyncio import contextlib import datetime import fcntl +import multiprocessing import os import pathlib import queue +import sys import urllib.parse from collections.abc import Iterable from typing import Any, Final @@ -60,6 +62,19 @@ import atr.util as util # We should probably find a cleaner way to do this app: base.QuartApp | None = None +# The order of these migrations must be checked carefully to avoid conflicts +_MIGRATIONS: Final[list[tuple[str, str]]] = [ + # Audit + ("storage-audit.log", "audit/storage-audit.log"), + # Cache + ("routes.json", "cache/routes.json"), + ("user_session_cache.json", "cache/user_session_cache.json"), + # Database + ("atr.db", "database/atr.db"), + ("atr.db-shm", "database/atr.db-shm"), + ("atr.db-wal", "database/atr.db-wal"), +] + _SWAGGER_UI_TEMPLATE: Final[str] = """<!DOCTYPE html> <html lang="en"> <head> @@ -396,7 +411,7 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp: if os.sep != "/": raise RuntimeError('ATR requires a POSIX compatible filesystem where os.sep is "/"') config_mode = config.get_mode() - _migrate_state_directory(app_config) + _migrate_state(app_config) _app_dirs_setup(app_config) log.performance_init() app = _app_create_base(app_config) @@ -439,50 +454,6 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp: return app -def _get_parent_process_age() -> float: - import datetime - import subprocess - import time - - ppid = os.getppid() - - try: - with open(f"/proc/{ppid}/stat") as f: - stat = f.read().split() - starttime_ticks = 
int(stat[21]) - ticks_per_sec = os.sysconf("SC_CLK_TCK") - with open("/proc/stat") as f: - for line in f: - if line.startswith("btime "): - boot_time = int(line.split()[1]) - break - else: - return 0.0 - process_start = boot_time + (starttime_ticks / ticks_per_sec) - return time.time() - process_start - except (FileNotFoundError, IndexError, ValueError, OSError): - pass - - try: - result = subprocess.run( - ["ps", "-o", "lstart=", "-p", str(ppid)], - capture_output=True, - text=True, - ) - if result.returncode == 0: - start_str = result.stdout.strip() - for fmt in ["%a %b %d %H:%M:%S %Y", "%a %d %b %H:%M:%S %Y"]: - try: - dt = datetime.datetime.strptime(start_str, fmt) - return time.time() - dt.timestamp() - except ValueError: - continue - except OSError: - pass - - return 0.0 - - async def _initialise_test_environment() -> None: if not config.get().ALLOW_TESTS: return @@ -515,100 +486,137 @@ async def _initialise_test_environment() -> None: await data.commit() -async def _register_recurrent_tasks() -> None: - """Schedule recurring tasks""" - # Start scheduled tasks 5 min after server start - await asyncio.sleep(300) - try: - await tasks.clear_scheduled() - metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True) - log.info(f"Scheduled remote metadata update with ID {metadata.id}") - await asyncio.sleep(60) - workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True) - log.info(f"Scheduled workflow status update with ID {workflow.id}") +def _is_hot_reload() -> bool: + proc = multiprocessing.current_process() + if proc.name == "MainProcess": + # Reloading is on, but this is the parent process + return False + if "--reload" not in sys.argv: + # Reloading is off + return False + return True - except Exception as e: - log.exception(f"Failed to schedule recurrent tasks: {e!s}") +def _migrate_path(old_path: pathlib.Path, new_path: pathlib.Path) -> None: + # Keep track of ancestor directories that we create + root_to_leaf_created: 
list[pathlib.Path] = [] -def _migrate_audit(state_dir: pathlib.Path) -> None: - _migrate_file( - state_dir / "storage-audit.log", - state_dir / "audit" / "storage-audit.log", - ) + try: + # Create all ancestor directories of new_path if they do not exist + # We keep track of this so that we can attempt to roll back on failure + focused_ancestor_directory = new_path.parent + leaf_to_root_to_create = [] + while not focused_ancestor_directory.exists(): + leaf_to_root_to_create.append(focused_ancestor_directory) + focused_ancestor_directory = focused_ancestor_directory.parent + + # It is not safe to run the rest of this function across filesystems + # Now that we have the closest existing ancestor, we can check its device ID + if os.stat(old_path).st_dev != os.stat(focused_ancestor_directory).st_dev: + raise RuntimeError(f"Cannot migrate across filesystems: {old_path} -> {new_path}") + + # Start from the root, and create towards the leaf + for ancestor_directory in reversed(leaf_to_root_to_create): + ancestor_directory.mkdir() + root_to_leaf_created.append(ancestor_directory) + + # Perform the actual migration as safely as possible + _migrate_path_by_type(old_path, new_path) + except Exception as e: + # Roll back any created directories from leaf to root + for created_directory in reversed(root_to_leaf_created): + created_directory.rmdir() -def _migrate_cache(state_dir: pathlib.Path) -> None: - _migrate_file( - state_dir / "routes.json", - state_dir / "cache" / "routes.json", - ) - _migrate_file( - state_dir / "user_session_cache.json", - state_dir / "cache" / "user_session_cache.json", - ) + if isinstance(e, FileNotFoundError): + # We check all paths before attempting to migrate + # So if a file mysteriously disappears, we should raise an error + raise RuntimeError(f"Migration path disappeared before migration: {old_path}") from e + raise -def _migrate_database(state_dir: pathlib.Path, app_config: type[config.AppConfig]) -> None: - configured_path = 
app_config.SQLITE_DB_PATH - if configured_path not in ("atr.db", "database/atr.db"): - raise RuntimeError( - f"SQLITE_DB_PATH is set to '{configured_path}' but migration only supports " - f"the default value 'atr.db'. Please manually migrate your database to " - f"'database/atr.db' and update SQLITE_DB_PATH, or remove the custom setting." - ) - _migrate_file( - state_dir / "atr.db", - state_dir / "database" / "atr.db", - ) - for suffix in ["-shm", "-wal"]: - old_path = state_dir / f"atr.db{suffix}" - new_path = state_dir / "database" / f"atr.db{suffix}" - if old_path.exists(): - _migrate_file(old_path, new_path) - - -def _migrate_directory(old_path: pathlib.Path, new_path: pathlib.Path) -> None: - if old_path.exists() and (not new_path.exists()): - old_path.rename(new_path) - print(f"Migrated directory: {old_path} -> {new_path}") - elif old_path.exists() and new_path.exists(): - raise RuntimeError(f"Migration conflict: both {old_path} and {new_path} exist") - else: - print(f"No directory migration needed: {old_path}") +def _migrate_path_by_type(old_path: pathlib.Path, new_path: pathlib.Path) -> None: + # Migrate a regular file + if old_path.is_file(): + try: + # Hard linking fails if new_path already exists + os.link(old_path, new_path) + except FileExistsError: + # If the migration was interrupted, there may be two hard links + # If they link to the same inode, we can remove old_path + # If not, then it's a real conflict + if not os.path.samefile(old_path, new_path): + # The inodes are different, so this is a real conflict + raise RuntimeError(f"Migration conflict: {new_path} already exists") + # Otherwise, the inodes are the same, so this is a partial migration + # We fall through to complete the migration, but report the detection first + print(f"Partial migration detected: {old_path} -> {new_path}") + + # Hard linking was successful, so we can remove old_path + try: + os.unlink(old_path) + except FileNotFoundError: + # Some other process must have deleted 
old_path + print(f"Migration path removed by a third party during migration: {old_path}") + # We do not return here, because the file is migrated + print(f"Migrated file: {old_path} -> {new_path}") + # Migrate a directory + elif old_path.is_dir(): + if new_path.exists(): + # This is a TOCTOU susceptible check, but os.rename has further safeguards + raise RuntimeError(f"Migration conflict: {new_path} already exists") + try: + # We assume that old_path is not replaced by a file before this rename + # If new_path is a file, this raises a NotADirectoryError + # If new_path is a directory and not empty, this raises an OSError + # If new_path is an empty directory, it will be replaced + # (We accept this behaviour, but also have a TOCTOU susceptible check above) + os.rename(old_path, new_path) + print(f"Migrated directory: {old_path} -> {new_path}") + except OSError as e: + # In this case, new_path was probably a directory and not empty + raise RuntimeError(f"Migration conflict: {new_path} already exists") from e -def _migrate_file(old_path: pathlib.Path, new_path: pathlib.Path) -> None: - if old_path.exists() and (not new_path.exists()): - new_path.parent.mkdir(parents=True, exist_ok=True) - old_path.rename(new_path) - print(f"Migrated file: {old_path} -> {new_path}") - elif old_path.exists() and new_path.exists(): - raise RuntimeError(f"Migration conflict: both {old_path} and {new_path} exist") else: - print(f"No file migration needed: {old_path}") + raise RuntimeError(f"Migration path is neither a file nor a directory: {old_path}") -def _migrate_state_directory(app_config: type[config.AppConfig]) -> None: +def _migrate_state(app_config: type[config.AppConfig]) -> None: + # It's okay to use synchronous code in this function and in any functions that it calls state_dir = pathlib.Path(app_config.STATE_DIR) - pending = _pending_migrations(state_dir, app_config) - if pending: - parent_age = _get_parent_process_age() - if parent_age > 10.0: - import sys - - print("=" * 70, 
file=sys.stderr) - print("ERROR: State directory migration required but hot reload detected", file=sys.stderr) - print(f"Parent process age: {parent_age:.1f}s", file=sys.stderr) - print("Pending migrations:", file=sys.stderr) - for p in pending: - print(f" - {p}", file=sys.stderr) - print("", file=sys.stderr) - print("Please restart the server, not hot reload, to apply migrations", file=sys.stderr) - print("=" * 70, file=sys.stderr) - sys.exit(1) + # Are there migrations to apply? + pending_migrations = _pending_migrations(state_dir) + if not pending_migrations: + return + # Are we hot reloading? + if _is_hot_reload(): + print("!!!", file=sys.stderr) + print("ERROR: Cannot migrate files during hot reload!", file=sys.stderr) + print("The following files need to be migrated:", file=sys.stderr) + for old_path, new_path in sorted(pending_migrations): + print(f" - {old_path} -> {new_path}", file=sys.stderr) + print("", file=sys.stderr) + print("Restart the server to apply the migrations", file=sys.stderr) + print("!!!", file=sys.stderr) + sys.exit(1) + + # Are we using custom configuration for the database? + configured_path = app_config.SQLITE_DB_PATH + if configured_path not in ("atr.db", "database/atr.db"): + print("!!!", file=sys.stderr) + print("ERROR: Cannot migrate files with custom SQLITE_DB_PATH!", file=sys.stderr) + print("The following files need to be migrated:", file=sys.stderr) + for old_path, new_path in sorted(pending_migrations): + print(f" - {old_path} -> {new_path}", file=sys.stderr) + print("", file=sys.stderr) + print("Set SQLITE_DB_PATH to 'atr.db' to migrate files", file=sys.stderr) + print("!!!", file=sys.stderr) + sys.exit(1) + + # Are we already migrating? 
runtime_dir = state_dir / "runtime" runtime_dir.mkdir(parents=True, exist_ok=True) lock_path = runtime_dir / "migration.lock" @@ -616,28 +624,42 @@ def _migrate_state_directory(app_config: type[config.AppConfig]) -> None: with open(lock_path, "w") as lock_file: fcntl.flock(lock_file, fcntl.LOCK_EX) try: - _migrate_audit(state_dir) - _migrate_cache(state_dir) - _migrate_database(state_dir, app_config) + _migrate_state_files(state_dir, pending_migrations) finally: fcntl.flock(lock_file, fcntl.LOCK_UN) -def _pending_migrations(state_dir: pathlib.Path, app_config: type[config.AppConfig]) -> list[str]: - pending = [] - if (state_dir / "storage-audit.log").exists(): - pending.append("storage-audit.log -> audit/storage-audit.log") - if (state_dir / "routes.json").exists(): - pending.append("routes.json -> cache/routes.json") - if (state_dir / "user_session_cache.json").exists(): - pending.append("user_session_cache.json -> cache/user_session_cache.json") - configured_path = app_config.SQLITE_DB_PATH - if configured_path in ("atr.db", "database/atr.db"): - if (state_dir / "atr.db").exists(): - pending.append("atr.db -> database/atr.db") +def _migrate_state_files(state_dir: pathlib.Path, pending_migrations: set[tuple[str, str]]) -> None: + for old_path, new_path in _MIGRATIONS: + if (old_path, new_path) not in pending_migrations: + continue + _migrate_path(state_dir / old_path, state_dir / new_path) + + +def _pending_migrations(state_dir: pathlib.Path) -> set[tuple[str, str]]: + pending: set[tuple[str, str]] = set() + for old_path, new_path in _MIGRATIONS: + if (state_dir / old_path).exists(): + pending.add((old_path, new_path)) return pending +async def _register_recurrent_tasks() -> None: + """Schedule recurring tasks""" + # Start scheduled tasks 5 min after server start + await asyncio.sleep(300) + try: + await tasks.clear_scheduled() + metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True) + log.info(f"Scheduled remote metadata update with ID 
{metadata.id}") + await asyncio.sleep(60) + workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True) + log.info(f"Scheduled workflow status update with ID {workflow.id}") + + except Exception as e: + log.exception(f"Failed to schedule recurrent tasks: {e!s}") + + def _register_routes(app: base.QuartApp) -> None: # Add a global error handler to show helpful error messages with tracebacks @app.errorhandler(Exception) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
