This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new b221c36  Move the code for managing revisions to the storage interface
b221c36 is described below

commit b221c36fffc749adbacaae59eecbdd80fc5a59fa
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Oct 13 16:15:21 2025 +0100

    Move the code for managing revisions to the storage interface
---
 atr/db/interaction.py           |  18 ++++
 atr/revision.py                 | 182 --------------------------------
 atr/routes/compose.py           |   3 +-
 atr/routes/draft.py             |  53 +++++-----
 atr/routes/revisions.py         |  32 +++---
 atr/ssh.py                      |  75 +++++++-------
 atr/storage/__init__.py         |   4 +
 atr/storage/types.py            |  12 +++
 atr/storage/writers/__init__.py |   2 +
 atr/storage/writers/keys.py     |   4 +-
 atr/storage/writers/release.py  |  38 +++----
 atr/storage/writers/revision.py | 223 ++++++++++++++++++++++++++++++++++++++++
 atr/storage/writers/vote.py     |   5 +-
 atr/tasks/sbom.py               |  28 ++---
 atr/tasks/svn.py                | 107 +++++++++----------
 15 files changed, 437 insertions(+), 349 deletions(-)

diff --git a/atr/db/interaction.py b/atr/db/interaction.py
index 3641583..345b8c1 100644
--- a/atr/db/interaction.py
+++ b/atr/db/interaction.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import contextlib
+import datetime
 import enum
 from collections.abc import AsyncGenerator, Sequence
 from typing import Any, Final
@@ -157,6 +158,23 @@ async def has_failing_checks(release: sql.Release, 
revision_number: str, caller_
         return result.scalar_one() > 0
 
 
+async def latest_info(project_name: str, version_name: str) -> tuple[str, str, 
datetime.datetime] | None:
+    """Get the name, editor, and timestamp of the latest revision."""
+    release_name = sql.release_name(project_name, version_name)
+    async with db.session() as data:
+        # TODO: No need to get release here
+        # Just use maximum seq from revisions
+        release = await data.release(name=release_name, _project=True).demand(
+            RuntimeError(f"Release {release_name} does not exist")
+        )
+        if release.latest_revision_number is None:
+            return None
+        revision = await data.revision(release_name=release_name, 
number=release.latest_revision_number).get()
+        if not revision:
+            return None
+    return revision.number, revision.asfuid, revision.created
+
+
 async def latest_revision(release: sql.Release) -> sql.Revision | None:
     if release.latest_revision_number is None:
         return None
diff --git a/atr/revision.py b/atr/revision.py
deleted file mode 100644
index e0aa4b6..0000000
--- a/atr/revision.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import asyncio
-import contextlib
-import dataclasses
-import datetime
-import pathlib
-import secrets
-import tempfile
-from collections.abc import AsyncGenerator
-
-import aiofiles.os
-import aioshutil
-
-import atr.db as db
-import atr.db.interaction as interaction
-import atr.models.sql as sql
-import atr.tasks as tasks
-import atr.util as util
-
-
-class FailedError(Exception):
-    pass
-
-
[email protected]
-class Creating:
-    old: sql.Revision | None
-    interim_path: pathlib.Path
-    new: sql.Revision | None
-    failed: FailedError | None = None
-
-
-class SafeSession:
-    def __init__(self, temp_dir: str):
-        self._stack = contextlib.AsyncExitStack()
-        self._manager = db.session()
-        self._temp_dir = temp_dir
-
-    async def __aenter__(self) -> db.Session:
-        try:
-            return await self._stack.enter_async_context(self._manager)
-        except Exception:
-            await aioshutil.rmtree(self._temp_dir)  # type: ignore[call-arg]
-            raise
-
-    async def __aexit__(self, _exc_type, _exc, _tb):
-        await self._stack.aclose()
-        return False
-
-
[email protected]
-async def create_and_manage(
-    project_name: str,
-    version_name: str,
-    asf_uid: str,
-    description: str | None = None,
-) -> AsyncGenerator[Creating]:
-    """Manage the creation and symlinking of a mutable release revision."""
-    # Get the release
-    release_name = sql.release_name(project_name, version_name)
-    async with db.session() as data:
-        release = await data.release(name=release_name).demand(
-            RuntimeError("Release does not exist for new revision creation")
-        )
-        old_revision = await interaction.latest_revision(release)
-
-    # Create a temporary directory
-    # We ensure, below, that it's removed on any exception
-    # Use the tmp subdirectory of state, to ensure that it is on the same 
filesystem
-    prefix_token = secrets.token_hex(16)
-    temp_dir: str = await asyncio.to_thread(tempfile.mkdtemp, 
prefix=prefix_token + "-", dir=util.get_tmp_dir())
-    temp_dir_path = pathlib.Path(temp_dir)
-    creating = Creating(old=old_revision, interim_path=temp_dir_path, 
new=None, failed=None)
-    try:
-        # The directory was created by mkdtemp, but it's empty
-        if old_revision is not None:
-            # If this is not the first revision, hard link the previous 
revision
-            old_release_dir = util.release_directory(release)
-            await util.create_hard_link_clone(old_release_dir, temp_dir_path, 
do_not_create_dest_dir=True)
-        # The directory is either empty or its files are hard linked to the 
previous revision
-        yield creating
-    except FailedError as e:
-        await aioshutil.rmtree(temp_dir)  # type: ignore[call-arg]
-        creating.failed = e
-        return
-    except Exception:
-        await aioshutil.rmtree(temp_dir)  # type: ignore[call-arg]
-        raise
-
-    # Ensure that the permissions of every directory are 755
-    try:
-        await asyncio.to_thread(util.chmod_directories, temp_dir_path)
-    except Exception:
-        await aioshutil.rmtree(temp_dir)  # type: ignore[call-arg]
-        raise
-
-    async with SafeSession(temp_dir) as data:
-        try:
-            # This is the only place where models.Revision is constructed
-            # That makes models.populate_revision_sequence_and_name safe 
against races
-            # Because that event is called when data.add is called below
-            # And we have a write lock at that point through the use of 
data.begin_immediate
-            new_revision = sql.Revision(
-                release_name=release_name,
-                release=release,
-                asfuid=asf_uid,
-                created=datetime.datetime.now(datetime.UTC),
-                phase=release.phase,
-                description=description,
-            )
-
-            # Acquire the write lock and add the row
-            # We need this write lock for moving the directory below atomically
-            # But it also helps to make 
models.populate_revision_sequence_and_name safe against races
-            await data.begin_immediate()
-            data.add(new_revision)
-
-            # Flush but do not commit the new revision row to get its name and 
number
-            # The row will still be invisible to other sessions after flushing
-            await data.flush()
-            # Give the caller details about the new revision
-            creating.new = new_revision
-
-            # Rename the directory to the new revision number
-            await data.refresh(release)
-            new_revision_dir = util.release_directory(release)
-
-            # Ensure that the parent directory exists
-            await aiofiles.os.makedirs(new_revision_dir.parent, exist_ok=True)
-
-            # Rename the temporary interim directory to the new revision number
-            await aiofiles.os.rename(temp_dir, new_revision_dir)
-        except Exception:
-            await aioshutil.rmtree(temp_dir)  # type: ignore[call-arg]
-            raise
-
-        # Commit to end the transaction started by data.begin_immediate
-        # We must commit the revision before starting the checks
-        # This also releases the write lock
-        await data.commit()
-
-        async with data.begin():
-            # Run checks if in DRAFT phase
-            # We could also run this outside the data Session
-            # But then it would create its own new Session
-            # It does, however, need a transaction to be created using 
data.begin()
-            if release.phase == sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT:
-                # Must use caller_data here because we acquired the write lock
-                await tasks.draft_checks(asf_uid, project_name, version_name, 
new_revision.number, caller_data=data)
-
-
-async def latest_info(project_name: str, version_name: str) -> tuple[str, str, 
datetime.datetime] | None:
-    """Get the name, editor, and timestamp of the latest revision."""
-    release_name = sql.release_name(project_name, version_name)
-    async with db.session() as data:
-        # TODO: No need to get release here
-        # Just use maximum seq from revisions
-        release = await data.release(name=release_name, _project=True).demand(
-            RuntimeError(f"Release {release_name} does not exist")
-        )
-        if release.latest_revision_number is None:
-            return None
-        revision = await data.revision(release_name=release_name, 
number=release.latest_revision_number).get()
-        if not revision:
-            return None
-    return revision.number, revision.asfuid, revision.created
diff --git a/atr/routes/compose.py b/atr/routes/compose.py
index bfa3c87..9e91c20 100644
--- a/atr/routes/compose.py
+++ b/atr/routes/compose.py
@@ -26,7 +26,6 @@ import atr.db.interaction as interaction
 import atr.forms as forms
 import atr.models.results as results
 import atr.models.sql as sql
-import atr.revision as revision
 import atr.route as route
 import atr.routes.draft as draft
 import atr.routes.mapping as mapping
@@ -64,7 +63,7 @@ async def check(
 
     # Get the number of ongoing tasks for the current revision
     ongoing_tasks_count = 0
-    match await revision.latest_info(release.project.name, release.version):
+    match await interaction.latest_info(release.project.name, release.version):
         case (revision_number, revision_editor, revision_timestamp):
             ongoing_tasks_count = await interaction.tasks_ongoing(
                 release.project.name,
diff --git a/atr/routes/draft.py b/atr/routes/draft.py
index 58dfc2f..398ce21 100644
--- a/atr/routes/draft.py
+++ b/atr/routes/draft.py
@@ -32,7 +32,6 @@ import atr.construct as construct
 import atr.forms as forms
 import atr.log as log
 import atr.models.sql as sql
-import atr.revision as revision
 import atr.route as route
 import atr.routes.compose as compose
 import atr.routes.root as root
@@ -163,10 +162,12 @@ async def fresh(session: route.CommitterSession, 
project_name: str, version_name
     # This doesn't make sense unless the checks themselves have been updated
     # Therefore we only show the button for this to admins
     description = "Empty revision to restart all checks for the whole release 
candidate draft"
-    async with revision.create_and_manage(
-        project_name, version_name, session.uid, description=description
-    ) as _creating:
-        pass
+    async with storage.write(session.uid) as write:
+        wacp = await write.as_project_committee_participant(project_name)
+        async with wacp.revision.create_and_manage(
+            project_name, version_name, session.uid, description=description
+        ) as _creating:
+            pass
 
     return await session.redirect(
         compose.selected,
@@ -227,29 +228,29 @@ async def sbomgen(
 
     try:
         description = "SBOM generation through web interface"
-        async with revision.create_and_manage(
-            project_name, version_name, session.uid, description=description
-        ) as creating:
-            # Uses new_revision_number in a functional way
-            path_in_new_revision = creating.interim_path / rel_path
-            sbom_path_rel = rel_path.with_suffix(rel_path.suffix + 
".cdx.json").name
-            sbom_path_in_new_revision = creating.interim_path / 
rel_path.parent / sbom_path_rel
-
-            # Check that the source file exists in the new revision
-            if not await aiofiles.os.path.exists(path_in_new_revision):
-                log.error(f"Source file {rel_path} not found in new revision 
for SBOM generation.")
-                raise route.FlashError("Source artifact file not found in the 
new revision.")
-
-            # Check that the SBOM file does not already exist in the new 
revision
-            if await aiofiles.os.path.exists(sbom_path_in_new_revision):
-                raise base.ASFQuartException("SBOM file already exists", 
errorcode=400)
-
-        if creating.new is None:
-            raise route.FlashError("Internal error: New revision not found")
-
-        # Create and queue the task, using paths within the new revision
         async with storage.write(session.uid) as write:
             wacp = await write.as_project_committee_participant(project_name)
+            async with wacp.revision.create_and_manage(
+                project_name, version_name, session.uid, 
description=description
+            ) as creating:
+                # Uses new_revision_number in a functional way
+                path_in_new_revision = creating.interim_path / rel_path
+                sbom_path_rel = rel_path.with_suffix(rel_path.suffix + 
".cdx.json").name
+                sbom_path_in_new_revision = creating.interim_path / 
rel_path.parent / sbom_path_rel
+
+                # Check that the source file exists in the new revision
+                if not await aiofiles.os.path.exists(path_in_new_revision):
+                    log.error(f"Source file {rel_path} not found in new 
revision for SBOM generation.")
+                    raise route.FlashError("Source artifact file not found in 
the new revision.")
+
+                # Check that the SBOM file does not already exist in the new 
revision
+                if await aiofiles.os.path.exists(sbom_path_in_new_revision):
+                    raise base.ASFQuartException("SBOM file already exists", 
errorcode=400)
+
+            if creating.new is None:
+                raise route.FlashError("Internal error: New revision not 
found")
+
+            # Create and queue the task, using paths within the new revision
             sbom_task = await wacp.sbom.generate_cyclonedx(
                 project_name, version_name, creating.new.number, 
path_in_new_revision, sbom_path_in_new_revision
             )
diff --git a/atr/routes/revisions.py b/atr/routes/revisions.py
index 25dfc16..0a23405 100644
--- a/atr/routes/revisions.py
+++ b/atr/routes/revisions.py
@@ -30,8 +30,8 @@ import atr.db as db
 import atr.forms as forms
 import atr.models.schema as schema
 import atr.models.sql as sql
-import atr.revision as revision
 import atr.route as route
+import atr.storage as storage
 import atr.template as template
 import atr.util as util
 
@@ -121,19 +121,23 @@ async def selected_post(session: route.CommitterSession, 
project_name: str, vers
             )
 
     description = f"Copy of revision {selected_revision_number} through web 
interface"
-    async with revision.create_and_manage(project_name, version_name, 
session.uid, description=description) as creating:
-        # TODO: Stop create_and_manage from hard linking the parent first
-        await aioshutil.rmtree(creating.interim_path)  # type: ignore[call-arg]
-        await util.create_hard_link_clone(selected_revision_dir, 
creating.interim_path)
-
-    if creating.new is None:
-        raise base.ASFQuartException("Internal error: New revision not found", 
errorcode=500)
-    return await session.redirect(
-        selected,
-        success=f"Copied revision {selected_revision_number} to new latest 
revision, {creating.new.number}",
-        project_name=project_name,
-        version_name=version_name,
-    )
+    async with storage.write(session.uid) as write:
+        wacp = await write.as_project_committee_participant(project_name)
+        async with wacp.revision.create_and_manage(
+            project_name, version_name, session.uid, description=description
+        ) as creating:
+            # TODO: Stop create_and_manage from hard linking the parent first
+            await aioshutil.rmtree(creating.interim_path)  # type: 
ignore[call-arg]
+            await util.create_hard_link_clone(selected_revision_dir, 
creating.interim_path)
+
+        if creating.new is None:
+            raise base.ASFQuartException("Internal error: New revision not 
found", errorcode=500)
+        return await session.redirect(
+            selected,
+            success=f"Copied revision {selected_revision_number} to new latest 
revision, {creating.new.number}",
+            project_name=project_name,
+            version_name=version_name,
+        )
 
 
 class FilesDiff(schema.Strict):
diff --git a/atr/ssh.py b/atr/ssh.py
index 07a75a5..4073fda 100644
--- a/atr/ssh.py
+++ b/atr/ssh.py
@@ -33,7 +33,8 @@ import atr.config as config
 import atr.db as db
 import atr.log as log
 import atr.models.sql as sql
-import atr.revision as revision
+import atr.storage as storage
+import atr.storage.types as types
 import atr.user as user
 import atr.util as util
 
@@ -446,42 +447,46 @@ async def _step_07b_process_validated_rsync_write(
 
     # Create the draft revision directory structure
     description = "File synchronisation through ssh, using rsync"
-    async with revision.create_and_manage(project_name, version_name, asf_uid, 
description=description) as creating:
-        # Uses new_revision_number for logging only
-        if creating.old is not None:
-            log.info(f"Using old revision {creating.old.number} and interim 
path {creating.interim_path}")
-        # Update the rsync command path to the new revision directory
-        argv[-1] = str(creating.interim_path)
-
-        ###################################################
-        ### Calls _step_08_execute_rsync_upload_command ###
-        ###################################################
-        exit_status = await _step_08_execute_rsync(process, argv)
-        if exit_status != 0:
+    async with storage.write(asf_uid) as write:
+        wacp = await write.as_project_committee_participant(project_name)
+        async with wacp.revision.create_and_manage(
+            project_name, version_name, asf_uid, description=description
+        ) as creating:
+            # Uses new_revision_number for logging only
             if creating.old is not None:
-                for_revision = f"successor of revision {creating.old.number}"
-            else:
-                for_revision = f"initial revision for release {release_name}"
-            log.error(
-                f"rsync upload failed with exit status {exit_status} for 
{for_revision}. "
-                f"Command: {process.command} (run as {' '.join(argv)})"
-            )
-            raise revision.FailedError(f"rsync upload failed with exit status 
{exit_status} for {for_revision}")
-
-    if creating.new is not None:
-        log.info(f"rsync upload successful for revision {creating.new.number}")
-        host = config.get().APP_HOST
-        message = f"\nATR: Created revision {creating.new.number} of 
{project_name} {version_name}\n"
-        message += f"ATR: 
https://{host}/compose/{project_name}/{version_name}\n";
-        if not process.stderr.is_closing():
-            process.stderr.write(message.encode())
-            await process.stderr.drain()
-    else:
-        log.info(f"rsync upload unsuccessful for release {release_name}")
+                log.info(f"Using old revision {creating.old.number} and 
interim path {creating.interim_path}")
+            # Update the rsync command path to the new revision directory
+            argv[-1] = str(creating.interim_path)
+
+            ###################################################
+            ### Calls _step_08_execute_rsync_upload_command ###
+            ###################################################
+            exit_status = await _step_08_execute_rsync(process, argv)
+            if exit_status != 0:
+                if creating.old is not None:
+                    for_revision = f"successor of revision 
{creating.old.number}"
+                else:
+                    for_revision = f"initial revision for release 
{release_name}"
+                log.error(
+                    f"rsync upload failed with exit status {exit_status} for 
{for_revision}. "
+                    f"Command: {process.command} (run as {' '.join(argv)})"
+                )
+                raise types.FailedError(f"rsync upload failed with exit status 
{exit_status} for {for_revision}")
+
+        if creating.new is not None:
+            log.info(f"rsync upload successful for revision 
{creating.new.number}")
+            host = config.get().APP_HOST
+            message = f"\nATR: Created revision {creating.new.number} of 
{project_name} {version_name}\n"
+            message += f"ATR: 
https://{host}/compose/{project_name}/{version_name}\n";
+            if not process.stderr.is_closing():
+                process.stderr.write(message.encode())
+                await process.stderr.drain()
+        else:
+            log.info(f"rsync upload unsuccessful for release {release_name}")
 
-    # If we got here, there was no exception
-    if not process.is_closing():
-        process.exit(exit_status)
+        # If we got here, there was no exception
+        if not process.is_closing():
+            process.exit(exit_status)
 
 
 async def _step_07c_ensure_release_object_for_write(project_name: str, 
version_name: str) -> None:
diff --git a/atr/storage/__init__.py b/atr/storage/__init__.py
index a8cd78c..39e6f49 100644
--- a/atr/storage/__init__.py
+++ b/atr/storage/__init__.py
@@ -142,6 +142,7 @@ class WriteAsGeneralPublic(WriteAs):
         self.policy = writers.policy.GeneralPublic(write, self, data)
         self.project = writers.project.GeneralPublic(write, self, data)
         self.release = writers.release.GeneralPublic(write, self, data)
+        self.revision = writers.revision.GeneralPublic(write, self, data)
         self.sbom = writers.sbom.GeneralPublic(write, self, data)
         self.ssh = writers.ssh.GeneralPublic(write, self, data)
         self.tokens = writers.tokens.GeneralPublic(write, self, data)
@@ -159,6 +160,7 @@ class WriteAsFoundationCommitter(WriteAsGeneralPublic):
         self.policy = writers.policy.FoundationCommitter(write, self, data)
         self.project = writers.project.FoundationCommitter(write, self, data)
         self.release = writers.release.FoundationCommitter(write, self, data)
+        self.revision = writers.revision.FoundationCommitter(write, self, data)
         self.sbom = writers.sbom.FoundationCommitter(write, self, data)
         self.ssh = writers.ssh.FoundationCommitter(write, self, data)
         self.tokens = writers.tokens.FoundationCommitter(write, self, data)
@@ -182,6 +184,7 @@ class 
WriteAsCommitteeParticipant(WriteAsFoundationCommitter):
         self.policy = writers.policy.CommitteeParticipant(write, self, data, 
committee_name)
         self.project = writers.project.CommitteeParticipant(write, self, data, 
committee_name)
         self.release = writers.release.CommitteeParticipant(write, self, data, 
committee_name)
+        self.revision = writers.revision.CommitteeParticipant(write, self, 
data, committee_name)
         self.sbom = writers.sbom.CommitteeParticipant(write, self, data, 
committee_name)
         self.ssh = writers.ssh.CommitteeParticipant(write, self, data, 
committee_name)
         self.tokens = writers.tokens.CommitteeParticipant(write, self, data, 
committee_name)
@@ -210,6 +213,7 @@ class WriteAsCommitteeMember(WriteAsCommitteeParticipant):
         self.policy = writers.policy.CommitteeMember(write, self, data, 
committee_name)
         self.project = writers.project.CommitteeMember(write, self, data, 
committee_name)
         self.release = writers.release.CommitteeMember(write, self, data, 
committee_name)
+        self.revision = writers.revision.CommitteeMember(write, self, data, 
committee_name)
         self.sbom = writers.sbom.CommitteeMember(write, self, data, 
committee_name)
         self.ssh = writers.ssh.CommitteeMember(write, self, data, 
committee_name)
         self.tokens = writers.tokens.CommitteeMember(write, self, data, 
committee_name)
diff --git a/atr/storage/types.py b/atr/storage/types.py
index 4087674..f08985c 100644
--- a/atr/storage/types.py
+++ b/atr/storage/types.py
@@ -83,3 +83,15 @@ class PublicKeyError(Exception):
     @property
     def original_error(self) -> Exception:
         return self.__original_error
+
+
+class FailedError(Exception):
+    pass
+
+
[email protected]
+class Creating:
+    old: sql.Revision | None
+    interim_path: pathlib.Path
+    new: sql.Revision | None
+    failed: FailedError | None = None
diff --git a/atr/storage/writers/__init__.py b/atr/storage/writers/__init__.py
index 829c1be..b8243d7 100644
--- a/atr/storage/writers/__init__.py
+++ b/atr/storage/writers/__init__.py
@@ -23,6 +23,7 @@ import atr.storage.writers.keys as keys
 import atr.storage.writers.policy as policy
 import atr.storage.writers.project as project
 import atr.storage.writers.release as release
+import atr.storage.writers.revision as revision
 import atr.storage.writers.sbom as sbom
 import atr.storage.writers.ssh as ssh
 import atr.storage.writers.tokens as tokens
@@ -37,6 +38,7 @@ __all__ = [
     "policy",
     "project",
     "release",
+    "revision",
     "sbom",
     "ssh",
     "tokens",
diff --git a/atr/storage/writers/keys.py b/atr/storage/writers/keys.py
index aee319c..b8f4053 100644
--- a/atr/storage/writers/keys.py
+++ b/atr/storage/writers/keys.py
@@ -417,8 +417,6 @@ class CommitteeParticipant(FoundationCommitter):
         return outcomes
 
     async def import_keys_file(self, project_name: str, version_name: str) -> 
outcome.List[types.Key]:
-        import atr.revision as revision
-
         release = await self.__data.release(
             project_name=project_name,
             version=version_name,
@@ -438,7 +436,7 @@ class CommitteeParticipant(FoundationCommitter):
         # Remove the KEYS file if 100% imported
         if (outcomes.result_count > 0) and (outcomes.error_count == 0):
             description = "Removed KEYS file after successful import through 
web interface"
-            async with revision.create_and_manage(
+            async with self.__write_as.revision.create_and_manage(
                 project_name, version_name, self.__asf_uid, 
description=description
             ) as creating:
                 path_in_new_revision = creating.interim_path / "KEYS"
diff --git a/atr/storage/writers/release.py b/atr/storage/writers/release.py
index 96d6669..40797ea 100644
--- a/atr/storage/writers/release.py
+++ b/atr/storage/writers/release.py
@@ -36,8 +36,8 @@ import atr.db as db
 import atr.log as log
 import atr.models.api as api
 import atr.models.sql as sql
-import atr.revision as revision
 import atr.storage as storage
+import atr.storage.types as types
 import atr.util as util
 
 if TYPE_CHECKING:
@@ -94,8 +94,8 @@ class CommitteeParticipant(FoundationCommitter):
     @contextlib.asynccontextmanager
     async def create_and_manage_revision(
         self, project_name: str, version: str, description: str
-    ) -> AsyncGenerator[revision.Creating]:
-        async with revision.create_and_manage(
+    ) -> AsyncGenerator[types.Creating]:
+        async with self.__write_as.revision.create_and_manage(
             project_name, version, self.__asf_uid, description=description
         ) as _creating:
             yield _creating
@@ -155,9 +155,9 @@ class CommitteeParticipant(FoundationCommitter):
             path_to_remove = creating.interim_path / dir_to_delete_rel
             
path_to_remove.resolve().relative_to(creating.interim_path.resolve())
             if not await aiofiles.os.path.isdir(path_to_remove):
-                raise revision.FailedError(f"Path '{dir_to_delete_rel}' is not 
a directory.")
+                raise types.FailedError(f"Path '{dir_to_delete_rel}' is not a 
directory.")
             if await aiofiles.os.listdir(path_to_remove):
-                raise revision.FailedError(f"Directory '{dir_to_delete_rel}' 
is not empty.")
+                raise types.FailedError(f"Directory '{dir_to_delete_rel}' is 
not empty.")
             await aiofiles.os.rmdir(path_to_remove)
         if creating.failed is not None:
             return str(creating.failed)
@@ -377,7 +377,7 @@ class CommitteeParticipant(FoundationCommitter):
         await self.__data.refresh(release)
 
         description = "Creation of empty release candidate draft through web 
interface"
-        async with revision.create_and_manage(
+        async with self.__write_as.revision.create_and_manage(
             project_name, version, self.__asf_uid, description=description
         ) as _creating:
             pass
@@ -441,7 +441,7 @@ class CommitteeParticipant(FoundationCommitter):
                 await self.__save_file(file, target_path)
         return len(files)
 
-    async def __current_paths(self, creating: revision.Creating) -> 
list[pathlib.Path]:
+    async def __current_paths(self, creating: types.Creating) -> 
list[pathlib.Path]:
         all_current_paths_interim: list[pathlib.Path] = []
         async for p_rel_interim in 
util.paths_recursive_all(creating.interim_path):
             all_current_paths_interim.append(p_rel_interim)
@@ -508,7 +508,7 @@ class CommitteeParticipant(FoundationCommitter):
 
     async def __remove_rc_tags_revision(
         self,
-        creating: revision.Creating,
+        creating: types.Creating,
         error_messages: list[str],
     ) -> int:
         all_current_paths_interim = await self.__current_paths(creating)
@@ -586,7 +586,7 @@ class CommitteeParticipant(FoundationCommitter):
         self,
         source_files_rel: list[pathlib.Path],
         target_dir_rel: pathlib.Path,
-        creating: revision.Creating,
+        creating: types.Creating,
         moved_files_names: list[str],
         skipped_files_names: list[str],
     ) -> None:
@@ -595,22 +595,22 @@ class CommitteeParticipant(FoundationCommitter):
             target_path.resolve().relative_to(creating.interim_path.resolve())
         except ValueError:
             # Path traversal detected
-            raise revision.FailedError("Paths must be restricted to the 
release directory")
+            raise types.FailedError("Paths must be restricted to the release 
directory")
 
         if not await aiofiles.os.path.exists(target_path):
             for part in target_path.parts:
                 # TODO: This .prefix check could include some existing 
directory segment
                 if part.startswith("."):
-                    raise revision.FailedError("Segments must not start with 
'.'")
+                    raise types.FailedError("Segments must not start with '.'")
                 if ".." in part:
-                    raise revision.FailedError("Segments must not contain 
'..'")
+                    raise types.FailedError("Segments must not contain '..'")
 
             try:
                 await aiofiles.os.makedirs(target_path)
             except OSError:
-                raise revision.FailedError("Failed to create target directory")
+                raise types.FailedError("Failed to create target directory")
         elif not await aiofiles.os.path.isdir(target_path):
-            raise revision.FailedError("Target path is not a directory")
+            raise types.FailedError("Target path is not a directory")
 
         for source_file_rel in source_files_rel:
             await self.__setup_revision_item(
@@ -621,7 +621,7 @@ class CommitteeParticipant(FoundationCommitter):
         self,
         source_file_rel: pathlib.Path,
         target_dir_rel: pathlib.Path,
-        creating: revision.Creating,
+        creating: types.Creating,
         moved_files_names: list[str],
         skipped_files_names: list[str],
         target_path: pathlib.Path,
@@ -636,11 +636,11 @@ class CommitteeParticipant(FoundationCommitter):
             if (target_dir_rel == source_file_rel) or (creating.interim_path / 
target_dir_rel).resolve().is_relative_to(
                 full_source_item_path.resolve()
             ):
-                raise revision.FailedError("Cannot move a directory into 
itself or a subdirectory of itself")
+                raise types.FailedError("Cannot move a directory into itself 
or a subdirectory of itself")
 
             final_target_for_item = target_path / source_file_rel.name
             if await aiofiles.os.path.exists(final_target_for_item):
-                raise revision.FailedError("Target name already exists")
+                raise types.FailedError("Target name already exists")
 
             await aiofiles.os.rename(full_source_item_path, 
final_target_for_item)
             moved_files_names.append(source_file_rel.name)
@@ -649,11 +649,11 @@ class CommitteeParticipant(FoundationCommitter):
             bundle = [f for f in related_files if await 
aiofiles.os.path.exists(creating.interim_path / f)]
             for f_check in bundle:
                 if await aiofiles.os.path.isdir(creating.interim_path / 
f_check):
-                    raise revision.FailedError("A related 'file' is actually a 
directory")
+                    raise types.FailedError("A related 'file' is actually a 
directory")
 
             collisions = [f.name for f in bundle if await 
aiofiles.os.path.exists(target_path / f.name)]
             if collisions:
-                raise revision.FailedError("A related file already exists in 
the target directory")
+                raise types.FailedError("A related file already exists in the 
target directory")
 
             for f in bundle:
                 await aiofiles.os.rename(creating.interim_path / f, 
target_path / f.name)
diff --git a/atr/storage/writers/revision.py b/atr/storage/writers/revision.py
new file mode 100644
index 0000000..6a26b7f
--- /dev/null
+++ b/atr/storage/writers/revision.py
@@ -0,0 +1,223 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Removing this will cause circular imports
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import datetime
+import pathlib
+import secrets
+import tempfile
+from typing import TYPE_CHECKING
+
+import aiofiles.os
+import aioshutil
+
+import atr.db as db
+import atr.db.interaction as interaction
+import atr.models.sql as sql
+import atr.storage as storage
+import atr.storage.types as types
+import atr.tasks as tasks
+import atr.util as util
+
+if TYPE_CHECKING:
+    from collections.abc import AsyncGenerator
+
+
class SafeSession:
    """Async context manager yielding a DB session, removing a temp dir if entry fails.

    The temp dir is only cleaned up when opening the session itself raises;
    exceptions inside the ``with`` body are propagated untouched on exit.
    """

    def __init__(self, temp_dir: str):
        self._exit_stack = contextlib.AsyncExitStack()
        self._session_cm = db.session()
        self._cleanup_dir = temp_dir

    async def __aenter__(self) -> db.Session:
        try:
            session = await self._exit_stack.enter_async_context(self._session_cm)
        except Exception:
            # Session could not be opened: discard the caller's scratch directory
            await aioshutil.rmtree(self._cleanup_dir)  # type: ignore[call-arg]
            raise
        return session

    async def __aexit__(self, _exc_type, _exc, _tb):
        await self._exit_stack.aclose()
        # Never suppress exceptions from the with-body
        return False
+
+
class GeneralPublic:
    """Base revision writer available to the general public.

    Holds the storage write handle, the caller-scoped write-as accessor, the DB
    session, and the (possibly ``None``) ASF UID of the authorised caller.
    """

    def __init__(
        self,
        write: storage.Write,
        write_as: storage.WriteAsGeneralPublic,
        data: db.Session,
    ):
        # NOTE: name-mangled attributes are deliberately redefined per subclass
        self.__asf_uid = write.authorisation.asf_uid
        self.__data = data
        self.__write = write
        self.__write_as = write_as
+
+
class FoundationCommitter(GeneralPublic):
    """Revision writer for foundation committers; requires a known ASF UID."""

    def __init__(self, write: storage.Write, write_as: storage.WriteAsFoundationCommitter, data: db.Session):
        super().__init__(write, write_as, data)
        # Guard first: a committer must have an ASF UID
        uid = write.authorisation.asf_uid
        if uid is None:
            raise storage.AccessError("No ASF UID")
        self.__asf_uid = uid
        self.__data = data
        self.__write = write
        self.__write_as = write_as
+
+
class CommitteeParticipant(FoundationCommitter):
    """Revision writer for committee participants.

    Provides :meth:`create_and_manage`, the single entry point for building a
    new mutable release revision on disk and recording it in the database.
    """

    def __init__(
        self,
        write: storage.Write,
        write_as: storage.WriteAsCommitteeParticipant,
        data: db.Session,
        committee_name: str,
    ):
        super().__init__(write, write_as, data)
        uid = write.authorisation.asf_uid
        if uid is None:
            raise storage.AccessError("No ASF UID")
        self.__asf_uid = uid
        self.__committee_name = committee_name
        self.__data = data
        self.__write = write
        self.__write_as = write_as

    @contextlib.asynccontextmanager
    async def create_and_manage(
        self,
        project_name: str,
        version_name: str,
        asf_uid: str,
        description: str | None = None,
    ) -> AsyncGenerator[types.Creating]:
        """Manage the creation and symlinking of a mutable release revision.

        Yields a ``types.Creating`` whose ``interim_path`` the caller may
        populate. Raising ``types.FailedError`` in the body aborts cleanly:
        the scratch directory is removed and the error is stored on
        ``creating.failed`` instead of propagating.
        """
        # Look up the release and its latest revision (if any)
        release_name = sql.release_name(project_name, version_name)
        async with db.session() as data:
            release = await data.release(name=release_name).demand(
                RuntimeError("Release does not exist for new revision creation")
            )
            old_revision = await interaction.latest_revision(release)

        # Create a scratch directory under the state tmp dir so that the final
        # rename below stays on the same filesystem (and is therefore atomic)
        token = secrets.token_hex(16)
        scratch_dir: str = await asyncio.to_thread(tempfile.mkdtemp, prefix=token + "-", dir=util.get_tmp_dir())
        scratch_path = pathlib.Path(scratch_dir)

        async def discard() -> None:
            # Shared failure-path cleanup for the scratch directory
            await aioshutil.rmtree(scratch_dir)  # type: ignore[call-arg]

        creating = types.Creating(old=old_revision, interim_path=scratch_path, new=None, failed=None)
        try:
            # mkdtemp made the directory, but it starts out empty
            if old_revision is not None:
                # Seed it with hard links to the files of the previous revision
                old_release_dir = util.release_directory(release)
                await util.create_hard_link_clone(old_release_dir, scratch_path, do_not_create_dest_dir=True)
            # Hand control to the caller to mutate the scratch directory
            yield creating
        except types.FailedError as e:
            # Expected failure: record it for the caller and stop quietly
            await discard()
            creating.failed = e
            return
        except Exception:
            await discard()
            raise

        # Normalise every directory to mode 755 before publishing
        try:
            await asyncio.to_thread(util.chmod_directories, scratch_path)
        except Exception:
            await discard()
            raise

        async with SafeSession(scratch_dir) as data:
            try:
                # This is the only place where models.Revision is constructed
                # That makes models.populate_revision_sequence_and_name safe against races
                # Because that event is called when data.add is called below
                # And we have a write lock at that point through the use of data.begin_immediate
                new_revision = sql.Revision(
                    release_name=release_name,
                    release=release,
                    asfuid=asf_uid,
                    created=datetime.datetime.now(datetime.UTC),
                    phase=release.phase,
                    description=description,
                )

                # Acquire the write lock and add the row
                # We need this write lock for moving the directory below atomically
                # But it also helps to make models.populate_revision_sequence_and_name safe against races
                await data.begin_immediate()
                data.add(new_revision)

                # Flush but do not commit the new revision row to get its name and number
                # The row will still be invisible to other sessions after flushing
                await data.flush()
                # Give the caller details about the new revision
                creating.new = new_revision

                # Resolve the on-disk directory for the new revision
                await data.refresh(release)
                new_revision_dir = util.release_directory(release)

                # Ensure that the parent directory exists
                await aiofiles.os.makedirs(new_revision_dir.parent, exist_ok=True)

                # Atomically publish the scratch directory as the new revision
                await aiofiles.os.rename(scratch_dir, new_revision_dir)
            except Exception:
                # NOTE(review): if the rename itself succeeded this rmtree would
                # find nothing, but rename is the last statement in the try so
                # the scratch dir still exists on every failure path here
                await discard()
                raise

            # Commit to end the transaction started by data.begin_immediate
            # We must commit the revision before starting the checks
            # This also releases the write lock
            await data.commit()

            async with data.begin():
                # Run checks if in DRAFT phase
                # We could also run this outside the data Session
                # But then it would create its own new Session
                # It does, however, need a transaction to be created using data.begin()
                if release.phase == sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT:
                    # Must use caller_data here because we acquired the write lock
                    await tasks.draft_checks(asf_uid, project_name, version_name, new_revision.number, caller_data=data)
+
+
class CommitteeMember(CommitteeParticipant):
    """Revision writer for full committee members.

    Currently adds no behaviour beyond :class:`CommitteeParticipant`; it exists
    so that member-only operations have a distinct access level to hang off.
    """

    def __init__(
        self,
        write: storage.Write,
        write_as: storage.WriteAsCommitteeMember,
        data: db.Session,
        committee_name: str,
    ):
        super().__init__(write, write_as, data, committee_name)
        uid = write.authorisation.asf_uid
        if uid is None:
            raise storage.AccessError("No ASF UID")
        self.__asf_uid = uid
        self.__committee_name = committee_name
        self.__data = data
        self.__write = write
        self.__write_as = write_as
diff --git a/atr/storage/writers/vote.py b/atr/storage/writers/vote.py
index d856b34..51f642d 100644
--- a/atr/storage/writers/vote.py
+++ b/atr/storage/writers/vote.py
@@ -25,7 +25,6 @@ import atr.db as db
 import atr.db.interaction as interaction
 import atr.log as log
 import atr.models.sql as sql
-import atr.revision as revision
 import atr.storage as storage
 import atr.tasks.message as message
 import atr.tasks.vote as tasks_vote
@@ -269,7 +268,7 @@ class CommitteeMember(CommitteeParticipant):
             success_message = "Vote marked as passed"
 
             description = "Create a preview revision from the last candidate 
draft"
-            async with revision.create_and_manage(
+            async with self.__write_as.revision.create_and_manage(
                 project_name, release.version, self.__asf_uid, 
description=description
             ) as _creating:
                 pass
@@ -336,7 +335,7 @@ class CommitteeMember(CommitteeParticipant):
             success_message = "Vote marked as passed"
 
             description = "Create a preview revision from the last candidate 
draft"
-            async with revision.create_and_manage(
+            async with self.__write_as.revision.create_and_manage(
                 project_name, release.version, self.__asf_uid, 
description=description
             ) as _creating:
                 pass
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index 9ae2e71..95608a8 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -30,8 +30,8 @@ import atr.config as config
 import atr.log as log
 import atr.models.results as results
 import atr.models.schema as schema
-import atr.revision as revision
 import atr.sbomtool as sbomtool
+import atr.storage as storage
 import atr.tasks.checks as checks
 import atr.tasks.checks.targz as targz
 import atr.util as util
@@ -86,18 +86,20 @@ async def augment(args: FileArgs) -> results.Results | None:
         patch_data = sbomtool.patch_to_data(patch_ops)
         merged = bundle.doc.patch(yyjson.Document(patch_data))
         description = "SBOM augmentation through web interface"
-        async with revision.create_and_manage(
-            args.project_name, args.version_name, args.asf_uid or "unknown", 
description=description
-        ) as creating:
-            new_full_path = os.path.join(str(creating.interim_path), 
args.file_path)
-            # Write to the new revision
-            log.info(f"Writing augmented SBOM to {new_full_path}")
-            await aiofiles.os.remove(new_full_path)
-            async with aiofiles.open(new_full_path, "w", encoding="utf-8") as 
f:
-                await f.write(merged.dumps())
-
-        if creating.new is None:
-            raise RuntimeError("Internal error: New revision not found")
+        async with storage.write(args.asf_uid) as write:
+            wacp = await 
write.as_project_committee_participant(args.project_name)
+            async with wacp.revision.create_and_manage(
+                args.project_name, args.version_name, args.asf_uid or 
"unknown", description=description
+            ) as creating:
+                new_full_path = os.path.join(str(creating.interim_path), 
args.file_path)
+                # Write to the new revision
+                log.info(f"Writing augmented SBOM to {new_full_path}")
+                await aiofiles.os.remove(new_full_path)
+                async with aiofiles.open(new_full_path, "w", encoding="utf-8") 
as f:
+                    await f.write(merged.dumps())
+
+            if creating.new is None:
+                raise RuntimeError("Internal error: New revision not found")
 
     return results.SBOMAugment(
         kind="sbom_augment",
diff --git a/atr/tasks/svn.py b/atr/tasks/svn.py
index 8888319..c193e06 100644
--- a/atr/tasks/svn.py
+++ b/atr/tasks/svn.py
@@ -25,7 +25,7 @@ import aioshutil
 import atr.log as log
 import atr.models.results as results
 import atr.models.schema as schema
-import atr.revision as revision
+import atr.storage as storage
 import atr.tasks.checks as checks
 
 
@@ -72,57 +72,60 @@ async def _import_files_core(args: SvnImport) -> str:
     temp_export_dir_name = ".svn-export.tmp"
 
     description = "Import of files from subversion"
-    async with revision.create_and_manage(
-        args.project_name, args.version_name, args.asf_uid, 
description=description
-    ) as creating:
-        # Uses creating.new after this block
-        log.debug(f"Created revision directory: {creating.interim_path}")
-
-        final_target_path = creating.interim_path
-        if args.target_subdirectory:
-            final_target_path = creating.interim_path / 
args.target_subdirectory
-            # Validate that final_target_path is a subdirectory of 
new_revision_dir
-            if not final_target_path.is_relative_to(creating.interim_path):
-                raise SvnImportError(
-                    f"Target subdirectory {args.target_subdirectory} is not a 
subdirectory of {creating.interim_path}"
-                )
-            await aiofiles.os.makedirs(final_target_path, exist_ok=True)
-
-        temp_export_path = creating.interim_path / temp_export_dir_name
-
-        svn_command = [
-            "svn",
-            "export",
-            "--non-interactive",
-            "--trust-server-cert-failures",
-            "unknown-ca,cn-mismatch",
-            "-r",
-            args.revision,
-            "--",
-            args.svn_url,
-            str(temp_export_path),
-        ]
-
-        await _import_files_core_run_svn_export(svn_command, temp_export_path)
-
-        # Move files from temp export path to final target path
-        # We only have to do this to avoid the SVN pegged revision issue
-        log.info(f"Moving exported files from {temp_export_path} to 
{final_target_path}")
-        for item_name in await aiofiles.os.listdir(temp_export_path):
-            source_item = temp_export_path / item_name
-            destination_item = final_target_path / item_name
-            try:
-                await aioshutil.move(str(source_item), str(destination_item))
-            except FileExistsError:
-                log.warning(f"Item {destination_item} already exists, skipping 
move for {item_name}")
-            except Exception as move_err:
-                log.error(f"Error moving {source_item} to {destination_item}: 
{move_err}")
-        await aiofiles.os.rmdir(temp_export_path)
-        log.info(f"Removed temporary export directory: {temp_export_path}")
-
-    if creating.new is None:
-        raise SvnImportError("Internal error: New revision not found")
-    return f"Successfully imported files from SVN into revision 
{creating.new.number}"
+    async with storage.write(args.asf_uid) as write:
+        wacp = await write.as_project_committee_participant(args.project_name)
+        async with wacp.revision.create_and_manage(
+            args.project_name, args.version_name, args.asf_uid, 
description=description
+        ) as creating:
+            # Uses creating.new after this block
+            log.debug(f"Created revision directory: {creating.interim_path}")
+
+            final_target_path = creating.interim_path
+            if args.target_subdirectory:
+                final_target_path = creating.interim_path / 
args.target_subdirectory
+                # Validate that final_target_path is a subdirectory of 
new_revision_dir
+                if not final_target_path.is_relative_to(creating.interim_path):
+                    raise SvnImportError(
+                        f"Target subdirectory {args.target_subdirectory}"
+                        f" is not a subdirectory of {creating.interim_path}"
+                    )
+                await aiofiles.os.makedirs(final_target_path, exist_ok=True)
+
+            temp_export_path = creating.interim_path / temp_export_dir_name
+
+            svn_command = [
+                "svn",
+                "export",
+                "--non-interactive",
+                "--trust-server-cert-failures",
+                "unknown-ca,cn-mismatch",
+                "-r",
+                args.revision,
+                "--",
+                args.svn_url,
+                str(temp_export_path),
+            ]
+
+            await _import_files_core_run_svn_export(svn_command, 
temp_export_path)
+
+            # Move files from temp export path to final target path
+            # We only have to do this to avoid the SVN pegged revision issue
+            log.info(f"Moving exported files from {temp_export_path} to 
{final_target_path}")
+            for item_name in await aiofiles.os.listdir(temp_export_path):
+                source_item = temp_export_path / item_name
+                destination_item = final_target_path / item_name
+                try:
+                    await aioshutil.move(str(source_item), 
str(destination_item))
+                except FileExistsError:
+                    log.warning(f"Item {destination_item} already exists, 
skipping move for {item_name}")
+                except Exception as move_err:
+                    log.error(f"Error moving {source_item} to 
{destination_item}: {move_err}")
+            await aiofiles.os.rmdir(temp_export_path)
+            log.info(f"Removed temporary export directory: {temp_export_path}")
+
+        if creating.new is None:
+            raise SvnImportError("Internal error: New revision not found")
+        return f"Successfully imported files from SVN into revision 
{creating.new.number}"
 
 
 async def _import_files_core_run_svn_export(svn_command: list[str], 
temp_export_path: pathlib.Path) -> None:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to