This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1b37cbbe54f Fix Provider Registry showing 0 monthly downloads for many
providers (#67670)
1b37cbbe54f is described below
commit 1b37cbbe54f693e7297b352bc96e96c31ae5633c
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri May 29 10:14:55 2026 +0100
Fix Provider Registry showing 0 monthly downloads for many providers
(#67670)
---
dev/registry/extract_metadata.py | 123 +++++++++++++++++++++++++---
dev/registry/tests/test_extract_metadata.py | 119 ++++++++++++++++++++-------
2 files changed, 202 insertions(+), 40 deletions(-)
diff --git a/dev/registry/extract_metadata.py b/dev/registry/extract_metadata.py
index e84ca631334..1c557a3a8fa 100644
--- a/dev/registry/extract_metadata.py
+++ b/dev/registry/extract_metadata.py
@@ -55,6 +55,10 @@ from registry_contract_models import
validate_providers_catalog
# External endpoints used by metadata extraction.
PYPISTATS_RECENT_URL =
"https://pypistats.org/api/packages/{package_name}/recent"
PYPI_PACKAGE_JSON_URL = "https://pypi.org/pypi/{package_name}/json"
+# ClickHouse's public PyPI dataset (the data behind clickpy.clickhouse.com),
sourced
+# from the same PyPI download logs as pypistats.org but queryable for every
package
+# in a single SQL request -- no per-package rate limiting.
+CLICKHOUSE_PYPI_URL = "https://sql-clickhouse.clickhouse.com/?user=demo"
S3_DOC_URL = "http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com"
AIRFLOW_PROVIDER_DOCS_URL =
"https://airflow.apache.org/docs/{package_name}/stable/"
AIRFLOW_PROVIDER_SOURCE_URL = (
@@ -108,25 +112,84 @@ def fetch_pypi_dates(package_name: str) -> dict[str, str]:
return {"first_released": "", "last_updated": ""}
def fetch_pypi_downloads_clickhouse(package_names: list[str]) -> dict[str, dict[str, int]]:
    """Fetch weekly/monthly downloads for many packages in ONE query.

    Replaces the previous ~N parallel ``pypistats.org/api/recent`` calls. That burst
    tripped pypistats' per-IP rate limit, and ``fetch_pypi_downloads`` silently turned
    each 429 into ``{weekly: 0, monthly: 0}`` -- zeroing roughly a third of providers
    on the live registry. ClickHouse exposes the same underlying PyPI download data
    via a single SQL endpoint, so one request covers every package with nothing to
    rate-limit.

    Only names made entirely of PEP 508 name characters (``[A-Za-z0-9._-]``) are sent.
    Any other name is left out of the query, and therefore out of the result, so the
    caller falls back to pypistats for it instead of risking a malformed SQL literal.

    :param package_names: PyPI distribution names to look up.
    :return: ``{package_name: {"weekly": int, "monthly": int, "total": 0}}`` for every
        package ClickHouse returned a well-formed row for. Returns ``{}`` when there is
        nothing to query or the request fails, so callers fall back to pypistats
        per-package. ``total`` is left at 0 to match the existing schema (the registry
        has never populated it).
    """
    import re

    if not package_names:
        return {}
    # ClickHouse string literals honour backslash escapes as well as quotes, so
    # stripping ``'`` alone does not make interpolation safe (a trailing ``\`` would
    # escape the closing quote). Allow-list the characters a distribution name can
    # legally contain instead.
    safe_names = []
    for name in package_names:
        if re.fullmatch(r"[A-Za-z0-9._-]+", name):
            safe_names.append(name)
        else:
            print(f" Warning: not sending unsafe package name {name!r} to ClickHouse; falling back to pypistats")
    if not safe_names:
        return {}
    in_list = ", ".join(f"'{name}'" for name in safe_names)
    # Anchor the rolling windows on the dataset's latest loaded date, not today(): the
    # public dataset lags a few days, so a today()-relative window would be truncated to
    # the loaded days and undercount. max(date) gives a true last-7/30-day rolling sum,
    # matching pypistats' last_week/last_month semantics.
    query = (
        "WITH (SELECT max(date) FROM pypi.pypi_downloads_per_day) AS max_date "
        "SELECT project, "
        "toUInt64(sumIf(count, date > max_date - 7)) AS weekly, "
        "toUInt64(sumIf(count, date > max_date - 30)) AS monthly "
        "FROM pypi.pypi_downloads_per_day "
        f"WHERE project IN ({in_list}) "
        "GROUP BY project FORMAT TSV"
    )
    try:
        request = urllib.request.Request(CLICKHOUSE_PYPI_URL, data=query.encode())
        with urllib.request.urlopen(request, timeout=30) as response:
            body = response.read().decode()
    except Exception as e:
        # Deliberately broad: this is a best-effort fast path, and any failure
        # (network, HTTP error, decode) must degrade to the per-package fallback.
        print(f" Warning: ClickHouse download query failed ({e}); falling back to pypistats")
        return {}
    result: dict[str, dict[str, int]] = {}
    for line in body.splitlines():
        parts = line.split("\t")
        if len(parts) != 3:
            continue  # blank line or unexpected row shape
        project, weekly, monthly = parts
        try:
            result[project] = {"weekly": int(weekly), "monthly": int(monthly), "total": 0}
        except ValueError:
            continue  # non-integer counts; leave the package to the fallback
    return result
+
+
def fetch_pypi_data_parallel(
package_names: list[str], max_workers: int = 16
) -> dict[str, tuple[dict[str, int], dict[str, str]]]:
- """Fetch downloads + dates for many packages concurrently.
-
- Returns ``{package_name: (downloads_dict, dates_dict)}``. Each per-package
- failure is isolated -- ``fetch_pypi_downloads`` / ``fetch_pypi_dates``
- already return zero-value defaults on error, so a flaky pypistats response
- only zeroes that one package instead of stalling or crashing the build.
-
- Concurrency: ~86 providers * 2 endpoints, but pypistats.org and pypi.org
- each tolerate 16 parallel connections without rate-limiting in practice.
- Matches the ``max_workers=8`` shape used in extract_parameters.py for
- inventory fetches; bumped slightly because PyPI calls are simpler/faster.
+ """Fetch downloads + release dates for many packages.
+
+ Downloads come from a single ClickHouse query
(``fetch_pypi_downloads_clickhouse``).
+ Any package ClickHouse doesn't return -- or returns zero for, e.g. a
just-published
+ provider not yet in its dataset -- falls back to a per-package
pypistats.org call;
+ that path is now rare, so it no longer produces a rate-limiting burst.
Release dates
+ still come from pypi.org/json, fetched in parallel (a per-package-only
endpoint that
+ was never the source of the rate-limiting that motivated the ClickHouse
switch).
+
+ Returns ``{package_name: (downloads_dict, dates_dict)}``. Per-package
failures are
+ isolated -- the fallbacks return zero-value defaults -- so one flaky
response only
+ affects that package.
"""
+ clickhouse_downloads = fetch_pypi_downloads_clickhouse(package_names)
results: dict[str, tuple[dict[str, int], dict[str, str]]] = {}
def _fetch_one(pkg: str) -> tuple[str, dict[str, int], dict[str, str]]:
- return pkg, fetch_pypi_downloads(pkg), fetch_pypi_dates(pkg)
+ downloads = clickhouse_downloads.get(pkg)
+ if not downloads or downloads.get("monthly", 0) == 0:
+ # Missing from ClickHouse (or zero) -- fall back to pypistats for
this one
+ # package only. No burst because this is the exception, not every
package.
+ downloads = fetch_pypi_downloads(pkg)
+ return pkg, downloads, fetch_pypi_dates(pkg)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as
executor:
futures = [executor.submit(_fetch_one, pkg) for pkg in package_names]
@@ -137,6 +200,27 @@ def fetch_pypi_data_parallel(
return results
def preserve_nonzero_downloads(new_providers: list[dict], existing_providers: list[dict]) -> int:
    """Keep a provider's previous download counts when this build fetched zero.

    Defense-in-depth for the case where BOTH ClickHouse and the pypistats fallback fail
    for a package: rather than overwrite a known-good number on the live registry with a
    spurious zero, retain the previous ``pypi_downloads`` from the existing catalog.

    :param new_providers: provider dicts produced by this build; mutated in place.
    :param existing_providers: provider dicts from the previously published catalog.
        Entries that are not dicts or have no ``id`` are ignored rather than crashing.
    :return: the number of providers whose download counts were restored.
    """
    existing_by_id = {p["id"]: p for p in existing_providers if isinstance(p, dict) and "id" in p}
    preserved = 0
    for provider in new_providers:
        if (provider.get("pypi_downloads") or {}).get("monthly"):
            continue  # this build got a real number; nothing to preserve
        previous = (existing_by_id.get(provider.get("id")) or {}).get("pypi_downloads") or {}
        # ``or 0`` guards against a stored ``null`` monthly count.
        if (previous.get("monthly") or 0) > 0:
            # Copy so the new catalog does not share a mutable dict with the old one.
            provider["pypi_downloads"] = dict(previous)
            preserved += 1
            print(f" Preserved previous downloads for {provider['id']} (this build fetched 0)")
    return preserved
+
+
def _parse_inventory_lines(inv_path: Path) -> list[str]:
"""Read and decompress the body of a Sphinx objects.inv file."""
with inv_path.open("rb") as f:
@@ -763,6 +847,21 @@ def main():
# Convert to JSON-serializable format
new_providers = [asdict(p) for p in all_providers]
+ # Defense-in-depth: if this build fetched 0 downloads for a provider that
had a real
+ # number on the previous (S3) providers.json, keep the previous number
rather than
+ # publishing a spurious zero (#1309). No-op when there's no existing
catalog on disk.
+ for candidate_dir in (SCRIPT_DIR, OUTPUT_DIR):
+ existing_catalog_path = candidate_dir / "providers.json"
+ if existing_catalog_path.exists():
+ try:
+ previous_catalog =
json.loads(existing_catalog_path.read_text())
+ except json.JSONDecodeError:
+ break
+ preserved = preserve_nonzero_downloads(new_providers,
previous_catalog.get("providers", []))
+ if preserved:
+ print(f"Preserved previous download counts for {preserved}
provider(s) that fetched 0")
+ break
+
# In incremental mode, merge new providers into existing providers.json
# so parallel runs for different providers don't clobber each other.
if requested_providers:
diff --git a/dev/registry/tests/test_extract_metadata.py
b/dev/registry/tests/test_extract_metadata.py
index ab2dfb75f7d..e4e98fcdc69 100644
--- a/dev/registry/tests/test_extract_metadata.py
+++ b/dev/registry/tests/test_extract_metadata.py
@@ -32,11 +32,13 @@ from extract_metadata import (
fetch_pypi_data_parallel,
fetch_pypi_dates,
fetch_pypi_downloads,
+ fetch_pypi_downloads_clickhouse,
find_latest_released_version,
find_related_providers,
load_release_tags,
module_path_to_file_path,
parse_pyproject_toml,
+ preserve_nonzero_downloads,
read_connection_urls,
read_inventory,
resolve_connection_docs_url,
@@ -261,19 +263,59 @@ class TestFetchPypiDates:
assert result == {"first_released": "", "last_updated": ""}
+# ---------------------------------------------------------------------------
+# fetch_pypi_downloads_clickhouse (mocked network)
+# ---------------------------------------------------------------------------
class TestFetchPypiDownloadsClickhouse:
    """Tests for the single-query ClickHouse download fetcher (network is mocked)."""

    @staticmethod
    def _mock_response(body: bytes) -> MagicMock:
        """Build a context-manager mock that ``urlopen`` can return, yielding ``body``."""
        mock_response = MagicMock(spec=http.client.HTTPResponse)
        mock_response.read.return_value = body
        mock_response.__enter__ = MagicMock(return_value=mock_response)
        mock_response.__exit__ = MagicMock(return_value=False)
        return mock_response

    @patch("extract_metadata.urllib.request.urlopen")
    def test_parses_tsv(self, mock_urlopen):
        mock_urlopen.return_value = self._mock_response(
            b"apache-airflow-providers-amazon\t766462\t8218125\n"
            b"apache-airflow-providers-common-ai\t30881\t171721\n"
        )

        result = fetch_pypi_downloads_clickhouse(
            ["apache-airflow-providers-amazon", "apache-airflow-providers-common-ai"]
        )
        assert result["apache-airflow-providers-amazon"] == {"weekly": 766462, "monthly": 8218125, "total": 0}
        assert result["apache-airflow-providers-common-ai"] == {
            "weekly": 30881,
            "monthly": 171721,
            "total": 0,
        }
        # Every package goes out in ONE request -- the point of moving off pypistats.
        mock_urlopen.assert_called_once()
        query = mock_urlopen.call_args.args[0].data.decode()
        assert "'apache-airflow-providers-amazon'" in query
        assert "'apache-airflow-providers-common-ai'" in query

    @patch("extract_metadata.urllib.request.urlopen")
    def test_skips_malformed_rows(self, mock_urlopen):
        # Rows with the wrong column count or non-integer counts are dropped, so those
        # packages fall back to pypistats instead of being recorded with bogus numbers.
        mock_urlopen.return_value = self._mock_response(
            b"apache-airflow-providers-amazon\t1\t2\n"
            b"not-a-tsv-row\n"
            b"apache-airflow-providers-google\tNaN\t3\n"
            b"\n"
        )

        result = fetch_pypi_downloads_clickhouse(
            ["apache-airflow-providers-amazon", "apache-airflow-providers-google"]
        )
        assert result == {"apache-airflow-providers-amazon": {"weekly": 1, "monthly": 2, "total": 0}}

    def test_empty_input_skips_query(self):
        with patch("extract_metadata.urllib.request.urlopen") as mock_urlopen:
            assert fetch_pypi_downloads_clickhouse([]) == {}
        mock_urlopen.assert_not_called()

    @patch("extract_metadata.urllib.request.urlopen", side_effect=OSError("clickhouse down"))
    def test_failure_returns_empty(self, _mock):
        # On any failure, return {} so callers fall back to pypistats.
        assert fetch_pypi_downloads_clickhouse(["apache-airflow-providers-amazon"]) == {}
+
+
# ---------------------------------------------------------------------------
# fetch_pypi_data_parallel
# ---------------------------------------------------------------------------
class TestFetchPypiDataParallel:
- def test_returns_one_entry_per_package(self):
- pkgs = ["pkg-a", "pkg-b", "pkg-c"]
+ def test_uses_clickhouse_without_pypistats(self):
+ # When ClickHouse returns real numbers, pypistats is never called (no
burst).
+ pkgs = ["pkg-a", "pkg-b"]
+ clickhouse = {p: {"weekly": 1, "monthly": 10, "total": 0} for p in
pkgs}
with (
+ patch("extract_metadata.fetch_pypi_downloads_clickhouse",
return_value=clickhouse),
patch("extract_metadata.fetch_pypi_downloads") as dl,
- patch("extract_metadata.fetch_pypi_dates") as dt,
+ patch(
+ "extract_metadata.fetch_pypi_dates",
+ return_value={"first_released": "2024-01-01", "last_updated":
"2024-06-01"},
+ ),
):
- dl.side_effect = lambda p: {"weekly": 1, "monthly": 10, "total": 0}
- dt.side_effect = lambda p: {"first_released": "2024-01-01",
"last_updated": "2024-06-01"}
-
result = fetch_pypi_data_parallel(pkgs, max_workers=4)
assert set(result) == set(pkgs)
@@ -281,27 +323,21 @@ class TestFetchPypiDataParallel:
downloads, dates = result[pkg]
assert downloads == {"weekly": 1, "monthly": 10, "total": 0}
assert dates == {"first_released": "2024-01-01", "last_updated":
"2024-06-01"}
- assert dl.call_count == 3
- assert dt.call_count == 3
-
- def test_empty_input(self):
- assert fetch_pypi_data_parallel([]) == {}
-
- def test_per_package_failure_does_not_abort_others(self):
- # Simulate `fetch_pypi_downloads` raising for one package; the worker
- # already returns zero-value defaults on error, but verify the
orchestrator
- # propagates per-package isolation by accepting the worker's existing
- # error handling -- a single bad package must not poison the dict.
- pkgs = ["good", "bad"]
-
- def downloads_side_effect(pkg):
- if pkg == "bad":
- # mimic the existing fetch_pypi_downloads error path
- return {"weekly": 0, "monthly": 0, "total": 0}
- return {"weekly": 5, "monthly": 50, "total": 0}
-
+ dl.assert_not_called()
+
+ def test_falls_back_to_pypistats_when_clickhouse_missing_or_zero(self):
+ # ClickHouse is missing 'b' and returns zero for 'c' -- both fall back
to pypistats.
+ pkgs = ["a", "b", "c"]
+ clickhouse = {
+ "a": {"weekly": 1, "monthly": 10, "total": 0},
+ "c": {"weekly": 0, "monthly": 0, "total": 0},
+ }
with (
- patch("extract_metadata.fetch_pypi_downloads",
side_effect=downloads_side_effect),
+ patch("extract_metadata.fetch_pypi_downloads_clickhouse",
return_value=clickhouse),
+ patch(
+ "extract_metadata.fetch_pypi_downloads",
+ return_value={"weekly": 9, "monthly": 99, "total": 0},
+ ) as dl,
patch(
"extract_metadata.fetch_pypi_dates",
return_value={"first_released": "", "last_updated": ""},
@@ -309,8 +345,35 @@ class TestFetchPypiDataParallel:
):
result = fetch_pypi_data_parallel(pkgs)
- assert result["good"][0] == {"weekly": 5, "monthly": 50, "total": 0}
- assert result["bad"][0] == {"weekly": 0, "monthly": 0, "total": 0}
+ assert result["a"][0] == {"weekly": 1, "monthly": 10, "total": 0} #
from ClickHouse
+ assert result["b"][0] == {"weekly": 9, "monthly": 99, "total": 0} #
fallback (missing)
+ assert result["c"][0] == {"weekly": 9, "monthly": 99, "total": 0} #
fallback (zero)
+ assert {call.args[0] for call in dl.call_args_list} == {"b", "c"}
+
+ def test_empty_input(self):
+ assert fetch_pypi_data_parallel([]) == {}
+
+
+# ---------------------------------------------------------------------------
+# preserve_nonzero_downloads
+# ---------------------------------------------------------------------------
class TestPreserveNonzeroDownloads:
    """Tests for restoring previous download counts when a build fetched zero."""

    def test_preserves_when_new_is_zero_and_previous_nonzero(self):
        new = [{"id": "amazon", "pypi_downloads": {"weekly": 0, "monthly": 0, "total": 0}}]
        existing = [{"id": "amazon", "pypi_downloads": {"weekly": 766462, "monthly": 8218125, "total": 0}}]
        assert preserve_nonzero_downloads(new, existing) == 1
        assert new[0]["pypi_downloads"]["monthly"] == 8218125

    def test_keeps_new_when_it_has_a_number(self):
        new = [{"id": "amazon", "pypi_downloads": {"weekly": 5, "monthly": 50, "total": 0}}]
        existing = [{"id": "amazon", "pypi_downloads": {"weekly": 9, "monthly": 99, "total": 0}}]
        assert preserve_nonzero_downloads(new, existing) == 0
        assert new[0]["pypi_downloads"]["monthly"] == 50

    def test_no_previous_entry_leaves_zero(self):
        new = [{"id": "brand-new", "pypi_downloads": {"weekly": 0, "monthly": 0, "total": 0}}]
        assert preserve_nonzero_downloads(new, []) == 0
        assert new[0]["pypi_downloads"]["monthly"] == 0

    def test_missing_downloads_key_is_treated_as_zero(self):
        # A provider dict without ``pypi_downloads`` at all is still eligible for restore.
        new = [{"id": "amazon"}]
        existing = [{"id": "amazon", "pypi_downloads": {"weekly": 1, "monthly": 7, "total": 0}}]
        assert preserve_nonzero_downloads(new, existing) == 1
        assert new[0]["pypi_downloads"]["monthly"] == 7

    def test_previous_zero_is_not_preserved(self):
        # Nothing to restore when the previous catalog also had zero.
        new = [{"id": "amazon", "pypi_downloads": {"weekly": 0, "monthly": 0, "total": 0}}]
        existing = [{"id": "amazon", "pypi_downloads": {"weekly": 0, "monthly": 0, "total": 0}}]
        assert preserve_nonzero_downloads(new, existing) == 0
        assert new[0]["pypi_downloads"]["monthly"] == 0
# ---------------------------------------------------------------------------