This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow-steward.git
The following commit(s) were added to refs/heads/main by this push:
new 453fe32 feat(vulnogram-oauth-api): merge-mode safety nets in record-update (#363)
453fe32 is described below
commit 453fe322ba128506acbe3f8a594382a1d0bd62ab
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu May 28 21:11:08 2026 +0200
feat(vulnogram-oauth-api): merge-mode safety nets in record-update (#363)
The default Vulnogram API push is a full record replacement: whatever
JSON the script sends becomes the record. That model has bitten the
Airflow security team in three concrete ways during 2026 — all
documented on CVE-2026-41016's reviewer-comment thread on 2026-05-28
(see this PR's description for the full diagnostic).
* PUBLIC → REVIEW: a regenerated re-push walked the state back and
broke the cve.org advisory's lifecycle ownership.
* apache-airflow-providers-smtp → apache-airflow: the regenerator's
scope-label resolution changed the affected package post-publication.
* Lost references[]: the hand-added lists.apache.org advisory URL was
dropped because the regenerator emits references from the body only.
This PR adds three guards that fire before the POST. All three are
opt-out via explicit flags so deliberate changes still work:
    --allow-state-downgrade   PUBLIC → REVIEW / DRAFT / READY
    --replace-references      drop existing refs not in new emission
    --allow-product-change    change affected[].product / packageName
    --full-replace            umbrella for all three
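As a rough illustration of how the umbrella flag folds into the three
individual overrides, here is a minimal standalone sketch. The helper
name `parse_overrides` is hypothetical and exists only for this
example; the real CLI wires the flags into its own `parse_args`.

```python
import argparse


def parse_overrides(argv: list[str]) -> dict[str, bool]:
    """Hypothetical helper: parse only the four merge-mode flags."""
    ap = argparse.ArgumentParser()
    for flag in (
        "--allow-state-downgrade",
        "--replace-references",
        "--allow-product-change",
        "--full-replace",
    ):
        ap.add_argument(flag, action="store_true")
    args = ap.parse_args(argv)
    # --full-replace switches on every individual override.
    return {
        "allow_state_downgrade": args.allow_state_downgrade or args.full_replace,
        "replace_references": args.replace_references or args.full_replace,
        "allow_product_change": args.allow_product_change or args.full_replace,
    }


if __name__ == "__main__":
    print(parse_overrides(["--full-replace"]))
```

With no flags at all, every override stays off and all three guards
remain active.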
The guards live in a new vulnogram_api.merge_mode module so the
contract is testable in isolation. The CLI fetches the current
record via get_record before the push (no-op when the record
doesn't exist yet), passes both docs through apply_merge_mode_guards,
and pushes the merged document. New exit code 3 specifically reports
a merge-mode refusal so a scripted caller can distinguish a guard
refusal from a transport / validation failure.
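A scripted caller could branch on the new exit code roughly as
follows. This is a sketch under assumptions: it presumes the script is
installed on PATH as `vulnogram-api-record-update`, and the helper
names `classify_exit` and `push` are hypothetical. Only exit codes 0
and 3 are interpreted; every other code is treated as an unspecified
failure.

```python
import subprocess

MERGE_MODE_REFUSED = 3  # exit code introduced by this change


def classify_exit(returncode: int) -> str:
    """Map an exit code to a coarse outcome for a scripted caller."""
    if returncode == 0:
        return "saved"
    if returncode == MERGE_MODE_REFUSED:
        return "guard-refused"
    return "other-failure"


def push(cve_id: str, json_file: str, extra_flags: tuple[str, ...] = ()) -> tuple[str, str]:
    """Run the CLI (assumed to be on PATH) and return (outcome, stderr)."""
    cmd = [
        "vulnogram-api-record-update",
        "--cve-id", cve_id,
        "--json-file", json_file,
        *extra_flags,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    return classify_exit(result.returncode), result.stderr
```

On a "guard-refused" outcome the stderr text names the guard that
fired and the override flag to add, so the caller can surface it to
the release manager instead of retrying blindly.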
12 new tests in test_merge_mode.py + 7 new in test_record_update.py
covering: each guard's refusal path, each guard's override flag,
references-by-URL merge semantics, no-current-record (first push)
no-op path, deep-copy isolation of the input doc. 84 tests pass.
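For readers who want to see the references-by-URL merge semantics on
concrete data, the following standalone sketch restates the union
rule outside the module. The function name and the sample URLs and
tags are illustrative only.

```python
def merge_references_by_url(current: list[dict], new: list[dict]) -> list[dict]:
    """Union-merge by URL: new entries first, then unseen current ones."""
    new_urls = {entry.get("url") for entry in new}
    merged = list(new)
    for entry in current:
        url = entry.get("url")
        # Keep a current entry only if the new emission lacks its URL.
        if url and url not in new_urls:
            merged.append(entry)
    return merged


# Illustrative data: the current record carries a hand-added advisory URL.
current = [
    {"url": "https://github.com/apache/foo/pull/1", "tags": ["patch"]},
    {"url": "https://lists.apache.org/thread/abc", "tags": ["vendor-advisory"]},
]
# The regenerated emission only knows about the fix PR (with updated tags).
new = [{"url": "https://github.com/apache/foo/pull/1", "tags": ["patch", "release-notes"]}]

merged = merge_references_by_url(current, new)
for ref in merged:
    print(ref["url"], ref["tags"])
```

The fix-PR entry comes from the new emission (so its updated tags
win), and the advisory URL survives because the new emission does not
mention it.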
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../oauth-api/src/vulnogram_api/merge_mode.py | 264 +++++++++++++++
.../oauth-api/src/vulnogram_api/record_update.py | 111 +++++++
tools/vulnogram/oauth-api/tests/test_merge_mode.py | 301 +++++++++++++++++
.../oauth-api/tests/test_record_update.py | 369 +++++++++++++++++++++
4 files changed, 1045 insertions(+)
diff --git a/tools/vulnogram/oauth-api/src/vulnogram_api/merge_mode.py b/tools/vulnogram/oauth-api/src/vulnogram_api/merge_mode.py
new file mode 100644
index 0000000..28ed6be
--- /dev/null
+++ b/tools/vulnogram/oauth-api/src/vulnogram_api/merge_mode.py
@@ -0,0 +1,264 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Merge-mode safety nets for ``vulnogram-api-record-update``.
+
+The default Vulnogram API push is a full record replacement: whatever
+JSON the script sends becomes the record. That model has bitten the
+security team in three concrete ways during 2026:
+
+* ``CVE-2026-41016`` — a regenerated push from the wrong sibling
+ tracker moved the record's ``CNA_private.state`` from ``PUBLIC``
+ back to ``REVIEW``, broke the ``cve.org`` advisory's lifecycle
+ ownership, and required a manual revert.
+* Same record — the regenerator overwrote ``affected[].product`` /
+ ``packageName`` (the originally-published shape was
+ ``Apache Airflow Providers SMTP`` / ``apache-airflow-providers-smtp``;
+ the re-push from the core-scope sibling produced
+ ``Apache Airflow`` / ``apache-airflow``), changing the meaning of
+ the record after publication.
+* Same record — the hand-added ``lists.apache.org/thread/<hash>``
+ advisory URL in ``references[]`` was lost when the regenerated
+ document only carried the fix PR.
+
+This module supplies three guard checks the push can run before
+sending. Each one is **opt-out**: by default the guard is active and
+the push refuses (or merges) when it would otherwise regress the
+record. Explicit override flags let a release manager force the
+change when the regression is intentional.
+
+The guards work on the **document body** that the API push sends
+(``cveMetadata`` / ``CNA_private`` / ``containers``) versus the
+**body** sub-object of the fetched record (Vulnogram's GET endpoint
+wraps the same shape under ``body``). The asymmetry is unavoidable —
+the API was built that way long before this module existed.
+"""
+
+from __future__ import annotations
+
+import copy
+from typing import Any
+
+
+class MergeModeRefused(Exception):
+ """Raised when a merge-mode guard rejects the push.
+
+ The constructor takes a single ``message`` argument; the script
+ surfaces it verbatim to the user with a non-zero exit code so the
+ release manager sees exactly which guard fired and which override
+ flag to add if the change is deliberate.
+ """
+
+
+def _path(obj: Any, *keys: str) -> Any:
+ """Navigate ``obj`` along ``keys``; return ``None`` on any miss."""
+ current: Any = obj
+ for key in keys:
+ if not isinstance(current, dict):
+ return None
+ current = current.get(key)
+ return current
+
+
+def _current_state(current_doc: dict[str, Any]) -> str | None:
+ """Return ``current_doc.body.CNA_private.state`` or ``None``."""
+ return _path(current_doc, "body", "CNA_private", "state")
+
+
+def _new_state(new_doc: dict[str, Any]) -> str | None:
+ """Return ``new_doc.CNA_private.state`` or ``None``."""
+ return _path(new_doc, "CNA_private", "state")
+
+
+def _current_references(current_doc: dict[str, Any]) -> list[dict[str, Any]]:
+ refs = _path(current_doc, "body", "containers", "cna", "references")
+ return list(refs) if isinstance(refs, list) else []
+
+
+def _new_references(new_doc: dict[str, Any]) -> list[dict[str, Any]]:
+ refs = _path(new_doc, "containers", "cna", "references")
+ return list(refs) if isinstance(refs, list) else []
+
+
+def _current_affected(current_doc: dict[str, Any]) -> list[dict[str, Any]]:
+ aff = _path(current_doc, "body", "containers", "cna", "affected")
+ return list(aff) if isinstance(aff, list) else []
+
+
+def _new_affected(new_doc: dict[str, Any]) -> list[dict[str, Any]]:
+ aff = _path(new_doc, "containers", "cna", "affected")
+ return list(aff) if isinstance(aff, list) else []
+
+
+def _merge_references_by_url(
+ current: list[dict[str, Any]],
+ new: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+ """Union-merge references, keyed by ``url``.
+
+ Order: the new emission's entries come first (in their original
+ order), followed by any current-record entries whose URL is not
+ in the new emission. The order matters for human-readability of
+ the rendered advisory page; the new emission is presumed to
+ reflect the latest reviewer intent.
+
+ Entries without a ``url`` field are passed through as-is from the
+ new emission (current-record duplicates are not detected because
+ there is no key to match on).
+ """
+ new_urls = {entry.get("url") for entry in new}
+ merged: list[dict[str, Any]] = list(new)
+ for entry in current:
+ url = entry.get("url")
+ if url and url not in new_urls:
+ merged.append(entry)
+ return merged
+
+
+def _product_signature(entry: dict[str, Any]) -> tuple[str, str]:
+ """Return ``(packageName, product)`` for an ``affected[]`` entry.
+
+ Either field may be missing; the signature uses ``""`` to keep
+ set semantics consistent.
+ """
+ package = str(entry.get("packageName") or "")
+ product = str(entry.get("product") or "")
+ return (package, product)
+
+
+def _diff_affected_products(
+ current: list[dict[str, Any]],
+ new: list[dict[str, Any]],
+) -> list[str]:
+ """Return a list of human-readable diffs for product/packageName
+ changes between the current and new affected[] arrays.
+
+ Returns an empty list when both arrays carry the same
+ ``(packageName, product)`` signatures (order ignored). Otherwise
+ returns one line per dropped or added signature (a rename shows
+ as one of each) so the caller can decide to refuse or warn.
+ """
+ current_sigs = {_product_signature(entry) for entry in current if isinstance(entry, dict)}
+ new_sigs = {_product_signature(entry) for entry in new if isinstance(entry, dict)}
+ if current_sigs == new_sigs:
+ return []
+ diffs: list[str] = []
+ for sig in sorted(current_sigs - new_sigs):
+ package, product = sig
+ diffs.append(f" - removed: packageName={package!r}, product={product!r}")
+ for sig in sorted(new_sigs - current_sigs):
+ package, product = sig
+ diffs.append(f" + added: packageName={package!r}, product={product!r}")
+ return diffs
+
+
+def apply_merge_mode_guards(
+ current_doc: dict[str, Any] | None,
+ new_doc: dict[str, Any],
+ *,
+ allow_state_downgrade: bool = False,
+ replace_references: bool = False,
+ allow_product_change: bool = False,
+) -> dict[str, Any]:
+ """Apply the three safety nets and return the document to push.
+
+ ``current_doc`` is the record's current state as returned by
+ :func:`vulnogram_api.client.get_record` (i.e. the ``comments`` /
+ ``files`` / ``body`` envelope). When ``None`` — the record does
+ not exist yet — the guards are no-ops and ``new_doc`` is returned
+ unchanged (no current state to compare against).
+
+ ``new_doc`` is the body the script intends to push (``cveMetadata``
+ / ``CNA_private`` / ``containers`` at the top level). A deep copy
+ is taken before mutation; the input is not modified in place so
+ the caller's reference stays stable.
+
+ The three guards in order:
+
+ 1. **State downgrade**: refuse when ``current.CNA_private.state ==
+ "PUBLIC"`` and ``new.CNA_private.state`` is set to anything else
+ (an absent state is ignored). Raise :class:`MergeModeRefused`
+ unless ``allow_state_downgrade`` is ``True``. PUBLIC means the
+ record was pushed to cve.org, and walking it back to REVIEW /
+ DRAFT is almost always an accidental regenerator side-effect.
+ 2. **References merge**: when ``replace_references`` is ``False``
+ (the default), union the current record's ``references[]``
+ with the new emission's by URL, preserving any URL not in the
+ new emission. This catches the hand-added advisory URL that
+ the regenerator does not know about.
+ 3. **Product / packageName change**: when ``allow_product_change``
+ is ``False`` (the default) and any ``affected[]`` entry's
+ ``(packageName, product)`` signature differs between the
+ current record and the new emission, raise
+ :class:`MergeModeRefused` with a diff so the caller can decide
+ whether the change is intentional (e.g. broadening the scope
+ to add a new package) or a regression.
+ """
+ if current_doc is None:
+ # First push — nothing to merge against. The guards exist to
+ # prevent regressions of already-published state, and a new
+ # record has no published state to regress.
+ return new_doc
+
+ merged = copy.deepcopy(new_doc)
+
+ current_state = _current_state(current_doc)
+ new_state_value = _new_state(merged)
+ if (
+ current_state == "PUBLIC"
+ and new_state_value is not None
+ and new_state_value != "PUBLIC"
+ and not allow_state_downgrade
+ ):
+ raise MergeModeRefused(
+ f"Refusing CNA_private.state downgrade "
+ f"{current_state!r} → {new_state_value!r}. The record was "
+ f"published to cve.org at the PUBLIC state; walking it "
+ f"back to REVIEW/DRAFT is almost always an accidental "
+ f"regression. Pass --allow-state-downgrade to force "
+ f'the push, or set CNA_private.state = "PUBLIC" in the '
+ f"JSON file before re-running."
+ )
+
+ if not replace_references:
+ merged_refs = _merge_references_by_url(
+ current=_current_references(current_doc),
+ new=_new_references(merged),
+ )
+ # Only write back when the merged list is non-empty; this avoids
+ # an empty `references` block sprouting on records that
+ # never had one.
+ if merged_refs:
+ containers = merged.setdefault("containers", {})
+ cna = containers.setdefault("cna", {})
+ cna["references"] = merged_refs
+
+ diffs = _diff_affected_products(
+ current=_current_affected(current_doc),
+ new=_new_affected(merged),
+ )
+ if diffs and not allow_product_change:
+ raise MergeModeRefused(
+ "Refusing affected[].product / packageName change(s):\n"
+ + "\n".join(diffs)
+ + "\nIf the change is intentional (e.g. broadening the "
+ "scope to add a new package, or correcting the originally-"
+ "published product name), pass --allow-product-change to "
+ "force the push. Otherwise the regenerator emitted the "
+ "wrong scope — check the originating tracker's labels."
+ )
+
+ return merged
diff --git a/tools/vulnogram/oauth-api/src/vulnogram_api/record_update.py b/tools/vulnogram/oauth-api/src/vulnogram_api/record_update.py
index 60b768b..7c71dab 100644
--- a/tools/vulnogram/oauth-api/src/vulnogram_api/record_update.py
+++ b/tools/vulnogram/oauth-api/src/vulnogram_api/record_update.py
@@ -30,6 +30,15 @@ before calling this script. The actual publish-to-cve.org push
(``READY`` → ``PUBLIC``) still happens through the Vulnogram UI button
because it has out-of-band side effects (CNA feed dispatch) that the
script intentionally does not automate.
+
+**Merge mode (default on)** — before the POST, the script fetches
+the record's current state and applies three safety nets against
+the document about to be pushed: state-downgrade refusal,
+references-by-URL merge, product/packageName-change refusal. See
+:mod:`vulnogram_api.merge_mode` for the full rules, and the
+``--allow-state-downgrade`` / ``--replace-references`` /
+``--allow-product-change`` / ``--full-replace`` flags below for the
+escape hatches.
"""
from __future__ import annotations
@@ -45,9 +54,11 @@ from vulnogram_api.client import (
RecordSaveFailed,
SessionExpired,
VulnogramAPIError,
+ get_record,
update_record,
)
from vulnogram_api.credentials import Session, locate_session
+from vulnogram_api.merge_mode import MergeModeRefused, apply_merge_mode_guards
CVE_ID_RE = re.compile(r"^CVE-\d{4}-\d{4,7}$")
@@ -84,9 +95,79 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
default="cve5",
help="Vulnogram section path component. Default: cve5.",
)
+ ap.add_argument(
+ "--allow-state-downgrade",
+ action="store_true",
+ help=(
+ "Allow CNA_private.state to move backwards from PUBLIC "
+ "to REVIEW / DRAFT / READY. Required when the regression "
+ "is intentional; refused by default because every prior "
+ "instance was an accidental side-effect of a regenerator "
+ "re-push (see CVE-2026-41016)."
+ ),
+ )
+ ap.add_argument(
+ "--replace-references",
+ action="store_true",
+ help=(
+ "Replace `references[]` wholesale instead of merging the "
+ "new emission with the current record by URL. Use when "
+ "the reviewer is genuinely dropping an old reference; by "
+ "default the merge preserves any URL in the current "
+ "record that is not in the new emission (catches the "
+ "hand-added advisory URL the regenerator forgets)."
+ ),
+ )
+ ap.add_argument(
+ "--allow-product-change",
+ action="store_true",
+ help=(
+ "Allow `affected[].product` / `packageName` changes vs "
+ "the current record. Required when the change is "
+ "intentional (broadening scope to add a new package, or "
+ "correcting the originally-published name); refused by "
+ "default because every prior instance was a regenerator "
+ "scope mismatch."
+ ),
+ )
+ ap.add_argument(
+ "--full-replace",
+ action="store_true",
+ help=(
+ "Umbrella: equivalent to passing all three merge-mode "
+ "overrides above. Use only when the intent is to wholly "
+ "replace the current record (e.g. an emergency revert "
+ "to a known-good canonical JSON)."
+ ),
+ )
return ap.parse_args(argv)
+def _fetch_current_or_none(
+ session: Session,
+ cve_id: str,
+ *,
+ section: str,
+) -> dict | None:
+ """Return the current record's JSON, or ``None`` when it does
+ not yet exist (first push for this CVE ID).
+
+ Other API errors propagate — only the "record not found" shape
+ falls through as ``None`` because that is the signal that there
+ is nothing to merge against. Distinguishing it lets the new-
+ record path land cleanly without spurious merge-guard refusals.
+ """
+ try:
+ return get_record(session, cve_id, section=section)
+ except VulnogramAPIError as exc:
+ # The not-found shape is a specific error string from
+ # `get_record`; check it loosely so we don't swallow other
+ # API failures (auth, 5xx, malformed response).
+ if "not found" in str(exc).lower():
+ return None
+ raise
+
+
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
@@ -107,6 +188,36 @@ def main(argv: list[str] | None = None) -> int:
creds_path = locate_session(args.credentials)
session = Session.load(creds_path)
+ # Fetch the current record so the merge-mode guards have
+ # something to compare against. The fetch returns None when
+ # the record does not yet exist (first push); the guards are
+ # no-ops in that case and the original document is pushed
+ # verbatim.
+ try:
+ current = _fetch_current_or_none(session, args.cve_id, section=args.section)
+ except SessionExpired as e:
+ print(f"✗ {e}", file=sys.stderr)
+ return 2
+ except VulnogramAPIError as e:
+ print(f"✗ {e}", file=sys.stderr)
+ return 6
+
+ allow_state_downgrade = args.allow_state_downgrade or args.full_replace
+ replace_references = args.replace_references or args.full_replace
+ allow_product_change = args.allow_product_change or args.full_replace
+
+ try:
+ document = apply_merge_mode_guards(
+ current,
+ document,
+ allow_state_downgrade=allow_state_downgrade,
+ replace_references=replace_references,
+ allow_product_change=allow_product_change,
+ )
+ except MergeModeRefused as exc:
+ print(f"✗ {exc}", file=sys.stderr)
+ return 3
+
try:
envelope = update_record(session, args.cve_id, document, section=args.section)
except SessionExpired as e:
diff --git a/tools/vulnogram/oauth-api/tests/test_merge_mode.py b/tools/vulnogram/oauth-api/tests/test_merge_mode.py
new file mode 100644
index 0000000..5fc0a28
--- /dev/null
+++ b/tools/vulnogram/oauth-api/tests/test_merge_mode.py
@@ -0,0 +1,301 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Unit tests for :mod:`vulnogram_api.merge_mode`.
+
+The integration tests in ``test_record_update.py`` exercise these
+through the CLI's exit codes. This file pins down the merge / refusal
+contract at the function level so a regression there is caught even
+when the CLI wiring changes.
+"""
+
+from __future__ import annotations
+
+import copy
+
+import pytest
+
+from vulnogram_api.merge_mode import (
+ MergeModeRefused,
+ _diff_affected_products,
+ _merge_references_by_url,
+ apply_merge_mode_guards,
+)
+
+
+def _current(
+ *,
+ state: str = "PUBLIC",
+ references: list[dict] | None = None,
+ affected: list[dict] | None = None,
+) -> dict:
+ """Build a current-record snapshot (the ``get_record`` shape:
+ ``comments`` / ``files`` / ``body`` envelope).
+ """
+ return {
+ "comments": [],
+ "files": [],
+ "body": {
+ "cveMetadata": {"cveId": "CVE-2026-00001", "state": "PUBLISHED"},
+ "CNA_private": {"state": state},
+ "containers": {
+ "cna": {
+ "affected": affected
+ if affected is not None
+ else [{"packageName": "apache-foo", "product": "Apache Foo"}],
+ "references": references
+ if references is not None
+ else [
+ {"url": "https://github.com/apache/foo/pull/1", "tags": ["patch"]},
+ {
+ "url": "https://lists.apache.org/thread/abc",
+ "tags": ["vendor-advisory"],
+ },
+ ],
+ },
+ },
+ },
+ }
+
+
+def _new(
+ *,
+ state: str = "PUBLIC",
+ references: list[dict] | None = None,
+ affected: list[dict] | None = None,
+) -> dict:
+ """Build the new push body shape (no ``comments`` / ``files`` wrapper)."""
+ return {
+ "cveMetadata": {"cveId": "CVE-2026-00001", "state": "PUBLISHED"},
+ "CNA_private": {"state": state},
+ "containers": {
+ "cna": {
+ "affected": affected
+ if affected is not None
+ else [{"packageName": "apache-foo", "product": "Apache Foo"}],
+ "references": references
+ if references is not None
+ else [{"url": "https://github.com/apache/foo/pull/1", "tags": ["patch"]}],
+ },
+ },
+ }
+
+
+# ---------------------------------------------------------------------------
+# State-downgrade guard
+# ---------------------------------------------------------------------------
+
+
+class TestStateDowngradeGuard:
+ def test_public_to_review_refused(self):
+ with pytest.raises(MergeModeRefused, match="state downgrade"):
+ apply_merge_mode_guards(_current(state="PUBLIC"), _new(state="REVIEW"))
+
+ def test_public_to_draft_refused(self):
+ with pytest.raises(MergeModeRefused, match="state downgrade"):
+ apply_merge_mode_guards(_current(state="PUBLIC"), _new(state="DRAFT"))
+
+ def test_public_to_public_allowed(self):
+ # No transition at all — emit identical state.
+ merged = apply_merge_mode_guards(_current(state="PUBLIC"), _new(state="PUBLIC"))
+ assert merged["CNA_private"]["state"] == "PUBLIC"
+
+ def test_review_to_review_allowed(self):
+ # Not a downgrade from PUBLIC; the guard does not fire.
+ merged = apply_merge_mode_guards(_current(state="REVIEW"), _new(state="REVIEW"))
+ assert merged["CNA_private"]["state"] == "REVIEW"
+
+ def test_review_to_public_allowed(self):
+ # Upgrade — exactly what release managers do on publication.
+ merged = apply_merge_mode_guards(_current(state="REVIEW"), _new(state="PUBLIC"))
+ assert merged["CNA_private"]["state"] == "PUBLIC"
+
+ def test_public_to_review_with_override_allowed(self):
+ merged = apply_merge_mode_guards(
+ _current(state="PUBLIC"),
+ _new(state="REVIEW"),
+ allow_state_downgrade=True,
+ )
+ assert merged["CNA_private"]["state"] == "REVIEW"
+
+ def test_message_names_both_states(self):
+ with pytest.raises(MergeModeRefused) as excinfo:
+ apply_merge_mode_guards(_current(state="PUBLIC"), _new(state="REVIEW"))
+ message = str(excinfo.value)
+ assert "PUBLIC" in message
+ assert "REVIEW" in message
+ assert "--allow-state-downgrade" in message
+
+
+# ---------------------------------------------------------------------------
+# References merge
+# ---------------------------------------------------------------------------
+
+
+class TestReferencesMerge:
+ def test_merge_preserves_url_not_in_new(self):
+ merged = _merge_references_by_url(
+ current=[
+ {"url": "https://github.com/apache/foo/pull/1", "tags": ["patch"]},
+ {"url": "https://lists.apache.org/thread/abc", "tags": ["vendor-advisory"]},
+ ],
+ new=[
+ {"url": "https://github.com/apache/foo/pull/1", "tags": ["patch"]},
+ ],
+ )
+ urls = [ref["url"] for ref in merged]
+ assert "https://lists.apache.org/thread/abc" in urls
+
+ def test_merge_new_entries_come_first(self):
+ merged = _merge_references_by_url(
+ current=[{"url": "https://existing", "tags": []}],
+ new=[{"url": "https://just-added", "tags": ["patch"]}],
+ )
+ urls = [ref["url"] for ref in merged]
+ assert urls == ["https://just-added", "https://existing"]
+
+ def test_merge_deduplicates_by_url(self):
+ # When the same URL appears in both, the new emission's
+ # entry wins (its tags / metadata may have changed).
+ merged = _merge_references_by_url(
+ current=[{"url": "https://x", "tags": ["old-tag"]}],
+ new=[{"url": "https://x", "tags": ["new-tag"]}],
+ )
+ assert len(merged) == 1
+ assert merged[0]["tags"] == ["new-tag"]
+
+ def test_apply_merges_references_by_default(self):
+ merged = apply_merge_mode_guards(_current(), _new())
+ urls = {ref["url"] for ref in merged["containers"]["cna"]["references"]}
+ assert "https://github.com/apache/foo/pull/1" in urls
+ assert "https://lists.apache.org/thread/abc" in urls
+
+ def test_apply_replaces_references_with_flag(self):
+ merged = apply_merge_mode_guards(_current(), _new(), replace_references=True)
+ urls = {ref["url"] for ref in merged["containers"]["cna"]["references"]}
+ assert urls == {"https://github.com/apache/foo/pull/1"}
+
+ def test_apply_does_not_create_empty_references_block(self):
+ """When both current and new have no references, the merged
+ document should not sprout an empty ``references: []`` field.
+ """
+ current = _current(references=[])
+ new = _new(references=[])
+ del current["body"]["containers"]["cna"]["references"]
+ new_copy = copy.deepcopy(new)
+ del new_copy["containers"]["cna"]["references"]
+ merged = apply_merge_mode_guards(current, new_copy)
+ # The new doc has no `references` key — merged should keep it absent.
+ assert "references" not in merged["containers"]["cna"]
+
+
+# ---------------------------------------------------------------------------
+# Product / packageName change guard
+# ---------------------------------------------------------------------------
+
+
+class TestProductChangeGuard:
+ def test_packagename_change_refused(self):
+ with pytest.raises(MergeModeRefused, match="product"):
+ apply_merge_mode_guards(
+ _current(affected=[{"packageName": "apache-foo-bar", "product": "Apache Foo Bar"}]),
+ _new(affected=[{"packageName": "apache-foo", "product": "Apache Foo"}]),
+ )
+
+ def test_product_only_change_refused(self):
+ # Same packageName, different product display name. Still
+ # refused — the display name is what shows on the cve.org page.
+ with pytest.raises(MergeModeRefused, match="product"):
+ apply_merge_mode_guards(
+ _current(affected=[{"packageName": "apache-foo", "product": "Apache Foo Original"}]),
+ _new(affected=[{"packageName": "apache-foo", "product": "Apache Foo Rewritten"}]),
+ )
+
+ def test_same_product_allowed(self):
+ merged = apply_merge_mode_guards(_current(), _new())
+ assert merged["containers"]["cna"]["affected"][0]["packageName"] == "apache-foo"
+
+ def test_change_allowed_with_flag(self):
+ merged = apply_merge_mode_guards(
+ _current(affected=[{"packageName": "apache-foo-bar", "product": "Apache Foo Bar"}]),
+ _new(affected=[{"packageName": "apache-foo", "product": "Apache Foo"}]),
+ allow_product_change=True,
+ )
+ assert merged["containers"]["cna"]["affected"][0]["packageName"] == "apache-foo"
+
+ def test_diff_lists_dropped_and_added(self):
+ diffs = _diff_affected_products(
+ current=[{"packageName": "apache-foo-bar", "product": "Apache Foo Bar"}],
+ new=[{"packageName": "apache-foo", "product": "Apache Foo"}],
+ )
+ joined = "\n".join(diffs)
+ assert "removed" in joined
+ assert "added" in joined
+ assert "apache-foo-bar" in joined
+ assert "apache-foo" in joined
+
+ def test_diff_empty_when_unchanged(self):
+ diffs = _diff_affected_products(
+ current=[{"packageName": "apache-foo", "product": "Apache Foo"}],
+ new=[{"packageName": "apache-foo", "product": "Apache Foo"}],
+ )
+ assert diffs == []
+
+ def test_diff_ignores_order(self):
+ # The signatures are compared as a set, so re-ordering
+ # affected[] entries between the current record and the new
+ # emission must not trip the guard.
+ diffs = _diff_affected_products(
+ current=[
+ {"packageName": "apache-foo", "product": "Apache Foo"},
+ {"packageName": "apache-bar", "product": "Apache Bar"},
+ ],
+ new=[
+ {"packageName": "apache-bar", "product": "Apache Bar"},
+ {"packageName": "apache-foo", "product": "Apache Foo"},
+ ],
+ )
+ assert diffs == []
+
+
+# ---------------------------------------------------------------------------
+# Composition + edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestApplyComposition:
+ def test_no_current_doc_is_noop(self):
+ new = _new(state="REVIEW", affected=[{"packageName": "x", "product": "X"}])
+ merged = apply_merge_mode_guards(None, new)
+ # Nothing to compare against → return the input verbatim.
+ assert merged is new
+
+ def test_input_not_mutated_on_merge(self):
+ new = _new()
+ original_refs = list(new["containers"]["cna"]["references"])
+ apply_merge_mode_guards(_current(), new)
+ # The original new doc still has its original references list
+ # (the merge made a deep copy).
+ assert new["containers"]["cna"]["references"] == original_refs
+
+ def test_all_guards_pass_returns_merged_doc(self):
+ merged = apply_merge_mode_guards(_current(), _new())
+ assert merged["CNA_private"]["state"] == "PUBLIC"
+ urls = {ref["url"] for ref in merged["containers"]["cna"]["references"]}
+ assert urls == {
+ "https://github.com/apache/foo/pull/1",
+ "https://lists.apache.org/thread/abc",
+ }
diff --git a/tools/vulnogram/oauth-api/tests/test_record_update.py b/tools/vulnogram/oauth-api/tests/test_record_update.py
index 9f68df3..542bee5 100644
--- a/tools/vulnogram/oauth-api/tests/test_record_update.py
+++ b/tools/vulnogram/oauth-api/tests/test_record_update.py
@@ -37,6 +37,17 @@ def _write_session(path):
return path
+def _no_current_record(monkeypatch):
+ """Make _fetch_current_or_none return None so merge-mode guards
+ behave as no-ops (the "first push, nothing to merge against" path).
+ """
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: None,
+ )
+
+
def test_invalid_cve_id_rejected(tmp_path, monkeypatch, capsys):
creds = _write_session(tmp_path / "session.json")
body = tmp_path / "body.json"
@@ -76,6 +87,7 @@ def test_session_expired_returns_2(tmp_path, monkeypatch, capsys):
body = tmp_path / "body.json"
body.write_text(json.dumps({"x": 1}))
monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ _no_current_record(monkeypatch)
def _raise_expired(*a, **kw):
from vulnogram_api.client import SessionExpired
@@ -94,6 +106,7 @@ def test_save_failed_returns_5(tmp_path, monkeypatch, capsys):
body = tmp_path / "body.json"
body.write_text(json.dumps({"x": 1}))
monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ _no_current_record(monkeypatch)
def _raise_save_failed(*a, **kw):
from vulnogram_api.client import RecordSaveFailed
@@ -112,6 +125,7 @@ def test_happy_path_returns_0(tmp_path, monkeypatch, capsys):
body = tmp_path / "body.json"
body.write_text(json.dumps({"x": 1}))
monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ _no_current_record(monkeypatch)
monkeypatch.setattr(
record_update,
"update_record",
@@ -122,3 +136,358 @@ def test_happy_path_returns_0(tmp_path, monkeypatch, capsys):
out = capsys.readouterr().out
assert "saved" in out
assert "CVE-2026-12345" in out
+
+
+# ---------------------------------------------------------------------------
+# Merge-mode integration tests (the new behaviour)
+# ---------------------------------------------------------------------------
+
+
+def _public_record() -> dict:
+ """A current-record snapshot with `PUBLIC` state and one
+ advisory reference. Models the canonical post-publication shape
+ that the merge-mode guards exist to protect.
+ """
+ return {
+ "comments": [],
+ "files": [],
+ "body": {
+ "cveMetadata": {"cveId": "CVE-2026-12345", "state": "PUBLISHED"},
+ "CNA_private": {"state": "PUBLIC"},
+ "containers": {
+ "cna": {
+ "affected": [
+ {
+ "packageName": "apache-foo-providers-bar",
+ "product": "Apache Foo Providers Bar",
+ }
+ ],
+ "references": [
+ {"url": "https://github.com/apache/foo/pull/100", "tags": ["patch"]},
+ {
+ "url": "https://lists.apache.org/thread/abc",
+ "tags": ["vendor-advisory"],
+ },
+ ],
+ },
+ },
+ },
+ }
+
+
+def _new_doc_review_state_with_provider() -> dict:
+ """A regenerated body that walks state back to REVIEW. Mirrors
+ the CVE-2026-41016 regression class.
+ """
+ return {
+ "cveMetadata": {"cveId": "CVE-2026-12345", "state": "PUBLISHED"},
+ "CNA_private": {"state": "REVIEW"},
+ "containers": {
+ "cna": {
+ "affected": [
+ {
+ "packageName": "apache-foo-providers-bar",
+ "product": "Apache Foo Providers Bar",
+ }
+ ],
+ "references": [
+ {"url": "https://github.com/apache/foo/pull/100", "tags": ["patch"]},
+ ],
+ },
+ },
+ }
+
+
+def test_state_downgrade_refused_by_default(tmp_path, monkeypatch, capsys):
+ _write_session(tmp_path / "session.json")
+ body = tmp_path / "body.json"
+ body.write_text(json.dumps(_new_doc_review_state_with_provider()))
+ monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: _public_record(),
+ )
+ push_called: list = []
+
+ def _record_call(*a, **kw):
+ push_called.append(a)
+ return {"type": "saved"}
+
+ monkeypatch.setattr(record_update, "update_record", _record_call)
+
+ rc = record_update.main(["--cve-id", "CVE-2026-12345", "--json-file", str(body)])
+
+ assert rc == 3
+ err = capsys.readouterr().err
+ assert "state downgrade" in err
+ assert "PUBLIC" in err
+ assert "REVIEW" in err
+ assert push_called == [], "push must not fire when a guard refuses"
+
+
+def test_state_downgrade_allowed_with_flag(tmp_path, monkeypatch, capsys):
+ _write_session(tmp_path / "session.json")
+ body = tmp_path / "body.json"
+ body.write_text(json.dumps(_new_doc_review_state_with_provider()))
+ monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: _public_record(),
+ )
+ monkeypatch.setattr(
+ record_update,
+ "update_record",
+ lambda *a, **kw: {"type": "saved"},
+ )
+
+ rc = record_update.main(
+ [
+ "--cve-id",
+ "CVE-2026-12345",
+ "--json-file",
+ str(body),
+ "--allow-state-downgrade",
+ ]
+ )
+
+ assert rc == 0
+
+
+def test_references_merged_by_default(tmp_path, monkeypatch):
+ """The new emission carries only the patch reference; the current
+ record's advisory URL must be preserved on the merged push.
+ """
+ _write_session(tmp_path / "session.json")
+ new_body = _new_doc_review_state_with_provider()
+ new_body["CNA_private"]["state"] = "PUBLIC" # bypass state guard
+ body = tmp_path / "body.json"
+ body.write_text(json.dumps(new_body))
+ monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: _public_record(),
+ )
+ captured = {}
+
+ def _capture(session, cve_id, document, **kw):
+ captured["document"] = document
+ return {"type": "saved"}
+
+ monkeypatch.setattr(record_update, "update_record", _capture)
+
+ rc = record_update.main(["--cve-id", "CVE-2026-12345", "--json-file", str(body)])
+
+ assert rc == 0
+ refs = captured["document"]["containers"]["cna"]["references"]
+ urls = {ref["url"] for ref in refs}
+ assert "https://github.com/apache/foo/pull/100" in urls
+ assert "https://lists.apache.org/thread/abc" in urls
+
+
+def test_references_wholesale_replace_with_flag(tmp_path, monkeypatch):
+ _write_session(tmp_path / "session.json")
+ new_body = _new_doc_review_state_with_provider()
+ new_body["CNA_private"]["state"] = "PUBLIC" # bypass state guard
+ body = tmp_path / "body.json"
+ body.write_text(json.dumps(new_body))
+ monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: _public_record(),
+ )
+ captured = {}
+
+ def _capture(session, cve_id, document, **kw):
+ captured["document"] = document
+ return {"type": "saved"}
+
+ monkeypatch.setattr(record_update, "update_record", _capture)
+
+ rc = record_update.main(
+ [
+ "--cve-id",
+ "CVE-2026-12345",
+ "--json-file",
+ str(body),
+ "--replace-references",
+ ]
+ )
+
+ assert rc == 0
+ refs = captured["document"]["containers"]["cna"]["references"]
+ urls = {ref["url"] for ref in refs}
+ assert urls == {"https://github.com/apache/foo/pull/100"}
+
+
+def test_product_change_refused_by_default(tmp_path, monkeypatch, capsys):
+ """The regenerated body changes packageName from the providers
+ package to the core package — the CVE-2026-41016 regression.
+ """
+ _write_session(tmp_path / "session.json")
+ new_body = {
+ "cveMetadata": {"cveId": "CVE-2026-12345", "state": "PUBLISHED"},
+ "CNA_private": {"state": "PUBLIC"}, # keep state to isolate this guard
+ "containers": {
+ "cna": {
+ "affected": [
+ {
+ "packageName": "apache-foo",
+ "product": "Apache Foo",
+ }
+ ],
+ "references": [
+ {"url": "https://github.com/apache/foo/pull/100", "tags": ["patch"]},
+ {"url": "https://lists.apache.org/thread/abc", "tags": ["vendor-advisory"]},
+ ],
+ },
+ },
+ }
+ body = tmp_path / "body.json"
+ body.write_text(json.dumps(new_body))
+ monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: _public_record(),
+ )
+ push_called: list = []
+
+ def _record_call(*a, **kw):
+ push_called.append(a)
+ return {"type": "saved"}
+
+ monkeypatch.setattr(record_update, "update_record", _record_call)
+
+ rc = record_update.main(["--cve-id", "CVE-2026-12345", "--json-file", str(body)])
+
+ assert rc == 3
+ err = capsys.readouterr().err
+ assert "product" in err.lower() or "packagename" in err.lower()
+ assert "apache-foo-providers-bar" in err
+ assert "apache-foo" in err
+ assert push_called == []
+
+
+def test_product_change_allowed_with_flag(tmp_path, monkeypatch):
+ _write_session(tmp_path / "session.json")
+ new_body = {
+ "cveMetadata": {"cveId": "CVE-2026-12345", "state": "PUBLISHED"},
+ "CNA_private": {"state": "PUBLIC"},
+ "containers": {
+ "cna": {
+ "affected": [
+ {
+ "packageName": "apache-foo",
+ "product": "Apache Foo",
+ }
+ ],
+ "references": [
+ {"url": "https://github.com/apache/foo/pull/100", "tags": ["patch"]},
+ {"url": "https://lists.apache.org/thread/abc", "tags": ["vendor-advisory"]},
+ ],
+ },
+ },
+ }
+ body = tmp_path / "body.json"
+ body.write_text(json.dumps(new_body))
+ monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: _public_record(),
+ )
+ monkeypatch.setattr(record_update, "update_record", lambda *a, **kw: {"type": "saved"})
+
+ rc = record_update.main(
+ [
+ "--cve-id",
+ "CVE-2026-12345",
+ "--json-file",
+ str(body),
+ "--allow-product-change",
+ ]
+ )
+
+ assert rc == 0
+
+
+def test_full_replace_overrides_all_three(tmp_path, monkeypatch, capsys):
+ """`--full-replace` is the umbrella: it should allow a record
+ that combines all three regressions (state downgrade + reference
+ drop + product change) without firing any guard.
+ """
+ _write_session(tmp_path / "session.json")
+ new_body = _new_doc_review_state_with_provider() # REVIEW state
+ new_body["containers"]["cna"]["affected"][0] = {
+ "packageName": "apache-foo",
+ "product": "Apache Foo",
+ } # changed product
+ body = tmp_path / "body.json"
+ body.write_text(json.dumps(new_body))
+ monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: _public_record(),
+ )
+ captured = {}
+
+ def _capture(session, cve_id, document, **kw):
+ captured["document"] = document
+ return {"type": "saved"}
+
+ monkeypatch.setattr(record_update, "update_record", _capture)
+
+ rc = record_update.main(
+ [
+ "--cve-id",
+ "CVE-2026-12345",
+ "--json-file",
+ str(body),
+ "--full-replace",
+ ]
+ )
+
+ assert rc == 0
+ # References were replaced wholesale — the advisory URL is gone.
+ refs = captured["document"]["containers"]["cna"]["references"]
+ urls = {ref["url"] for ref in refs}
+ assert "https://lists.apache.org/thread/abc" not in urls
+
+
+def test_new_record_skips_all_guards(tmp_path, monkeypatch):
+ """First push for a CVE ID: get_record returns None and the
+ merge-mode guards are no-ops. The original document is pushed
+ verbatim with no state-downgrade / product-change refusal.
+ """
+ _write_session(tmp_path / "session.json")
+ new_body = _new_doc_review_state_with_provider() # REVIEW state, fewer refs
+ body = tmp_path / "body.json"
+ body.write_text(json.dumps(new_body))
+ monkeypatch.setenv("VULNOGRAM_SESSION", str(tmp_path / "session.json"))
+ monkeypatch.setattr(
+ record_update,
+ "_fetch_current_or_none",
+ lambda *a, **kw: None, # record doesn't exist yet
+ )
+ captured = {}
+
+ def _capture(session, cve_id, document, **kw):
+ captured["document"] = document
+ return {"type": "saved"}
+
+ monkeypatch.setattr(record_update, "update_record", _capture)
+
+ rc = record_update.main(["--cve-id", "CVE-2026-12345", "--json-file", str(body)])
+
+ assert rc == 0
+ # The pushed body matches the input verbatim (modulo a deep copy
+ # that the guards make but skip mutating).
+ assert captured["document"]["CNA_private"]["state"] == "REVIEW"
+ refs = captured["document"]["containers"]["cna"]["references"]
+ urls = {ref["url"] for ref in refs}
+ assert urls == {"https://github.com/apache/foo/pull/100"}
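
Reviewer note (not part of the commit): the reference tests above pin down an observable contract, which the minimal sketch below restates in code. It is an illustration only. The real logic lives in vulnogram_api.merge_mode, which this hunk does not show; the helper name merge_references and its signature are hypothetical. The sketch assumes references are matched by their "url" field, that the new emission's entries win on a clash, and that a missing current record (the first-push path) means there is nothing to merge.

```python
import copy


def merge_references(current, new, replace=False):
    """Hypothetical helper, not the vulnogram_api.merge_mode API.

    Returns the reference list to push: the new emission, plus any
    reference from the current record whose URL the new emission lacks.
    """
    # First push (no current record) or an explicit wholesale replace:
    # the new emission is pushed unchanged.
    if replace or current is None:
        return copy.deepcopy(new)

    merged = copy.deepcopy(new)
    seen = {ref["url"] for ref in merged}
    for ref in current:
        # Preserve hand-added entries (e.g. the advisory URL) that the
        # regenerated body did not emit.
        if ref["url"] not in seen:
            merged.append(copy.deepcopy(ref))
            seen.add(ref["url"])
    return merged
```

Under these assumptions, the default call corresponds to test_references_merged_by_default, replace=True to test_references_wholesale_replace_with_flag, and current=None to test_new_record_skips_all_guards.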