This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 501235f Migrate state files with extensive checks
501235f is described below
commit 501235f3b0fe07523e91f636caba8f08f2c9aec4
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jan 16 16:50:41 2026 +0000
Migrate state files with extensive checks
---
atr/server.py | 297 +++++++++++++++++++++++++++++++---------------------------
1 file changed, 158 insertions(+), 139 deletions(-)
diff --git a/atr/server.py b/atr/server.py
index 9dde3d0..5859c72 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -21,9 +21,11 @@ import asyncio
import contextlib
import datetime
import fcntl
+import multiprocessing
import os
import pathlib
import queue
+import sys
import urllib.parse
from collections.abc import Iterable
from typing import Any, Final
@@ -60,6 +62,19 @@ import atr.util as util
# We should probably find a cleaner way to do this
app: base.QuartApp | None = None
+# The order of these migrations must be checked carefully to avoid conflicts
+_MIGRATIONS: Final[list[tuple[str, str]]] = [
+ # Audit
+ ("storage-audit.log", "audit/storage-audit.log"),
+ # Cache
+ ("routes.json", "cache/routes.json"),
+ ("user_session_cache.json", "cache/user_session_cache.json"),
+ # Database
+ ("atr.db", "database/atr.db"),
+ ("atr.db-shm", "database/atr.db-shm"),
+ ("atr.db-wal", "database/atr.db-wal"),
+]
+
_SWAGGER_UI_TEMPLATE: Final[str] = """<!DOCTYPE html>
<html lang="en">
<head>
@@ -396,7 +411,17 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp:
if os.sep != "/":
raise RuntimeError('ATR requires a POSIX compatible filesystem where os.sep is "/"')
config_mode = config.get_mode()
- _migrate_state_directory(app_config)
+
+ # Custom configuration for the database path is no longer supported
+ configured_path = app_config.SQLITE_DB_PATH
+ if configured_path != "database/atr.db":
+ print("!!!", file=sys.stderr)
+ print("ERROR: Custom values of SQLITE_DB_PATH are no longer supported!", file=sys.stderr)
+ print("Please unset SQLITE_DB_PATH to allow the server to start", file=sys.stderr)
+ print("!!!", file=sys.stderr)
+ sys.exit(1)
+
+ _migrate_state(app_config)
_app_dirs_setup(app_config)
log.performance_init()
app = _app_create_base(app_config)
@@ -439,50 +464,6 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp:
return app
-def _get_parent_process_age() -> float:
- import datetime
- import subprocess
- import time
-
- ppid = os.getppid()
-
- try:
- with open(f"/proc/{ppid}/stat") as f:
- stat = f.read().split()
- starttime_ticks = int(stat[21])
- ticks_per_sec = os.sysconf("SC_CLK_TCK")
- with open("/proc/stat") as f:
- for line in f:
- if line.startswith("btime "):
- boot_time = int(line.split()[1])
- break
- else:
- return 0.0
- process_start = boot_time + (starttime_ticks / ticks_per_sec)
- return time.time() - process_start
- except (FileNotFoundError, IndexError, ValueError, OSError):
- pass
-
- try:
- result = subprocess.run(
- ["ps", "-o", "lstart=", "-p", str(ppid)],
- capture_output=True,
- text=True,
- )
- if result.returncode == 0:
- start_str = result.stdout.strip()
- for fmt in ["%a %b %d %H:%M:%S %Y", "%a %d %b %H:%M:%S %Y"]:
- try:
- dt = datetime.datetime.strptime(start_str, fmt)
- return time.time() - dt.timestamp()
- except ValueError:
- continue
- except OSError:
- pass
-
- return 0.0
-
-
async def _initialise_test_environment() -> None:
if not config.get().ALLOW_TESTS:
return
@@ -515,100 +496,124 @@ async def _initialise_test_environment() -> None:
await data.commit()
-async def _register_recurrent_tasks() -> None:
- """Schedule recurring tasks"""
- # Start scheduled tasks 5 min after server start
- await asyncio.sleep(300)
- try:
- await tasks.clear_scheduled()
- metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True)
- log.info(f"Scheduled remote metadata update with ID {metadata.id}")
- await asyncio.sleep(60)
- workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True)
- log.info(f"Scheduled workflow status update with ID {workflow.id}")
+def _is_hot_reload() -> bool:
+ proc = multiprocessing.current_process()
+ if proc.name == "MainProcess":
+ # Reloading is on, but this is the parent process
+ return False
+ if "--reload" not in sys.argv:
+ # Reloading is off
+ return False
+ return True
- except Exception as e:
- log.exception(f"Failed to schedule recurrent tasks: {e!s}")
+def _migrate_path(old_path: pathlib.Path, new_path: pathlib.Path) -> None:
+ # Keep track of ancestor directories that we create
+ root_to_leaf_created: list[pathlib.Path] = []
-def _migrate_audit(state_dir: pathlib.Path) -> None:
- _migrate_file(
- state_dir / "storage-audit.log",
- state_dir / "audit" / "storage-audit.log",
- )
+ try:
+ # Create all ancestor directories of new_path if they do not exist
+ # We keep track of this so that we can attempt to roll back on failure
+ focused_ancestor_directory = new_path.parent
+ leaf_to_root_to_create = []
+ while not focused_ancestor_directory.exists():
+ leaf_to_root_to_create.append(focused_ancestor_directory)
+ focused_ancestor_directory = focused_ancestor_directory.parent
+
+ # It is not safe to run the rest of this function across filesystems
+ # Now that we have the closest existing ancestor, we can check its device ID
+ if os.stat(old_path).st_dev != os.stat(focused_ancestor_directory).st_dev:
+ raise RuntimeError(f"Cannot migrate across filesystems: {old_path} -> {new_path}")
+
+ # Start from the root, and create towards the leaf
+ for ancestor_directory in reversed(leaf_to_root_to_create):
+ ancestor_directory.mkdir()
+ root_to_leaf_created.append(ancestor_directory)
+
+ # Perform the actual migration as safely as possible
+ _migrate_path_by_type(old_path, new_path)
+ except Exception as e:
+ # Roll back any created directories from leaf to root
+ for created_directory in reversed(root_to_leaf_created):
+ created_directory.rmdir()
-def _migrate_cache(state_dir: pathlib.Path) -> None:
- _migrate_file(
- state_dir / "routes.json",
- state_dir / "cache" / "routes.json",
- )
- _migrate_file(
- state_dir / "user_session_cache.json",
- state_dir / "cache" / "user_session_cache.json",
- )
+ if isinstance(e, FileNotFoundError):
+ # We check all paths before attempting to migrate
+ # So if a file mysteriously disappears, we should raise an error
+ raise RuntimeError(f"Migration path disappeared before migration: {old_path}") from e
+ raise
-def _migrate_database(state_dir: pathlib.Path, app_config: type[config.AppConfig]) -> None:
- configured_path = app_config.SQLITE_DB_PATH
- if configured_path not in ("atr.db", "database/atr.db"):
- raise RuntimeError(
- f"SQLITE_DB_PATH is set to '{configured_path}' but migration only supports "
- f"the default value 'atr.db'. Please manually migrate your database to "
- f"'database/atr.db' and update SQLITE_DB_PATH, or remove the custom setting."
- )
- _migrate_file(
- state_dir / "atr.db",
- state_dir / "database" / "atr.db",
- )
- for suffix in ["-shm", "-wal"]:
- old_path = state_dir / f"atr.db{suffix}"
- new_path = state_dir / "database" / f"atr.db{suffix}"
- if old_path.exists():
- _migrate_file(old_path, new_path)
-
-
-def _migrate_directory(old_path: pathlib.Path, new_path: pathlib.Path) -> None:
- if old_path.exists() and (not new_path.exists()):
- old_path.rename(new_path)
- print(f"Migrated directory: {old_path} -> {new_path}")
- elif old_path.exists() and new_path.exists():
- raise RuntimeError(f"Migration conflict: both {old_path} and {new_path} exist")
- else:
- print(f"No directory migration needed: {old_path}")
+def _migrate_path_by_type(old_path: pathlib.Path, new_path: pathlib.Path) -> None:
+ # Migrate a regular file
+ if old_path.is_file():
+ try:
+ # Hard linking fails if new_path already exists
+ os.link(old_path, new_path)
+ except FileExistsError:
+ # If the migration was interrupted, there may be two hard links
+ # If they link to the same inode, we can remove old_path
+ # If not, then it's a real conflict
+ if not os.path.samefile(old_path, new_path):
+ # The inodes are different, so this is a real conflict
+ raise RuntimeError(f"Migration conflict: {new_path} already exists")
+ # Otherwise, the inodes are the same, so this is a partial migration
+ # We fall through to complete the migration, but report the detection first
+ print(f"Partial migration detected: {old_path} -> {new_path}")
+
+ # Hard linking was successful, so we can remove old_path
+ try:
+ os.unlink(old_path)
+ except FileNotFoundError:
+ # Some other process must have deleted old_path
+ print(f"Migration path removed by a third party during migration: {old_path}")
+ # We do not return here, because the file is migrated
+ print(f"Migrated file: {old_path} -> {new_path}")
+ # Migrate a directory
+ elif old_path.is_dir():
+ if new_path.exists():
+ # This is a TOCTOU susceptible check, but os.rename has further safeguards
+ raise RuntimeError(f"Migration conflict: {new_path} already exists")
+ try:
+ # We assume that old_path is not replaced by a file before this rename
+ # If new_path is a file, this raises a NotADirectoryError
+ # If new_path is a directory and not empty, this raises an OSError
+ # If new_path is an empty directory, it will be replaced
+ # (We accept this behaviour, but also have a TOCTOU susceptible check above)
+ os.rename(old_path, new_path)
+ print(f"Migrated directory: {old_path} -> {new_path}")
+ except OSError as e:
+ # In this case, new_path was probably a directory and not empty
+ raise RuntimeError(f"Migration conflict: {new_path} already exists") from e
-def _migrate_file(old_path: pathlib.Path, new_path: pathlib.Path) -> None:
- if old_path.exists() and (not new_path.exists()):
- new_path.parent.mkdir(parents=True, exist_ok=True)
- old_path.rename(new_path)
- print(f"Migrated file: {old_path} -> {new_path}")
- elif old_path.exists() and new_path.exists():
- raise RuntimeError(f"Migration conflict: both {old_path} and {new_path} exist")
else:
- print(f"No file migration needed: {old_path}")
+ raise RuntimeError(f"Migration path is neither a file nor a directory: {old_path}")
-def _migrate_state_directory(app_config: type[config.AppConfig]) -> None:
+def _migrate_state(app_config: type[config.AppConfig]) -> None:
+ # It's okay to use synchronous code in this function and in any functions that it calls
state_dir = pathlib.Path(app_config.STATE_DIR)
- pending = _pending_migrations(state_dir, app_config)
- if pending:
- parent_age = _get_parent_process_age()
- if parent_age > 10.0:
- import sys
-
- print("=" * 70, file=sys.stderr)
- print("ERROR: State directory migration required but hot reload detected", file=sys.stderr)
- print(f"Parent process age: {parent_age:.1f}s", file=sys.stderr)
- print("Pending migrations:", file=sys.stderr)
- for p in pending:
- print(f" - {p}", file=sys.stderr)
- print("", file=sys.stderr)
- print("Please restart the server, not hot reload, to apply migrations", file=sys.stderr)
- print("=" * 70, file=sys.stderr)
- sys.exit(1)
+ # Are there migrations to apply?
+ pending_migrations = _pending_migrations(state_dir)
+ if not pending_migrations:
+ return
+ # Are we hot reloading?
+ if _is_hot_reload():
+ print("!!!", file=sys.stderr)
+ print("ERROR: Cannot migrate files during hot reload!", file=sys.stderr)
+ print("The following files need to be migrated:", file=sys.stderr)
+ for old_path, new_path in sorted(pending_migrations):
+ print(f" - {old_path} -> {new_path}", file=sys.stderr)
+ print("", file=sys.stderr)
+ print("Restart the server to apply the migrations", file=sys.stderr)
+ print("!!!", file=sys.stderr)
+ sys.exit(1)
+
+ # Are we already migrating?
runtime_dir = state_dir / "runtime"
runtime_dir.mkdir(parents=True, exist_ok=True)
lock_path = runtime_dir / "migration.lock"
@@ -616,28 +621,42 @@ def _migrate_state_directory(app_config: type[config.AppConfig]) -> None:
with open(lock_path, "w") as lock_file:
fcntl.flock(lock_file, fcntl.LOCK_EX)
try:
- _migrate_audit(state_dir)
- _migrate_cache(state_dir)
- _migrate_database(state_dir, app_config)
+ _migrate_state_files(state_dir, pending_migrations)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
-def _pending_migrations(state_dir: pathlib.Path, app_config: type[config.AppConfig]) -> list[str]:
- pending = []
- if (state_dir / "storage-audit.log").exists():
- pending.append("storage-audit.log -> audit/storage-audit.log")
- if (state_dir / "routes.json").exists():
- pending.append("routes.json -> cache/routes.json")
- if (state_dir / "user_session_cache.json").exists():
- pending.append("user_session_cache.json -> cache/user_session_cache.json")
- configured_path = app_config.SQLITE_DB_PATH
- if configured_path in ("atr.db", "database/atr.db"):
- if (state_dir / "atr.db").exists():
- pending.append("atr.db -> database/atr.db")
+def _migrate_state_files(state_dir: pathlib.Path, pending_migrations: set[tuple[str, str]]) -> None:
+ for old_path, new_path in _MIGRATIONS:
+ if (old_path, new_path) not in pending_migrations:
+ continue
+ _migrate_path(state_dir / old_path, state_dir / new_path)
+
+
+def _pending_migrations(state_dir: pathlib.Path) -> set[tuple[str, str]]:
+ pending: set[tuple[str, str]] = set()
+ for old_path, new_path in _MIGRATIONS:
+ if (state_dir / old_path).exists():
+ pending.add((old_path, new_path))
return pending
+async def _register_recurrent_tasks() -> None:
+ """Schedule recurring tasks"""
+ # Start scheduled tasks 5 min after server start
+ await asyncio.sleep(300)
+ try:
+ await tasks.clear_scheduled()
+ metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True)
+ log.info(f"Scheduled remote metadata update with ID {metadata.id}")
+ await asyncio.sleep(60)
+ workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True)
+ log.info(f"Scheduled workflow status update with ID {workflow.id}")
+
+ except Exception as e:
+ log.exception(f"Failed to schedule recurrent tasks: {e!s}")
+
+
def _register_routes(app: base.QuartApp) -> None:
# Add a global error handler to show helpful error messages with tracebacks
@app.errorhandler(Exception)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]