This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b37cbbe54f Fix Provider Registry showing 0 monthly downloads for many 
providers (#67670)
1b37cbbe54f is described below

commit 1b37cbbe54f693e7297b352bc96e96c31ae5633c
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri May 29 10:14:55 2026 +0100

    Fix Provider Registry showing 0 monthly downloads for many providers 
(#67670)
---
 dev/registry/extract_metadata.py            | 123 +++++++++++++++++++++++++---
 dev/registry/tests/test_extract_metadata.py | 119 ++++++++++++++++++++-------
 2 files changed, 202 insertions(+), 40 deletions(-)

diff --git a/dev/registry/extract_metadata.py b/dev/registry/extract_metadata.py
index e84ca631334..1c557a3a8fa 100644
--- a/dev/registry/extract_metadata.py
+++ b/dev/registry/extract_metadata.py
@@ -55,6 +55,10 @@ from registry_contract_models import 
validate_providers_catalog
 # External endpoints used by metadata extraction.
 PYPISTATS_RECENT_URL = 
"https://pypistats.org/api/packages/{package_name}/recent";
 PYPI_PACKAGE_JSON_URL = "https://pypi.org/pypi/{package_name}/json";
+# ClickHouse's public PyPI dataset (the data behind clickpy.clickhouse.com), 
sourced
+# from the same PyPI download logs as pypistats.org but queryable for every 
package
+# in a single SQL request -- no per-package rate limiting.
+CLICKHOUSE_PYPI_URL = "https://sql-clickhouse.clickhouse.com/?user=demo";
 S3_DOC_URL = "http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com";
 AIRFLOW_PROVIDER_DOCS_URL = 
"https://airflow.apache.org/docs/{package_name}/stable/";
 AIRFLOW_PROVIDER_SOURCE_URL = (
@@ -108,25 +112,84 @@ def fetch_pypi_dates(package_name: str) -> dict[str, str]:
         return {"first_released": "", "last_updated": ""}
 
 
+def fetch_pypi_downloads_clickhouse(package_names: list[str]) -> dict[str, 
dict[str, int]]:
+    """Fetch weekly/monthly downloads for many packages in ONE query.
+
+    Replaces the previous ~N parallel ``pypistats.org/api/recent`` calls. That 
burst
+    tripped pypistats' per-IP rate limit, and ``fetch_pypi_downloads`` 
silently turned
+    each 429 into ``{weekly: 0, monthly: 0}`` -- zeroing roughly a third of 
providers
+    on the live registry. ClickHouse exposes the same underlying PyPI download 
data
+    via a single SQL endpoint, so one request covers every package with 
nothing to
+    rate-limit.
+
+    Returns ``{package_name: {"weekly": int, "monthly": int, "total": 0}}``. 
On any
+    failure returns ``{}`` so callers fall back to pypistats per-package. 
``total`` is
+    left at 0 to match the existing schema (the registry has never populated 
it).
+    """
+    if not package_names:
+        return {}
+    # Package names are ``apache-airflow-providers-*`` (only [a-z0-9-]); strip 
any
+    # stray quote defensively before interpolating into the IN-list.
+    in_list = ", ".join("'" + name.replace("'", "") + "'" for name in 
package_names)
+    # Anchor the rolling windows on the dataset's latest loaded date, not 
today(): the
+    # public dataset lags a few days, so a today()-relative window would be 
truncated to
+    # the loaded days and undercount. max(date) gives a true last-7/30-day 
rolling sum,
+    # matching pypistats' last_week/last_month semantics.
+    query = (
+        "WITH (SELECT max(date) FROM pypi.pypi_downloads_per_day) AS max_date "
+        "SELECT project, "
+        "toUInt64(sumIf(count, date > max_date - 7)) AS weekly, "
+        "toUInt64(sumIf(count, date > max_date - 30)) AS monthly "
+        "FROM pypi.pypi_downloads_per_day "
+        f"WHERE project IN ({in_list}) "
+        "GROUP BY project FORMAT TSV"
+    )
+    try:
+        request = urllib.request.Request(CLICKHOUSE_PYPI_URL, 
data=query.encode())
+        with urllib.request.urlopen(request, timeout=30) as response:
+            body = response.read().decode()
+    except Exception as e:
+        print(f"    Warning: ClickHouse download query failed ({e}); falling 
back to pypistats")
+        return {}
+    result: dict[str, dict[str, int]] = {}
+    for line in body.splitlines():
+        parts = line.split("\t")
+        if len(parts) != 3:
+            continue
+        project, weekly, monthly = parts
+        try:
+            result[project] = {"weekly": int(weekly), "monthly": int(monthly), 
"total": 0}
+        except ValueError:
+            continue
+    return result
+
+
 def fetch_pypi_data_parallel(
     package_names: list[str], max_workers: int = 16
 ) -> dict[str, tuple[dict[str, int], dict[str, str]]]:
-    """Fetch downloads + dates for many packages concurrently.
-
-    Returns ``{package_name: (downloads_dict, dates_dict)}``. Each per-package
-    failure is isolated -- ``fetch_pypi_downloads`` / ``fetch_pypi_dates``
-    already return zero-value defaults on error, so a flaky pypistats response
-    only zeroes that one package instead of stalling or crashing the build.
-
-    Concurrency: ~86 providers * 2 endpoints, but pypistats.org and pypi.org
-    each tolerate 16 parallel connections without rate-limiting in practice.
-    Matches the ``max_workers=8`` shape used in extract_parameters.py for
-    inventory fetches; bumped slightly because PyPI calls are simpler/faster.
+    """Fetch downloads + release dates for many packages.
+
+    Downloads come from a single ClickHouse query 
(``fetch_pypi_downloads_clickhouse``).
+    Any package ClickHouse doesn't return -- or returns zero for, e.g. a 
just-published
+    provider not yet in its dataset -- falls back to a per-package 
pypistats.org call;
+    that path is now rare, so it no longer produces a rate-limiting burst. 
Release dates
+    still come from pypi.org/json, fetched in parallel (a per-package-only 
endpoint that
+    was never the source of the rate-limiting that motivated the ClickHouse 
switch).
+
+    Returns ``{package_name: (downloads_dict, dates_dict)}``. Per-package 
failures are
+    isolated -- the fallbacks return zero-value defaults -- so one flaky 
response only
+    affects that package.
     """
+    clickhouse_downloads = fetch_pypi_downloads_clickhouse(package_names)
     results: dict[str, tuple[dict[str, int], dict[str, str]]] = {}
 
     def _fetch_one(pkg: str) -> tuple[str, dict[str, int], dict[str, str]]:
-        return pkg, fetch_pypi_downloads(pkg), fetch_pypi_dates(pkg)
+        downloads = clickhouse_downloads.get(pkg)
+        if not downloads or downloads.get("monthly", 0) == 0:
+            # Missing from ClickHouse (or zero) -- fall back to pypistats for 
this one
+            # package only. No burst because this is the exception, not every 
package.
+            downloads = fetch_pypi_downloads(pkg)
+        return pkg, downloads, fetch_pypi_dates(pkg)
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as 
executor:
         futures = [executor.submit(_fetch_one, pkg) for pkg in package_names]
@@ -137,6 +200,27 @@ def fetch_pypi_data_parallel(
     return results
 
 
+def preserve_nonzero_downloads(new_providers: list[dict], existing_providers: 
list[dict]) -> int:
+    """Keep a provider's previous download counts when this build fetched zero.
+
+    Defense-in-depth for the case where BOTH ClickHouse and the pypistats 
fallback fail
+    for a package: rather than overwrite a known-good number on the live 
registry with a
+    spurious zero, retain the previous ``pypi_downloads`` from the existing 
catalog.
+    Mutates ``new_providers`` in place; returns the number of providers 
preserved.
+    """
+    existing_by_id = {p["id"]: p for p in existing_providers}
+    preserved = 0
+    for provider in new_providers:
+        if (provider.get("pypi_downloads") or {}).get("monthly", 0):
+            continue  # this build got a real number; nothing to preserve
+        previous = existing_by_id.get(provider["id"], 
{}).get("pypi_downloads") or {}
+        if previous.get("monthly", 0) > 0:
+            provider["pypi_downloads"] = previous
+            preserved += 1
+            print(f"    Preserved previous downloads for {provider['id']} 
(this build fetched 0)")
+    return preserved
+
+
 def _parse_inventory_lines(inv_path: Path) -> list[str]:
     """Read and decompress the body of a Sphinx objects.inv file."""
     with inv_path.open("rb") as f:
@@ -763,6 +847,21 @@ def main():
     # Convert to JSON-serializable format
     new_providers = [asdict(p) for p in all_providers]
 
+    # Defense-in-depth: if this build fetched 0 downloads for a provider that 
had a real
+    # number on the previous (S3) providers.json, keep the previous number 
rather than
+    # publishing a spurious zero (#1309). No-op when there's no existing 
catalog on disk.
+    for candidate_dir in (SCRIPT_DIR, OUTPUT_DIR):
+        existing_catalog_path = candidate_dir / "providers.json"
+        if existing_catalog_path.exists():
+            try:
+                previous_catalog = 
json.loads(existing_catalog_path.read_text())
+            except json.JSONDecodeError:
+                break
+            preserved = preserve_nonzero_downloads(new_providers, 
previous_catalog.get("providers", []))
+            if preserved:
+                print(f"Preserved previous download counts for {preserved} 
provider(s) that fetched 0")
+            break
+
     # In incremental mode, merge new providers into existing providers.json
     # so parallel runs for different providers don't clobber each other.
     if requested_providers:
diff --git a/dev/registry/tests/test_extract_metadata.py 
b/dev/registry/tests/test_extract_metadata.py
index ab2dfb75f7d..e4e98fcdc69 100644
--- a/dev/registry/tests/test_extract_metadata.py
+++ b/dev/registry/tests/test_extract_metadata.py
@@ -32,11 +32,13 @@ from extract_metadata import (
     fetch_pypi_data_parallel,
     fetch_pypi_dates,
     fetch_pypi_downloads,
+    fetch_pypi_downloads_clickhouse,
     find_latest_released_version,
     find_related_providers,
     load_release_tags,
     module_path_to_file_path,
     parse_pyproject_toml,
+    preserve_nonzero_downloads,
     read_connection_urls,
     read_inventory,
     resolve_connection_docs_url,
@@ -261,19 +263,59 @@ class TestFetchPypiDates:
         assert result == {"first_released": "", "last_updated": ""}
 
 
+# ---------------------------------------------------------------------------
+# fetch_pypi_downloads_clickhouse (mocked network)
+# ---------------------------------------------------------------------------
+class TestFetchPypiDownloadsClickhouse:
+    @patch("extract_metadata.urllib.request.urlopen")
+    def test_parses_tsv(self, mock_urlopen):
+        body = (
+            b"apache-airflow-providers-amazon\t766462\t8218125\n"
+            b"apache-airflow-providers-common-ai\t30881\t171721\n"
+        )
+        mock_response = MagicMock(spec=http.client.HTTPResponse)
+        mock_response.read.return_value = body
+        mock_response.__enter__ = MagicMock(return_value=mock_response)
+        mock_response.__exit__ = MagicMock(return_value=False)
+        mock_urlopen.return_value = mock_response
+
+        result = fetch_pypi_downloads_clickhouse(
+            ["apache-airflow-providers-amazon", 
"apache-airflow-providers-common-ai"]
+        )
+        assert result["apache-airflow-providers-amazon"] == {"weekly": 766462, 
"monthly": 8218125, "total": 0}
+        assert result["apache-airflow-providers-common-ai"] == {
+            "weekly": 30881,
+            "monthly": 171721,
+            "total": 0,
+        }
+
+    def test_empty_input_skips_query(self):
+        with patch("extract_metadata.urllib.request.urlopen") as mock_urlopen:
+            assert fetch_pypi_downloads_clickhouse([]) == {}
+            mock_urlopen.assert_not_called()
+
+    @patch("extract_metadata.urllib.request.urlopen", 
side_effect=OSError("clickhouse down"))
+    def test_failure_returns_empty(self, _mock):
+        # On any failure, return {} so callers fall back to pypistats.
+        assert 
fetch_pypi_downloads_clickhouse(["apache-airflow-providers-amazon"]) == {}
+
+
 # ---------------------------------------------------------------------------
 # fetch_pypi_data_parallel
 # ---------------------------------------------------------------------------
 class TestFetchPypiDataParallel:
-    def test_returns_one_entry_per_package(self):
-        pkgs = ["pkg-a", "pkg-b", "pkg-c"]
+    def test_uses_clickhouse_without_pypistats(self):
+        # When ClickHouse returns real numbers, pypistats is never called (no 
burst).
+        pkgs = ["pkg-a", "pkg-b"]
+        clickhouse = {p: {"weekly": 1, "monthly": 10, "total": 0} for p in 
pkgs}
         with (
+            patch("extract_metadata.fetch_pypi_downloads_clickhouse", 
return_value=clickhouse),
             patch("extract_metadata.fetch_pypi_downloads") as dl,
-            patch("extract_metadata.fetch_pypi_dates") as dt,
+            patch(
+                "extract_metadata.fetch_pypi_dates",
+                return_value={"first_released": "2024-01-01", "last_updated": 
"2024-06-01"},
+            ),
         ):
-            dl.side_effect = lambda p: {"weekly": 1, "monthly": 10, "total": 0}
-            dt.side_effect = lambda p: {"first_released": "2024-01-01", 
"last_updated": "2024-06-01"}
-
             result = fetch_pypi_data_parallel(pkgs, max_workers=4)
 
         assert set(result) == set(pkgs)
@@ -281,27 +323,21 @@ class TestFetchPypiDataParallel:
             downloads, dates = result[pkg]
             assert downloads == {"weekly": 1, "monthly": 10, "total": 0}
             assert dates == {"first_released": "2024-01-01", "last_updated": 
"2024-06-01"}
-        assert dl.call_count == 3
-        assert dt.call_count == 3
-
-    def test_empty_input(self):
-        assert fetch_pypi_data_parallel([]) == {}
-
-    def test_per_package_failure_does_not_abort_others(self):
-        # Simulate `fetch_pypi_downloads` raising for one package; the worker
-        # already returns zero-value defaults on error, but verify the 
orchestrator
-        # propagates per-package isolation by accepting the worker's existing
-        # error handling -- a single bad package must not poison the dict.
-        pkgs = ["good", "bad"]
-
-        def downloads_side_effect(pkg):
-            if pkg == "bad":
-                # mimic the existing fetch_pypi_downloads error path
-                return {"weekly": 0, "monthly": 0, "total": 0}
-            return {"weekly": 5, "monthly": 50, "total": 0}
-
+        dl.assert_not_called()
+
+    def test_falls_back_to_pypistats_when_clickhouse_missing_or_zero(self):
+        # ClickHouse is missing 'b' and returns zero for 'c' -- both fall back 
to pypistats.
+        pkgs = ["a", "b", "c"]
+        clickhouse = {
+            "a": {"weekly": 1, "monthly": 10, "total": 0},
+            "c": {"weekly": 0, "monthly": 0, "total": 0},
+        }
         with (
-            patch("extract_metadata.fetch_pypi_downloads", 
side_effect=downloads_side_effect),
+            patch("extract_metadata.fetch_pypi_downloads_clickhouse", 
return_value=clickhouse),
+            patch(
+                "extract_metadata.fetch_pypi_downloads",
+                return_value={"weekly": 9, "monthly": 99, "total": 0},
+            ) as dl,
             patch(
                 "extract_metadata.fetch_pypi_dates",
                 return_value={"first_released": "", "last_updated": ""},
@@ -309,8 +345,35 @@ class TestFetchPypiDataParallel:
         ):
             result = fetch_pypi_data_parallel(pkgs)
 
-        assert result["good"][0] == {"weekly": 5, "monthly": 50, "total": 0}
-        assert result["bad"][0] == {"weekly": 0, "monthly": 0, "total": 0}
+        assert result["a"][0] == {"weekly": 1, "monthly": 10, "total": 0}  # 
from ClickHouse
+        assert result["b"][0] == {"weekly": 9, "monthly": 99, "total": 0}  # 
fallback (missing)
+        assert result["c"][0] == {"weekly": 9, "monthly": 99, "total": 0}  # 
fallback (zero)
+        assert {call.args[0] for call in dl.call_args_list} == {"b", "c"}
+
+    def test_empty_input(self):
+        assert fetch_pypi_data_parallel([]) == {}
+
+
+# ---------------------------------------------------------------------------
+# preserve_nonzero_downloads
+# ---------------------------------------------------------------------------
+class TestPreserveNonzeroDownloads:
+    def test_preserves_when_new_is_zero_and_previous_nonzero(self):
+        new = [{"id": "amazon", "pypi_downloads": {"weekly": 0, "monthly": 0, 
"total": 0}}]
+        existing = [{"id": "amazon", "pypi_downloads": {"weekly": 766462, 
"monthly": 8218125, "total": 0}}]
+        assert preserve_nonzero_downloads(new, existing) == 1
+        assert new[0]["pypi_downloads"]["monthly"] == 8218125
+
+    def test_keeps_new_when_it_has_a_number(self):
+        new = [{"id": "amazon", "pypi_downloads": {"weekly": 5, "monthly": 50, 
"total": 0}}]
+        existing = [{"id": "amazon", "pypi_downloads": {"weekly": 9, 
"monthly": 99, "total": 0}}]
+        assert preserve_nonzero_downloads(new, existing) == 0
+        assert new[0]["pypi_downloads"]["monthly"] == 50
+
+    def test_no_previous_entry_leaves_zero(self):
+        new = [{"id": "brand-new", "pypi_downloads": {"weekly": 0, "monthly": 
0, "total": 0}}]
+        assert preserve_nonzero_downloads(new, []) == 0
+        assert new[0]["pypi_downloads"]["monthly"] == 0
 
 
 # ---------------------------------------------------------------------------

Reply via email to