This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new b221c36 Move the code for managing revisions to the storage interface
b221c36 is described below
commit b221c36fffc749adbacaae59eecbdd80fc5a59fa
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Oct 13 16:15:21 2025 +0100
Move the code for managing revisions to the storage interface
---
atr/db/interaction.py | 18 ++++
atr/revision.py | 182 --------------------------------
atr/routes/compose.py | 3 +-
atr/routes/draft.py | 53 +++++-----
atr/routes/revisions.py | 32 +++---
atr/ssh.py | 75 +++++++-------
atr/storage/__init__.py | 4 +
atr/storage/types.py | 12 +++
atr/storage/writers/__init__.py | 2 +
atr/storage/writers/keys.py | 4 +-
atr/storage/writers/release.py | 38 +++----
atr/storage/writers/revision.py | 223 ++++++++++++++++++++++++++++++++++++++++
atr/storage/writers/vote.py | 5 +-
atr/tasks/sbom.py | 28 ++---
atr/tasks/svn.py | 107 +++++++++----------
15 files changed, 437 insertions(+), 349 deletions(-)
diff --git a/atr/db/interaction.py b/atr/db/interaction.py
index 3641583..345b8c1 100644
--- a/atr/db/interaction.py
+++ b/atr/db/interaction.py
@@ -16,6 +16,7 @@
# under the License.
import contextlib
+import datetime
import enum
from collections.abc import AsyncGenerator, Sequence
from typing import Any, Final
@@ -157,6 +158,23 @@ async def has_failing_checks(release: sql.Release, revision_number: str, caller_
return result.scalar_one() > 0
+async def latest_info(project_name: str, version_name: str) -> tuple[str, str, datetime.datetime] | None:
+ """Get the name, editor, and timestamp of the latest revision."""
+ release_name = sql.release_name(project_name, version_name)
+ async with db.session() as data:
+ # TODO: No need to get release here
+ # Just use maximum seq from revisions
+ release = await data.release(name=release_name, _project=True).demand(
+ RuntimeError(f"Release {release_name} does not exist")
+ )
+ if release.latest_revision_number is None:
+ return None
+ revision = await data.revision(release_name=release_name, number=release.latest_revision_number).get()
+ if not revision:
+ return None
+ return revision.number, revision.asfuid, revision.created
+
+
async def latest_revision(release: sql.Release) -> sql.Revision | None:
if release.latest_revision_number is None:
return None
diff --git a/atr/revision.py b/atr/revision.py
deleted file mode 100644
index e0aa4b6..0000000
--- a/atr/revision.py
+++ /dev/null
@@ -1,182 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import asyncio
-import contextlib
-import dataclasses
-import datetime
-import pathlib
-import secrets
-import tempfile
-from collections.abc import AsyncGenerator
-
-import aiofiles.os
-import aioshutil
-
-import atr.db as db
-import atr.db.interaction as interaction
-import atr.models.sql as sql
-import atr.tasks as tasks
-import atr.util as util
-
-
-class FailedError(Exception):
- pass
-
-
-@dataclasses.dataclass
-class Creating:
- old: sql.Revision | None
- interim_path: pathlib.Path
- new: sql.Revision | None
- failed: FailedError | None = None
-
-
-class SafeSession:
- def __init__(self, temp_dir: str):
- self._stack = contextlib.AsyncExitStack()
- self._manager = db.session()
- self._temp_dir = temp_dir
-
- async def __aenter__(self) -> db.Session:
- try:
- return await self._stack.enter_async_context(self._manager)
- except Exception:
- await aioshutil.rmtree(self._temp_dir) # type: ignore[call-arg]
- raise
-
- async def __aexit__(self, _exc_type, _exc, _tb):
- await self._stack.aclose()
- return False
-
-
-@contextlib.asynccontextmanager
-async def create_and_manage(
- project_name: str,
- version_name: str,
- asf_uid: str,
- description: str | None = None,
-) -> AsyncGenerator[Creating]:
- """Manage the creation and symlinking of a mutable release revision."""
- # Get the release
- release_name = sql.release_name(project_name, version_name)
- async with db.session() as data:
- release = await data.release(name=release_name).demand(
- RuntimeError("Release does not exist for new revision creation")
- )
- old_revision = await interaction.latest_revision(release)
-
- # Create a temporary directory
- # We ensure, below, that it's removed on any exception
- # Use the tmp subdirectory of state, to ensure that it is on the same filesystem
- prefix_token = secrets.token_hex(16)
- temp_dir: str = await asyncio.to_thread(tempfile.mkdtemp, prefix=prefix_token + "-", dir=util.get_tmp_dir())
- temp_dir_path = pathlib.Path(temp_dir)
- creating = Creating(old=old_revision, interim_path=temp_dir_path,
new=None, failed=None)
- try:
- # The directory was created by mkdtemp, but it's empty
- if old_revision is not None:
- # If this is not the first revision, hard link the previous
revision
- old_release_dir = util.release_directory(release)
- await util.create_hard_link_clone(old_release_dir, temp_dir_path,
do_not_create_dest_dir=True)
- # The directory is either empty or its files are hard linked to the
previous revision
- yield creating
- except FailedError as e:
- await aioshutil.rmtree(temp_dir) # type: ignore[call-arg]
- creating.failed = e
- return
- except Exception:
- await aioshutil.rmtree(temp_dir) # type: ignore[call-arg]
- raise
-
- # Ensure that the permissions of every directory are 755
- try:
- await asyncio.to_thread(util.chmod_directories, temp_dir_path)
- except Exception:
- await aioshutil.rmtree(temp_dir) # type: ignore[call-arg]
- raise
-
- async with SafeSession(temp_dir) as data:
- try:
- # This is the only place where models.Revision is constructed
- # That makes models.populate_revision_sequence_and_name safe
against races
- # Because that event is called when data.add is called below
- # And we have a write lock at that point through the use of
data.begin_immediate
- new_revision = sql.Revision(
- release_name=release_name,
- release=release,
- asfuid=asf_uid,
- created=datetime.datetime.now(datetime.UTC),
- phase=release.phase,
- description=description,
- )
-
- # Acquire the write lock and add the row
- # We need this write lock for moving the directory below atomically
- # But it also helps to make
models.populate_revision_sequence_and_name safe against races
- await data.begin_immediate()
- data.add(new_revision)
-
- # Flush but do not commit the new revision row to get its name and
number
- # The row will still be invisible to other sessions after flushing
- await data.flush()
- # Give the caller details about the new revision
- creating.new = new_revision
-
- # Rename the directory to the new revision number
- await data.refresh(release)
- new_revision_dir = util.release_directory(release)
-
- # Ensure that the parent directory exists
- await aiofiles.os.makedirs(new_revision_dir.parent, exist_ok=True)
-
- # Rename the temporary interim directory to the new revision number
- await aiofiles.os.rename(temp_dir, new_revision_dir)
- except Exception:
- await aioshutil.rmtree(temp_dir) # type: ignore[call-arg]
- raise
-
- # Commit to end the transaction started by data.begin_immediate
- # We must commit the revision before starting the checks
- # This also releases the write lock
- await data.commit()
-
- async with data.begin():
- # Run checks if in DRAFT phase
- # We could also run this outside the data Session
- # But then it would create its own new Session
- # It does, however, need a transaction to be created using
data.begin()
- if release.phase == sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT:
- # Must use caller_data here because we acquired the write lock
- await tasks.draft_checks(asf_uid, project_name, version_name,
new_revision.number, caller_data=data)
-
-
-async def latest_info(project_name: str, version_name: str) -> tuple[str, str, datetime.datetime] | None:
- """Get the name, editor, and timestamp of the latest revision."""
- release_name = sql.release_name(project_name, version_name)
- async with db.session() as data:
- # TODO: No need to get release here
- # Just use maximum seq from revisions
- release = await data.release(name=release_name, _project=True).demand(
- RuntimeError(f"Release {release_name} does not exist")
- )
- if release.latest_revision_number is None:
- return None
- revision = await data.revision(release_name=release_name, number=release.latest_revision_number).get()
- if not revision:
- return None
- return revision.number, revision.asfuid, revision.created
diff --git a/atr/routes/compose.py b/atr/routes/compose.py
index bfa3c87..9e91c20 100644
--- a/atr/routes/compose.py
+++ b/atr/routes/compose.py
@@ -26,7 +26,6 @@ import atr.db.interaction as interaction
import atr.forms as forms
import atr.models.results as results
import atr.models.sql as sql
-import atr.revision as revision
import atr.route as route
import atr.routes.draft as draft
import atr.routes.mapping as mapping
@@ -64,7 +63,7 @@ async def check(
# Get the number of ongoing tasks for the current revision
ongoing_tasks_count = 0
- match await revision.latest_info(release.project.name, release.version):
+ match await interaction.latest_info(release.project.name, release.version):
case (revision_number, revision_editor, revision_timestamp):
ongoing_tasks_count = await interaction.tasks_ongoing(
release.project.name,
diff --git a/atr/routes/draft.py b/atr/routes/draft.py
index 58dfc2f..398ce21 100644
--- a/atr/routes/draft.py
+++ b/atr/routes/draft.py
@@ -32,7 +32,6 @@ import atr.construct as construct
import atr.forms as forms
import atr.log as log
import atr.models.sql as sql
-import atr.revision as revision
import atr.route as route
import atr.routes.compose as compose
import atr.routes.root as root
@@ -163,10 +162,12 @@ async def fresh(session: route.CommitterSession, project_name: str, version_name
# This doesn't make sense unless the checks themselves have been updated
# Therefore we only show the button for this to admins
description = "Empty revision to restart all checks for the whole release
candidate draft"
- async with revision.create_and_manage(
- project_name, version_name, session.uid, description=description
- ) as _creating:
- pass
+ async with storage.write(session.uid) as write:
+ wacp = await write.as_project_committee_participant(project_name)
+ async with wacp.revision.create_and_manage(
+ project_name, version_name, session.uid, description=description
+ ) as _creating:
+ pass
return await session.redirect(
compose.selected,
@@ -227,29 +228,29 @@ async def sbomgen(
try:
description = "SBOM generation through web interface"
- async with revision.create_and_manage(
- project_name, version_name, session.uid, description=description
- ) as creating:
- # Uses new_revision_number in a functional way
- path_in_new_revision = creating.interim_path / rel_path
- sbom_path_rel = rel_path.with_suffix(rel_path.suffix +
".cdx.json").name
- sbom_path_in_new_revision = creating.interim_path /
rel_path.parent / sbom_path_rel
-
- # Check that the source file exists in the new revision
- if not await aiofiles.os.path.exists(path_in_new_revision):
- log.error(f"Source file {rel_path} not found in new revision
for SBOM generation.")
- raise route.FlashError("Source artifact file not found in the
new revision.")
-
- # Check that the SBOM file does not already exist in the new
revision
- if await aiofiles.os.path.exists(sbom_path_in_new_revision):
- raise base.ASFQuartException("SBOM file already exists",
errorcode=400)
-
- if creating.new is None:
- raise route.FlashError("Internal error: New revision not found")
-
- # Create and queue the task, using paths within the new revision
async with storage.write(session.uid) as write:
wacp = await write.as_project_committee_participant(project_name)
+ async with wacp.revision.create_and_manage(
+ project_name, version_name, session.uid,
description=description
+ ) as creating:
+ # Uses new_revision_number in a functional way
+ path_in_new_revision = creating.interim_path / rel_path
+ sbom_path_rel = rel_path.with_suffix(rel_path.suffix +
".cdx.json").name
+ sbom_path_in_new_revision = creating.interim_path /
rel_path.parent / sbom_path_rel
+
+ # Check that the source file exists in the new revision
+ if not await aiofiles.os.path.exists(path_in_new_revision):
+ log.error(f"Source file {rel_path} not found in new
revision for SBOM generation.")
+ raise route.FlashError("Source artifact file not found in
the new revision.")
+
+ # Check that the SBOM file does not already exist in the new
revision
+ if await aiofiles.os.path.exists(sbom_path_in_new_revision):
+ raise base.ASFQuartException("SBOM file already exists",
errorcode=400)
+
+ if creating.new is None:
+ raise route.FlashError("Internal error: New revision not
found")
+
+ # Create and queue the task, using paths within the new revision
sbom_task = await wacp.sbom.generate_cyclonedx(
project_name, version_name, creating.new.number,
path_in_new_revision, sbom_path_in_new_revision
)
diff --git a/atr/routes/revisions.py b/atr/routes/revisions.py
index 25dfc16..0a23405 100644
--- a/atr/routes/revisions.py
+++ b/atr/routes/revisions.py
@@ -30,8 +30,8 @@ import atr.db as db
import atr.forms as forms
import atr.models.schema as schema
import atr.models.sql as sql
-import atr.revision as revision
import atr.route as route
+import atr.storage as storage
import atr.template as template
import atr.util as util
@@ -121,19 +121,23 @@ async def selected_post(session: route.CommitterSession, project_name: str, vers
)
description = f"Copy of revision {selected_revision_number} through web
interface"
- async with revision.create_and_manage(project_name, version_name,
session.uid, description=description) as creating:
- # TODO: Stop create_and_manage from hard linking the parent first
- await aioshutil.rmtree(creating.interim_path) # type: ignore[call-arg]
- await util.create_hard_link_clone(selected_revision_dir,
creating.interim_path)
-
- if creating.new is None:
- raise base.ASFQuartException("Internal error: New revision not found",
errorcode=500)
- return await session.redirect(
- selected,
- success=f"Copied revision {selected_revision_number} to new latest
revision, {creating.new.number}",
- project_name=project_name,
- version_name=version_name,
- )
+ async with storage.write(session.uid) as write:
+ wacp = await write.as_project_committee_participant(project_name)
+ async with wacp.revision.create_and_manage(
+ project_name, version_name, session.uid, description=description
+ ) as creating:
+ # TODO: Stop create_and_manage from hard linking the parent first
+ await aioshutil.rmtree(creating.interim_path) # type:
ignore[call-arg]
+ await util.create_hard_link_clone(selected_revision_dir,
creating.interim_path)
+
+ if creating.new is None:
+ raise base.ASFQuartException("Internal error: New revision not
found", errorcode=500)
+ return await session.redirect(
+ selected,
+ success=f"Copied revision {selected_revision_number} to new latest
revision, {creating.new.number}",
+ project_name=project_name,
+ version_name=version_name,
+ )
class FilesDiff(schema.Strict):
diff --git a/atr/ssh.py b/atr/ssh.py
index 07a75a5..4073fda 100644
--- a/atr/ssh.py
+++ b/atr/ssh.py
@@ -33,7 +33,8 @@ import atr.config as config
import atr.db as db
import atr.log as log
import atr.models.sql as sql
-import atr.revision as revision
+import atr.storage as storage
+import atr.storage.types as types
import atr.user as user
import atr.util as util
@@ -446,42 +447,46 @@ async def _step_07b_process_validated_rsync_write(
# Create the draft revision directory structure
description = "File synchronisation through ssh, using rsync"
- async with revision.create_and_manage(project_name, version_name, asf_uid,
description=description) as creating:
- # Uses new_revision_number for logging only
- if creating.old is not None:
- log.info(f"Using old revision {creating.old.number} and interim
path {creating.interim_path}")
- # Update the rsync command path to the new revision directory
- argv[-1] = str(creating.interim_path)
-
- ###################################################
- ### Calls _step_08_execute_rsync_upload_command ###
- ###################################################
- exit_status = await _step_08_execute_rsync(process, argv)
- if exit_status != 0:
+ async with storage.write(asf_uid) as write:
+ wacp = await write.as_project_committee_participant(project_name)
+ async with wacp.revision.create_and_manage(
+ project_name, version_name, asf_uid, description=description
+ ) as creating:
+ # Uses new_revision_number for logging only
if creating.old is not None:
- for_revision = f"successor of revision {creating.old.number}"
- else:
- for_revision = f"initial revision for release {release_name}"
- log.error(
- f"rsync upload failed with exit status {exit_status} for
{for_revision}. "
- f"Command: {process.command} (run as {' '.join(argv)})"
- )
- raise revision.FailedError(f"rsync upload failed with exit status
{exit_status} for {for_revision}")
-
- if creating.new is not None:
- log.info(f"rsync upload successful for revision {creating.new.number}")
- host = config.get().APP_HOST
- message = f"\nATR: Created revision {creating.new.number} of
{project_name} {version_name}\n"
- message += f"ATR:
https://{host}/compose/{project_name}/{version_name}\n"
- if not process.stderr.is_closing():
- process.stderr.write(message.encode())
- await process.stderr.drain()
- else:
- log.info(f"rsync upload unsuccessful for release {release_name}")
+ log.info(f"Using old revision {creating.old.number} and
interim path {creating.interim_path}")
+ # Update the rsync command path to the new revision directory
+ argv[-1] = str(creating.interim_path)
+
+ ###################################################
+ ### Calls _step_08_execute_rsync_upload_command ###
+ ###################################################
+ exit_status = await _step_08_execute_rsync(process, argv)
+ if exit_status != 0:
+ if creating.old is not None:
+ for_revision = f"successor of revision
{creating.old.number}"
+ else:
+ for_revision = f"initial revision for release
{release_name}"
+ log.error(
+ f"rsync upload failed with exit status {exit_status} for
{for_revision}. "
+ f"Command: {process.command} (run as {' '.join(argv)})"
+ )
+ raise types.FailedError(f"rsync upload failed with exit status
{exit_status} for {for_revision}")
+
+ if creating.new is not None:
+ log.info(f"rsync upload successful for revision
{creating.new.number}")
+ host = config.get().APP_HOST
+ message = f"\nATR: Created revision {creating.new.number} of
{project_name} {version_name}\n"
+ message += f"ATR:
https://{host}/compose/{project_name}/{version_name}\n"
+ if not process.stderr.is_closing():
+ process.stderr.write(message.encode())
+ await process.stderr.drain()
+ else:
+ log.info(f"rsync upload unsuccessful for release {release_name}")
- # If we got here, there was no exception
- if not process.is_closing():
- process.exit(exit_status)
+ # If we got here, there was no exception
+ if not process.is_closing():
+ process.exit(exit_status)
async def _step_07c_ensure_release_object_for_write(project_name: str, version_name: str) -> None:
diff --git a/atr/storage/__init__.py b/atr/storage/__init__.py
index a8cd78c..39e6f49 100644
--- a/atr/storage/__init__.py
+++ b/atr/storage/__init__.py
@@ -142,6 +142,7 @@ class WriteAsGeneralPublic(WriteAs):
self.policy = writers.policy.GeneralPublic(write, self, data)
self.project = writers.project.GeneralPublic(write, self, data)
self.release = writers.release.GeneralPublic(write, self, data)
+ self.revision = writers.revision.GeneralPublic(write, self, data)
self.sbom = writers.sbom.GeneralPublic(write, self, data)
self.ssh = writers.ssh.GeneralPublic(write, self, data)
self.tokens = writers.tokens.GeneralPublic(write, self, data)
@@ -159,6 +160,7 @@ class WriteAsFoundationCommitter(WriteAsGeneralPublic):
self.policy = writers.policy.FoundationCommitter(write, self, data)
self.project = writers.project.FoundationCommitter(write, self, data)
self.release = writers.release.FoundationCommitter(write, self, data)
+ self.revision = writers.revision.FoundationCommitter(write, self, data)
self.sbom = writers.sbom.FoundationCommitter(write, self, data)
self.ssh = writers.ssh.FoundationCommitter(write, self, data)
self.tokens = writers.tokens.FoundationCommitter(write, self, data)
@@ -182,6 +184,7 @@ class WriteAsCommitteeParticipant(WriteAsFoundationCommitter):
self.policy = writers.policy.CommitteeParticipant(write, self, data,
committee_name)
self.project = writers.project.CommitteeParticipant(write, self, data,
committee_name)
self.release = writers.release.CommitteeParticipant(write, self, data,
committee_name)
+ self.revision = writers.revision.CommitteeParticipant(write, self,
data, committee_name)
self.sbom = writers.sbom.CommitteeParticipant(write, self, data,
committee_name)
self.ssh = writers.ssh.CommitteeParticipant(write, self, data,
committee_name)
self.tokens = writers.tokens.CommitteeParticipant(write, self, data,
committee_name)
@@ -210,6 +213,7 @@ class WriteAsCommitteeMember(WriteAsCommitteeParticipant):
self.policy = writers.policy.CommitteeMember(write, self, data,
committee_name)
self.project = writers.project.CommitteeMember(write, self, data,
committee_name)
self.release = writers.release.CommitteeMember(write, self, data,
committee_name)
+ self.revision = writers.revision.CommitteeMember(write, self, data,
committee_name)
self.sbom = writers.sbom.CommitteeMember(write, self, data,
committee_name)
self.ssh = writers.ssh.CommitteeMember(write, self, data,
committee_name)
self.tokens = writers.tokens.CommitteeMember(write, self, data,
committee_name)
diff --git a/atr/storage/types.py b/atr/storage/types.py
index 4087674..f08985c 100644
--- a/atr/storage/types.py
+++ b/atr/storage/types.py
@@ -83,3 +83,15 @@ class PublicKeyError(Exception):
@property
def original_error(self) -> Exception:
return self.__original_error
+
+
+class FailedError(Exception):
+ pass
+
+
+@dataclasses.dataclass
+class Creating:
+ old: sql.Revision | None
+ interim_path: pathlib.Path
+ new: sql.Revision | None
+ failed: FailedError | None = None
diff --git a/atr/storage/writers/__init__.py b/atr/storage/writers/__init__.py
index 829c1be..b8243d7 100644
--- a/atr/storage/writers/__init__.py
+++ b/atr/storage/writers/__init__.py
@@ -23,6 +23,7 @@ import atr.storage.writers.keys as keys
import atr.storage.writers.policy as policy
import atr.storage.writers.project as project
import atr.storage.writers.release as release
+import atr.storage.writers.revision as revision
import atr.storage.writers.sbom as sbom
import atr.storage.writers.ssh as ssh
import atr.storage.writers.tokens as tokens
@@ -37,6 +38,7 @@ __all__ = [
"policy",
"project",
"release",
+ "revision",
"sbom",
"ssh",
"tokens",
diff --git a/atr/storage/writers/keys.py b/atr/storage/writers/keys.py
index aee319c..b8f4053 100644
--- a/atr/storage/writers/keys.py
+++ b/atr/storage/writers/keys.py
@@ -417,8 +417,6 @@ class CommitteeParticipant(FoundationCommitter):
return outcomes
async def import_keys_file(self, project_name: str, version_name: str) ->
outcome.List[types.Key]:
- import atr.revision as revision
-
release = await self.__data.release(
project_name=project_name,
version=version_name,
@@ -438,7 +436,7 @@ class CommitteeParticipant(FoundationCommitter):
# Remove the KEYS file if 100% imported
if (outcomes.result_count > 0) and (outcomes.error_count == 0):
description = "Removed KEYS file after successful import through
web interface"
- async with revision.create_and_manage(
+ async with self.__write_as.revision.create_and_manage(
project_name, version_name, self.__asf_uid,
description=description
) as creating:
path_in_new_revision = creating.interim_path / "KEYS"
diff --git a/atr/storage/writers/release.py b/atr/storage/writers/release.py
index 96d6669..40797ea 100644
--- a/atr/storage/writers/release.py
+++ b/atr/storage/writers/release.py
@@ -36,8 +36,8 @@ import atr.db as db
import atr.log as log
import atr.models.api as api
import atr.models.sql as sql
-import atr.revision as revision
import atr.storage as storage
+import atr.storage.types as types
import atr.util as util
if TYPE_CHECKING:
@@ -94,8 +94,8 @@ class CommitteeParticipant(FoundationCommitter):
@contextlib.asynccontextmanager
async def create_and_manage_revision(
self, project_name: str, version: str, description: str
- ) -> AsyncGenerator[revision.Creating]:
- async with revision.create_and_manage(
+ ) -> AsyncGenerator[types.Creating]:
+ async with self.__write_as.revision.create_and_manage(
project_name, version, self.__asf_uid, description=description
) as _creating:
yield _creating
@@ -155,9 +155,9 @@ class CommitteeParticipant(FoundationCommitter):
path_to_remove = creating.interim_path / dir_to_delete_rel
path_to_remove.resolve().relative_to(creating.interim_path.resolve())
if not await aiofiles.os.path.isdir(path_to_remove):
- raise revision.FailedError(f"Path '{dir_to_delete_rel}' is not
a directory.")
+ raise types.FailedError(f"Path '{dir_to_delete_rel}' is not a
directory.")
if await aiofiles.os.listdir(path_to_remove):
- raise revision.FailedError(f"Directory '{dir_to_delete_rel}'
is not empty.")
+ raise types.FailedError(f"Directory '{dir_to_delete_rel}' is
not empty.")
await aiofiles.os.rmdir(path_to_remove)
if creating.failed is not None:
return str(creating.failed)
@@ -377,7 +377,7 @@ class CommitteeParticipant(FoundationCommitter):
await self.__data.refresh(release)
description = "Creation of empty release candidate draft through web
interface"
- async with revision.create_and_manage(
+ async with self.__write_as.revision.create_and_manage(
project_name, version, self.__asf_uid, description=description
) as _creating:
pass
@@ -441,7 +441,7 @@ class CommitteeParticipant(FoundationCommitter):
await self.__save_file(file, target_path)
return len(files)
- async def __current_paths(self, creating: revision.Creating) ->
list[pathlib.Path]:
+ async def __current_paths(self, creating: types.Creating) ->
list[pathlib.Path]:
all_current_paths_interim: list[pathlib.Path] = []
async for p_rel_interim in
util.paths_recursive_all(creating.interim_path):
all_current_paths_interim.append(p_rel_interim)
@@ -508,7 +508,7 @@ class CommitteeParticipant(FoundationCommitter):
async def __remove_rc_tags_revision(
self,
- creating: revision.Creating,
+ creating: types.Creating,
error_messages: list[str],
) -> int:
all_current_paths_interim = await self.__current_paths(creating)
@@ -586,7 +586,7 @@ class CommitteeParticipant(FoundationCommitter):
self,
source_files_rel: list[pathlib.Path],
target_dir_rel: pathlib.Path,
- creating: revision.Creating,
+ creating: types.Creating,
moved_files_names: list[str],
skipped_files_names: list[str],
) -> None:
@@ -595,22 +595,22 @@ class CommitteeParticipant(FoundationCommitter):
target_path.resolve().relative_to(creating.interim_path.resolve())
except ValueError:
# Path traversal detected
- raise revision.FailedError("Paths must be restricted to the
release directory")
+ raise types.FailedError("Paths must be restricted to the release
directory")
if not await aiofiles.os.path.exists(target_path):
for part in target_path.parts:
# TODO: This .prefix check could include some existing
directory segment
if part.startswith("."):
- raise revision.FailedError("Segments must not start with
'.'")
+ raise types.FailedError("Segments must not start with '.'")
if ".." in part:
- raise revision.FailedError("Segments must not contain
'..'")
+ raise types.FailedError("Segments must not contain '..'")
try:
await aiofiles.os.makedirs(target_path)
except OSError:
- raise revision.FailedError("Failed to create target directory")
+ raise types.FailedError("Failed to create target directory")
elif not await aiofiles.os.path.isdir(target_path):
- raise revision.FailedError("Target path is not a directory")
+ raise types.FailedError("Target path is not a directory")
for source_file_rel in source_files_rel:
await self.__setup_revision_item(
@@ -621,7 +621,7 @@ class CommitteeParticipant(FoundationCommitter):
self,
source_file_rel: pathlib.Path,
target_dir_rel: pathlib.Path,
- creating: revision.Creating,
+ creating: types.Creating,
moved_files_names: list[str],
skipped_files_names: list[str],
target_path: pathlib.Path,
@@ -636,11 +636,11 @@ class CommitteeParticipant(FoundationCommitter):
if (target_dir_rel == source_file_rel) or (creating.interim_path /
target_dir_rel).resolve().is_relative_to(
full_source_item_path.resolve()
):
- raise revision.FailedError("Cannot move a directory into
itself or a subdirectory of itself")
+ raise types.FailedError("Cannot move a directory into itself
or a subdirectory of itself")
final_target_for_item = target_path / source_file_rel.name
if await aiofiles.os.path.exists(final_target_for_item):
- raise revision.FailedError("Target name already exists")
+ raise types.FailedError("Target name already exists")
await aiofiles.os.rename(full_source_item_path,
final_target_for_item)
moved_files_names.append(source_file_rel.name)
@@ -649,11 +649,11 @@ class CommitteeParticipant(FoundationCommitter):
bundle = [f for f in related_files if await
aiofiles.os.path.exists(creating.interim_path / f)]
for f_check in bundle:
if await aiofiles.os.path.isdir(creating.interim_path /
f_check):
- raise revision.FailedError("A related 'file' is actually a
directory")
+ raise types.FailedError("A related 'file' is actually a
directory")
collisions = [f.name for f in bundle if await
aiofiles.os.path.exists(target_path / f.name)]
if collisions:
- raise revision.FailedError("A related file already exists in
the target directory")
+ raise types.FailedError("A related file already exists in the
target directory")
for f in bundle:
await aiofiles.os.rename(creating.interim_path / f,
target_path / f.name)
diff --git a/atr/storage/writers/revision.py b/atr/storage/writers/revision.py
new file mode 100644
index 0000000..6a26b7f
--- /dev/null
+++ b/atr/storage/writers/revision.py
@@ -0,0 +1,223 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Removing this will cause circular imports
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import datetime
+import pathlib
+import secrets
+import tempfile
+from typing import TYPE_CHECKING
+
+import aiofiles.os
+import aioshutil
+
+import atr.db as db
+import atr.db.interaction as interaction
+import atr.models.sql as sql
+import atr.storage as storage
+import atr.storage.types as types
+import atr.tasks as tasks
+import atr.util as util
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncGenerator
+
+
+class SafeSession:
+ def __init__(self, temp_dir: str):
+ self._stack = contextlib.AsyncExitStack()
+ self._manager = db.session()
+ self._temp_dir = temp_dir
+
+ async def __aenter__(self) -> db.Session:
+ try:
+ return await self._stack.enter_async_context(self._manager)
+ except Exception:
+ await aioshutil.rmtree(self._temp_dir) # type: ignore[call-arg]
+ raise
+
+ async def __aexit__(self, _exc_type, _exc, _tb):
+ await self._stack.aclose()
+ return False
+
+
+class GeneralPublic:
+ def __init__(
+ self,
+ write: storage.Write,
+ write_as: storage.WriteAsGeneralPublic,
+ data: db.Session,
+ ):
+ self.__write = write
+ self.__write_as = write_as
+ self.__data = data
+ self.__asf_uid = write.authorisation.asf_uid
+
+
+class FoundationCommitter(GeneralPublic):
+ def __init__(self, write: storage.Write, write_as: storage.WriteAsFoundationCommitter, data: db.Session):
+ super().__init__(write, write_as, data)
+ self.__write = write
+ self.__write_as = write_as
+ self.__data = data
+ asf_uid = write.authorisation.asf_uid
+ if asf_uid is None:
+ raise storage.AccessError("No ASF UID")
+ self.__asf_uid = asf_uid
+
+
+class CommitteeParticipant(FoundationCommitter):
+ def __init__(
+ self,
+ write: storage.Write,
+ write_as: storage.WriteAsCommitteeParticipant,
+ data: db.Session,
+ committee_name: str,
+ ):
+ super().__init__(write, write_as, data)
+ self.__write = write
+ self.__write_as = write_as
+ self.__data = data
+ asf_uid = write.authorisation.asf_uid
+ if asf_uid is None:
+ raise storage.AccessError("No ASF UID")
+ self.__asf_uid = asf_uid
+ self.__committee_name = committee_name
+
+ @contextlib.asynccontextmanager
+ async def create_and_manage(
+ self,
+ project_name: str,
+ version_name: str,
+ asf_uid: str,
+ description: str | None = None,
+ ) -> AsyncGenerator[types.Creating]:
+ """Manage the creation and symlinking of a mutable release revision."""
+ # Get the release
+ release_name = sql.release_name(project_name, version_name)
+ async with db.session() as data:
+ release = await data.release(name=release_name).demand(
+ RuntimeError("Release does not exist for new revision creation")
+ )
+ old_revision = await interaction.latest_revision(release)
+
+ # Create a temporary directory
+ # We ensure, below, that it's removed on any exception
+ # Use the tmp subdirectory of state, to ensure that it is on the same filesystem
+ prefix_token = secrets.token_hex(16)
+ temp_dir: str = await asyncio.to_thread(tempfile.mkdtemp, prefix=prefix_token + "-", dir=util.get_tmp_dir())
+ temp_dir_path = pathlib.Path(temp_dir)
+ creating = types.Creating(old=old_revision, interim_path=temp_dir_path, new=None, failed=None)
+ try:
+ # The directory was created by mkdtemp, but it's empty
+ if old_revision is not None:
+ # If this is not the first revision, hard link the previous revision
+ old_release_dir = util.release_directory(release)
+ await util.create_hard_link_clone(old_release_dir, temp_dir_path, do_not_create_dest_dir=True)
+ # The directory is either empty or its files are hard linked to the previous revision
+ yield creating
+ except types.FailedError as e:
+ await aioshutil.rmtree(temp_dir) # type: ignore[call-arg]
+ creating.failed = e
+ return
+ except Exception:
+ await aioshutil.rmtree(temp_dir) # type: ignore[call-arg]
+ raise
+
+ # Ensure that the permissions of every directory are 755
+ try:
+ await asyncio.to_thread(util.chmod_directories, temp_dir_path)
+ except Exception:
+ await aioshutil.rmtree(temp_dir) # type: ignore[call-arg]
+ raise
+
+ async with SafeSession(temp_dir) as data:
+ try:
+ # This is the only place where models.Revision is constructed
+ # That makes models.populate_revision_sequence_and_name safe against races
+ # Because that event is called when data.add is called below
+ # And we have a write lock at that point through the use of data.begin_immediate
+ new_revision = sql.Revision(
+ release_name=release_name,
+ release=release,
+ asfuid=asf_uid,
+ created=datetime.datetime.now(datetime.UTC),
+ phase=release.phase,
+ description=description,
+ )
+
+ # Acquire the write lock and add the row
+ # We need this write lock for moving the directory below atomically
+ # But it also helps to make models.populate_revision_sequence_and_name safe against races
+ await data.begin_immediate()
+ data.add(new_revision)
+
+ # Flush but do not commit the new revision row to get its name and number
+ # The row will still be invisible to other sessions after flushing
+ await data.flush()
+ # Give the caller details about the new revision
+ creating.new = new_revision
+
+ # Rename the directory to the new revision number
+ await data.refresh(release)
+ new_revision_dir = util.release_directory(release)
+
+ # Ensure that the parent directory exists
+ await aiofiles.os.makedirs(new_revision_dir.parent, exist_ok=True)
+
+ # Rename the temporary interim directory to the new revision number
+ await aiofiles.os.rename(temp_dir, new_revision_dir)
+ except Exception:
+ await aioshutil.rmtree(temp_dir) # type: ignore[call-arg]
+ raise
+
+ # Commit to end the transaction started by data.begin_immediate
+ # We must commit the revision before starting the checks
+ # This also releases the write lock
+ await data.commit()
+
+ async with data.begin():
+ # Run checks if in DRAFT phase
+ # We could also run this outside the data Session
+ # But then it would create its own new Session
+ # It does, however, need a transaction to be created using data.begin()
+ if release.phase == sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT:
+ # Must use caller_data here because we acquired the write lock
+ await tasks.draft_checks(asf_uid, project_name, version_name, new_revision.number, caller_data=data)
+
+
+class CommitteeMember(CommitteeParticipant):
+ def __init__(
+ self,
+ write: storage.Write,
+ write_as: storage.WriteAsCommitteeMember,
+ data: db.Session,
+ committee_name: str,
+ ):
+ super().__init__(write, write_as, data, committee_name)
+ self.__write = write
+ self.__write_as = write_as
+ self.__data = data
+ asf_uid = write.authorisation.asf_uid
+ if asf_uid is None:
+ raise storage.AccessError("No ASF UID")
+ self.__asf_uid = asf_uid
+ self.__committee_name = committee_name
diff --git a/atr/storage/writers/vote.py b/atr/storage/writers/vote.py
index d856b34..51f642d 100644
--- a/atr/storage/writers/vote.py
+++ b/atr/storage/writers/vote.py
@@ -25,7 +25,6 @@ import atr.db as db
import atr.db.interaction as interaction
import atr.log as log
import atr.models.sql as sql
-import atr.revision as revision
import atr.storage as storage
import atr.tasks.message as message
import atr.tasks.vote as tasks_vote
@@ -269,7 +268,7 @@ class CommitteeMember(CommitteeParticipant):
success_message = "Vote marked as passed"
description = "Create a preview revision from the last candidate draft"
- async with revision.create_and_manage(
+ async with self.__write_as.revision.create_and_manage(
project_name, release.version, self.__asf_uid, description=description
) as _creating:
pass
@@ -336,7 +335,7 @@ class CommitteeMember(CommitteeParticipant):
success_message = "Vote marked as passed"
description = "Create a preview revision from the last candidate draft"
- async with revision.create_and_manage(
+ async with self.__write_as.revision.create_and_manage(
project_name, release.version, self.__asf_uid, description=description
) as _creating:
pass
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index 9ae2e71..95608a8 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -30,8 +30,8 @@ import atr.config as config
import atr.log as log
import atr.models.results as results
import atr.models.schema as schema
-import atr.revision as revision
import atr.sbomtool as sbomtool
+import atr.storage as storage
import atr.tasks.checks as checks
import atr.tasks.checks.targz as targz
import atr.util as util
@@ -86,18 +86,20 @@ async def augment(args: FileArgs) -> results.Results | None:
patch_data = sbomtool.patch_to_data(patch_ops)
merged = bundle.doc.patch(yyjson.Document(patch_data))
description = "SBOM augmentation through web interface"
- async with revision.create_and_manage(
- args.project_name, args.version_name, args.asf_uid or "unknown", description=description
- ) as creating:
- new_full_path = os.path.join(str(creating.interim_path), args.file_path)
- # Write to the new revision
- log.info(f"Writing augmented SBOM to {new_full_path}")
- await aiofiles.os.remove(new_full_path)
- async with aiofiles.open(new_full_path, "w", encoding="utf-8") as f:
- await f.write(merged.dumps())
-
- if creating.new is None:
- raise RuntimeError("Internal error: New revision not found")
+ async with storage.write(args.asf_uid) as write:
+ wacp = await write.as_project_committee_participant(args.project_name)
+ async with wacp.revision.create_and_manage(
+ args.project_name, args.version_name, args.asf_uid or "unknown", description=description
+ ) as creating:
+ new_full_path = os.path.join(str(creating.interim_path), args.file_path)
+ # Write to the new revision
+ log.info(f"Writing augmented SBOM to {new_full_path}")
+ await aiofiles.os.remove(new_full_path)
+ async with aiofiles.open(new_full_path, "w", encoding="utf-8") as f:
+ await f.write(merged.dumps())
+
+ if creating.new is None:
+ raise RuntimeError("Internal error: New revision not found")
return results.SBOMAugment(
kind="sbom_augment",
diff --git a/atr/tasks/svn.py b/atr/tasks/svn.py
index 8888319..c193e06 100644
--- a/atr/tasks/svn.py
+++ b/atr/tasks/svn.py
@@ -25,7 +25,7 @@ import aioshutil
import atr.log as log
import atr.models.results as results
import atr.models.schema as schema
-import atr.revision as revision
+import atr.storage as storage
import atr.tasks.checks as checks
@@ -72,57 +72,60 @@ async def _import_files_core(args: SvnImport) -> str:
temp_export_dir_name = ".svn-export.tmp"
description = "Import of files from subversion"
- async with revision.create_and_manage(
- args.project_name, args.version_name, args.asf_uid, description=description
- ) as creating:
- # Uses creating.new after this block
- log.debug(f"Created revision directory: {creating.interim_path}")
-
- final_target_path = creating.interim_path
- if args.target_subdirectory:
- final_target_path = creating.interim_path / args.target_subdirectory
- # Validate that final_target_path is a subdirectory of new_revision_dir
- if not final_target_path.is_relative_to(creating.interim_path):
- raise SvnImportError(
- f"Target subdirectory {args.target_subdirectory} is not a subdirectory of {creating.interim_path}"
- )
- await aiofiles.os.makedirs(final_target_path, exist_ok=True)
-
- temp_export_path = creating.interim_path / temp_export_dir_name
-
- svn_command = [
- "svn",
- "export",
- "--non-interactive",
- "--trust-server-cert-failures",
- "unknown-ca,cn-mismatch",
- "-r",
- args.revision,
- "--",
- args.svn_url,
- str(temp_export_path),
- ]
-
- await _import_files_core_run_svn_export(svn_command, temp_export_path)
-
- # Move files from temp export path to final target path
- # We only have to do this to avoid the SVN pegged revision issue
- log.info(f"Moving exported files from {temp_export_path} to {final_target_path}")
- for item_name in await aiofiles.os.listdir(temp_export_path):
- source_item = temp_export_path / item_name
- destination_item = final_target_path / item_name
- try:
- await aioshutil.move(str(source_item), str(destination_item))
- except FileExistsError:
- log.warning(f"Item {destination_item} already exists, skipping move for {item_name}")
- except Exception as move_err:
- log.error(f"Error moving {source_item} to {destination_item}: {move_err}")
- await aiofiles.os.rmdir(temp_export_path)
- log.info(f"Removed temporary export directory: {temp_export_path}")
-
- if creating.new is None:
- raise SvnImportError("Internal error: New revision not found")
- return f"Successfully imported files from SVN into revision {creating.new.number}"
+ async with storage.write(args.asf_uid) as write:
+ wacp = await write.as_project_committee_participant(args.project_name)
+ async with wacp.revision.create_and_manage(
+ args.project_name, args.version_name, args.asf_uid, description=description
+ ) as creating:
+ # Uses creating.new after this block
+ log.debug(f"Created revision directory: {creating.interim_path}")
+
+ final_target_path = creating.interim_path
+ if args.target_subdirectory:
+ final_target_path = creating.interim_path / args.target_subdirectory
+ # Validate that final_target_path is a subdirectory of new_revision_dir
+ if not final_target_path.is_relative_to(creating.interim_path):
+ raise SvnImportError(
+ f"Target subdirectory {args.target_subdirectory}"
+ f" is not a subdirectory of {creating.interim_path}"
+ )
+ await aiofiles.os.makedirs(final_target_path, exist_ok=True)
+
+ temp_export_path = creating.interim_path / temp_export_dir_name
+
+ svn_command = [
+ "svn",
+ "export",
+ "--non-interactive",
+ "--trust-server-cert-failures",
+ "unknown-ca,cn-mismatch",
+ "-r",
+ args.revision,
+ "--",
+ args.svn_url,
+ str(temp_export_path),
+ ]
+
+ await _import_files_core_run_svn_export(svn_command, temp_export_path)
+
+ # Move files from temp export path to final target path
+ # We only have to do this to avoid the SVN pegged revision issue
+ log.info(f"Moving exported files from {temp_export_path} to {final_target_path}")
+ for item_name in await aiofiles.os.listdir(temp_export_path):
+ source_item = temp_export_path / item_name
+ destination_item = final_target_path / item_name
+ try:
+ await aioshutil.move(str(source_item), str(destination_item))
+ except FileExistsError:
+ log.warning(f"Item {destination_item} already exists, skipping move for {item_name}")
+ except Exception as move_err:
+ log.error(f"Error moving {source_item} to {destination_item}: {move_err}")
+ await aiofiles.os.rmdir(temp_export_path)
+ log.info(f"Removed temporary export directory: {temp_export_path}")
+
+ if creating.new is None:
+ raise SvnImportError("Internal error: New revision not found")
+ return f"Successfully imported files from SVN into revision {creating.new.number}"
async def _import_files_core_run_svn_export(svn_command: list[str], temp_export_path: pathlib.Path) -> None:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]