This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new b6c1c55  Add a script to migrate old task data
b6c1c55 is described below

commit b6c1c55da69d5853b68dcd0c5cc7acce97c7d32c
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Jul 1 15:12:19 2025 +0100

    Add a script to migrate old task data
---
 scripts/vote_initiate_convert.py | 138 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 138 insertions(+)

diff --git a/scripts/vote_initiate_convert.py b/scripts/vote_initiate_convert.py
new file mode 100644
index 0000000..980e72b
--- /dev/null
+++ b/scripts/vote_initiate_convert.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python3
+"""Convert legacy vote_initiate task results to the new validated format."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import sys
+from typing import Any, Final, cast
+
+import sqlalchemy
+import sqlmodel
+
+import atr.db as db
+import atr.db.models as models
+import atr.results as results
+
_LOG_PREFIX: Final = "[vote_convert]"


def _write(message: str) -> None:
    """Emit *message* on stdout behind the script's log prefix.

    The stream is flushed after every line so progress is visible immediately,
    even when stdout is buffered (e.g. piped to a file).
    """
    print(f"{_LOG_PREFIX} {message}", flush=True)
+
+
async def _raw_result(data: db.Session, task_id: int) -> Any | None:
    """Fetch the undecoded ``result`` column of the ``task`` row with *task_id*.

    A plain SQL ``text`` query is used so the ORM column type adapter is not
    applied to the value. Returns ``None`` when no such row exists (and also
    when the stored column value itself is NULL).
    """
    query = sqlalchemy.text("SELECT result FROM task WHERE id = :id").bindparams(id=task_id)
    row = (await data.execute(query)).one_or_none()
    # Column 0 is the only selected column: the raw stored value.
    return None if row is None else row[0]
+
+
def _convert_legacy(raw_val: Any) -> results.VoteInitiate | None:
    """Turn a raw legacy ``result`` column value into a ``VoteInitiate``.

    Empty markers (``None``, ``""``, ``"[]"``, ``[]``) yield ``None``.
    ``bytes``/``bytearray`` input is decoded as UTF-8 (undecodable bytes are
    replaced), and any string is parsed as JSON; unparseable JSON yields
    ``None``. The resulting (or already non-string) value is handed to
    ``_convert_legacy_continued`` for unwrapping and validation.
    """
    empty_markers = (None, "", "[]", [])
    if raw_val in empty_markers:
        return None

    candidate = raw_val
    if isinstance(candidate, (bytes, bytearray)):
        candidate = candidate.decode("utf-8", errors="replace")

    # Non-string values (e.g. already-decoded lists or dicts) skip JSON parsing.
    if not isinstance(candidate, str):
        return _convert_legacy_continued(candidate)

    # Strings are typically JSON such as "[\"{...}\"]"; parse one level here.
    try:
        decoded = json.loads(candidate)
    except json.JSONDecodeError:
        return None
    return _convert_legacy_continued(decoded)
+
+
+def _convert_legacy_continued(raw_val: Any) -> results.VoteInitiate | None:
+    # If we now have list or tuple, take the first element
+    if isinstance(raw_val, list | tuple):
+        if not raw_val:
+            return None
+        raw_val = raw_val[0]
+        # That element might itself be a JSON string
+        if isinstance(raw_val, str):
+            try:
+                raw_val = json.loads(raw_val)
+            except json.JSONDecodeError:
+                return None
+
+    # Expect raw_val to be dict now
+    if not isinstance(raw_val, dict):
+        return None
+
+    # Inject the discriminator
+    raw_val.setdefault("kind", "vote_initiate")
+
+    try:
+        return results.VoteInitiate.model_validate(raw_val)
+    except Exception:
+        return None
+
+
async def audit_vote_initiate_results() -> None:
    """Upgrade legacy vote_initiate task results to the new validated format.

    Loads every ``VOTE_INITIATE`` task and leaves alone those whose ``result``
    is already a ``results.VoteInitiate``. For each remaining task it re-reads
    the raw column value and converts it with ``_convert_legacy``. Converted
    results are assigned and flushed per task, and all changes are committed
    together at the end. Unconvertible rows are counted as skipped and logged
    with a truncated preview of their raw value.
    """
    # Maximum number of characters of a raw value shown in skip diagnostics.
    preview_limit = 120

    await db.init_database_for_worker()

    async with db.session() as data:
        stmt = sqlmodel.select(models.Task).where(models.Task.task_type == models.TaskType.VOTE_INITIATE)
        result = await data.execute(stmt)
        tasks = result.scalars().all()

        _write(f"Found {len(tasks)} vote_initiate tasks total")

        upgraded = 0
        skipped = 0
        for task in tasks:
            if isinstance(task.result, results.VoteInitiate):
                # Already in the new format; nothing to do.
                continue

            raw_val = await _raw_result(data, task.id)
            new_val = _convert_legacy(raw_val)

            if new_val is None:
                skipped += 1
                # Truncate the stringified form of any value, not only str.
                # Bytes, lists and dicts can be just as long once printed.
                raw_text = str(raw_val)
                preview = f"{raw_text[:preview_limit]}..." if len(raw_text) > preview_limit else raw_text
                _write(f"Task id={task.id}: could not convert legacy result, raw preview: {preview}")
                continue

            # Apply upgrade in current transaction
            task.result = cast("results.Results", new_val)
            # Issue the SQL UPDATE now rather than deferring it to commit.
            await data.flush()
            upgraded += 1

            _write(f"Task id={task.id}: upgraded legacy result -> VoteInitiate")

        # Commit all upgrades in a single transaction before the session closes.
        await data.commit()

        _write(f"Upgrade complete. Upgraded: {upgraded}, skipped (unconvertible): {skipped}")
+
+
async def amain() -> None:
    """Asynchronous entry point: run the vote_initiate result migration."""
    return await audit_vote_initiate_results()
+
+
def main() -> None:
    """Synchronous entry point: drive ``amain`` to completion with ``asyncio.run``."""
    coroutine = amain()
    asyncio.run(coroutine)
+
+
if __name__ == "__main__":
    # Run the migration only when executed as a script, not when imported.
    main()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to