This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new b6c1c55 Add a script to migrate old task data
b6c1c55 is described below
commit b6c1c55da69d5853b68dcd0c5cc7acce97c7d32c
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Jul 1 15:12:19 2025 +0100
Add a script to migrate old task data
---
scripts/vote_initiate_convert.py | 138 +++++++++++++++++++++++++++++++++++++++
1 file changed, 138 insertions(+)
diff --git a/scripts/vote_initiate_convert.py b/scripts/vote_initiate_convert.py
new file mode 100644
index 0000000..980e72b
--- /dev/null
+++ b/scripts/vote_initiate_convert.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python3
+"""Convert legacy vote_initiate task results to the new validated format."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import sys
+from typing import Any, Final, cast
+
+import sqlalchemy
+import sqlmodel
+
+import atr.db as db
+import atr.db.models as models
+import atr.results as results
+
+_LOG_PREFIX: Final = "[vote_convert]"
+
+
+def _write(message: str) -> None:
+ """Print and flush a log line."""
+ print(f"{_LOG_PREFIX} {message}")
+ sys.stdout.flush()
+
+
async def _raw_result(data: db.Session, task_id: int) -> Any | None:
    """Fetch the stored ``result`` column of task *task_id* as the driver returns it.

    A textual SQL query is used so the value bypasses the type adapter.
    Returns ``None`` when no task has that id, or when the stored value is NULL.
    """
    query = sqlalchemy.text("SELECT result FROM task WHERE id = :id")
    query = query.bindparams(id=task_id)
    row = (await data.execute(query)).one_or_none()
    # Only one column is selected, so the raw value is the row's first item
    return None if row is None else row[0]
+
+
+def _convert_legacy(raw_val: Any) -> results.VoteInitiate | None:
+ """Convert legacy JSON payloads to VoteInitiate, return None if
impossible."""
+
+ if raw_val in (None, "", "[]", []):
+ return None
+
+ # If it's bytes, decode to str first
+ if isinstance(raw_val, bytes | bytearray):
+ raw_val = raw_val.decode("utf-8", errors="replace")
+
+ # At this point, raw_val is usually a JSON-encoded string (e.g.
"[\"{...}\"]")
+ # Normalise to Python data structure
+ if isinstance(raw_val, str):
+ try:
+ raw_val = json.loads(raw_val)
+ except json.JSONDecodeError:
+ return None
+ return _convert_legacy_continued(raw_val)
+
+
+def _convert_legacy_continued(raw_val: Any) -> results.VoteInitiate | None:
+ # If we now have list or tuple, take the first element
+ if isinstance(raw_val, list | tuple):
+ if not raw_val:
+ return None
+ raw_val = raw_val[0]
+ # That element might itself be a JSON string
+ if isinstance(raw_val, str):
+ try:
+ raw_val = json.loads(raw_val)
+ except json.JSONDecodeError:
+ return None
+
+ # Expect raw_val to be dict now
+ if not isinstance(raw_val, dict):
+ return None
+
+ # Inject the discriminator
+ raw_val.setdefault("kind", "vote_initiate")
+
+ try:
+ return results.VoteInitiate.model_validate(raw_val)
+ except Exception:
+ return None
+
+
async def audit_vote_initiate_results() -> None:
    """Upgrade legacy vote_initiate task results to the new validated format.

    Loads every VOTE_INITIATE task and skips those whose ``result`` is already
    a ``results.VoteInitiate``. For the others, it re-reads the raw ``result``
    column and tries to convert it. Each converted result is flushed as it is
    assigned, and all changes are committed once at the end. Tasks that cannot
    be converted are counted as skipped and logged with a truncated preview of
    their raw value.
    """
    await db.init_database_for_worker()

    async with db.session() as data:
        stmt = sqlmodel.select(models.Task).where(models.Task.task_type == models.TaskType.VOTE_INITIATE)
        result = await data.execute(stmt)
        tasks = result.scalars().all()

        _write(f"Found {len(tasks)} vote_initiate tasks total")

        upgraded = 0
        skipped = 0
        for task in tasks:
            if isinstance(task.result, results.VoteInitiate):
                # Already in the new format
                continue

            raw_val = await _raw_result(data, task.id)
            new_val = _convert_legacy(raw_val)

            if new_val is None:
                skipped += 1
                # Truncate the string form of any raw value (str, bytes, list,
                # dict, ...), not only str, so a large payload cannot flood the log
                raw_text = str(raw_val)
                preview = f"{raw_text[:120]}..." if len(raw_text) > 120 else raw_text
                _write(f"Task id={task.id}: could not convert legacy result, raw preview: {preview}")
                continue

            # Apply the upgrade in the current transaction
            task.result = cast("results.Results", new_val)
            # Ensure the SQL UPDATE is issued before the next iteration
            await data.flush()
            upgraded += 1

            _write(f"Task id={task.id}: upgraded legacy result -> VoteInitiate")

        # Commit all changes once, while the session is still open
        await data.commit()

    _write(f"Upgrade complete. Upgraded: {upgraded}, skipped (unconvertible): {skipped}")
+
+
async def amain() -> None:
    """Asynchronous entry point: run the vote_initiate result upgrade."""
    upgrade = audit_vote_initiate_results()
    await upgrade
+
+
def main() -> None:
    """Synchronous command-line entry point; drives ``amain`` on a fresh event loop."""
    coroutine = amain()
    asyncio.run(coroutine)


if __name__ == "__main__":
    main()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]