jason810496 commented on code in PR #62261: URL: https://github.com/apache/airflow/pull/62261#discussion_r2837502947
########## registry/README.md: ########## @@ -0,0 +1,470 @@ +<!-- Review Comment: Non-blocking nit: Do we need to add pyproject.toml for registry? Or using `PEP-723 inline dependencies` for the scripts? Although there's only `PyYaml` dependencies right now. ########## dev/registry/extract_metadata.py: ########## @@ -0,0 +1,604 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Airflow Registry Metadata Extractor + +Extracts provider metadata from: +1. provider.yaml files - Rich metadata (integrations, logos, categories) +2. pyproject.toml files - Dependencies, Python version constraints +3. PyPI API - Download statistics and release dates (optional, requires network) + +Output: providers.json for the Astro static site generator + +Module discovery (modules.json) is handled by extract_parameters.py, which uses +runtime inspection inside breeze for accurate class discovery. 
+""" + +from __future__ import annotations + +import datetime +import json +import re +import shutil +import urllib.request +import zlib +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any + +import tomllib +import yaml + + +def fetch_pypi_downloads(package_name: str) -> dict[str, int]: + """Fetch download statistics from pypistats.org API.""" + try: + url = f"https://pypistats.org/api/packages/{package_name}/recent" + with urllib.request.urlopen(url, timeout=5) as response: + data = json.loads(response.read().decode()) + return { + "weekly": data["data"].get("last_week", 0), + "monthly": data["data"].get("last_month", 0), + "total": 0, # Total not available in recent endpoint + } + except Exception as e: + print(f" Warning: Could not fetch PyPI stats for {package_name}: {e}") + return {"weekly": 0, "monthly": 0, "total": 0} + + +def fetch_pypi_dates(package_name: str) -> dict[str, str]: + """Fetch first release and latest release dates from PyPI JSON API.""" + try: + url = f"https://pypi.org/pypi/{package_name}/json" + with urllib.request.urlopen(url, timeout=10) as response: + data = json.loads(response.read().decode()) + + earliest = None + latest = None + for version_files in data.get("releases", {}).values(): + for file_info in version_files: + upload = file_info.get("upload_time_iso_8601") or file_info.get("upload_time") + if not upload: + continue + ts = upload[:10] + if earliest is None or ts < earliest: + earliest = ts + if latest is None or ts > latest: + latest = ts + + return { + "first_released": earliest or "", + "last_updated": latest or "", + } + except Exception as e: + print(f" Warning: Could not fetch PyPI dates for {package_name}: {e}") + return {"first_released": "", "last_updated": ""} + + +def read_inventory(inv_path: Path) -> dict[str, str]: + """Parse a Sphinx objects.inv file and return {qualified_name: url_path} for py:class entries.""" + with inv_path.open("rb") as f: + # Skip the 4 header 
lines + for _ in range(4): + f.readline() + data = zlib.decompress(f.read()).decode("utf-8").splitlines() + result: dict[str, str] = {} + for line in data: + parts = line.split(None, 4) + if len(parts) != 5: + continue + name, domain_role, _prio, location, _dispname = parts + if domain_role == "py:class": + # "$" in location means "use the name as anchor" + result[name] = location.replace("$", name) + return result + + +S3_DOC_URL = "http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com" +INVENTORY_CACHE_DIR = Path(__file__).parent / ".inventory_cache" +INVENTORY_TTL = datetime.timedelta(hours=12) + + +def fetch_provider_inventory(package_name: str, cache_dir: Path = INVENTORY_CACHE_DIR) -> Path | None: + """Download a provider's objects.inv from S3, caching locally with a 12-hour TTL. + + Returns the local cache path on success, or None if the fetch fails + (e.g. provider not yet published). + """ + cache_path = cache_dir / package_name / "objects.inv" + if cache_path.exists(): + age = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.datetime.fromtimestamp( + cache_path.stat().st_mtime, tz=datetime.timezone.utc + ) + if age < INVENTORY_TTL: + return cache_path + + url = f"{S3_DOC_URL}/docs/{package_name}/stable/objects.inv" + try: + with urllib.request.urlopen(url, timeout=10) as response: + content = response.read() + # Validate it's a Sphinx inventory + if not content.startswith(b"# Sphinx inventory version"): + print(f" Warning: Invalid inventory header for {package_name}") + return None + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_bytes(content) + return cache_path + except Exception as e: + print(f" Warning: Could not fetch inventory for {package_name}: {e}") + # On refetch failure, serve stale cache rather than nothing + if cache_path.exists(): + print(f" Using stale cache for {package_name}") + return cache_path + return None + + +# Base paths +AIRFLOW_ROOT = Path(__file__).parent.parent.parent +SCRIPT_DIR = 
Path(__file__).parent +PROVIDERS_DIR = AIRFLOW_ROOT / "providers" +REGISTRY_DIR = AIRFLOW_ROOT / "registry" +OUTPUT_DIR = REGISTRY_DIR / "src" / "_data" Review Comment: Perhaps we could consolidate these config (as well as all the config for AWS resources) in `config` or `global_constant` module for better maintenance. ########## dev/registry/extract_metadata.py: ########## @@ -0,0 +1,604 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Airflow Registry Metadata Extractor + +Extracts provider metadata from: +1. provider.yaml files - Rich metadata (integrations, logos, categories) +2. pyproject.toml files - Dependencies, Python version constraints +3. PyPI API - Download statistics and release dates (optional, requires network) + +Output: providers.json for the Astro static site generator + +Module discovery (modules.json) is handled by extract_parameters.py, which uses +runtime inspection inside breeze for accurate class discovery. 
+""" + +from __future__ import annotations + +import datetime +import json +import re +import shutil +import urllib.request +import zlib +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any + +import tomllib +import yaml + + +def fetch_pypi_downloads(package_name: str) -> dict[str, int]: + """Fetch download statistics from pypistats.org API.""" + try: + url = f"https://pypistats.org/api/packages/{package_name}/recent" + with urllib.request.urlopen(url, timeout=5) as response: + data = json.loads(response.read().decode()) + return { + "weekly": data["data"].get("last_week", 0), + "monthly": data["data"].get("last_month", 0), + "total": 0, # Total not available in recent endpoint + } + except Exception as e: + print(f" Warning: Could not fetch PyPI stats for {package_name}: {e}") + return {"weekly": 0, "monthly": 0, "total": 0} + + +def fetch_pypi_dates(package_name: str) -> dict[str, str]: + """Fetch first release and latest release dates from PyPI JSON API.""" + try: + url = f"https://pypi.org/pypi/{package_name}/json" + with urllib.request.urlopen(url, timeout=10) as response: + data = json.loads(response.read().decode()) + + earliest = None + latest = None + for version_files in data.get("releases", {}).values(): + for file_info in version_files: + upload = file_info.get("upload_time_iso_8601") or file_info.get("upload_time") + if not upload: + continue + ts = upload[:10] + if earliest is None or ts < earliest: + earliest = ts + if latest is None or ts > latest: + latest = ts + + return { + "first_released": earliest or "", + "last_updated": latest or "", + } + except Exception as e: + print(f" Warning: Could not fetch PyPI dates for {package_name}: {e}") + return {"first_released": "", "last_updated": ""} + + +def read_inventory(inv_path: Path) -> dict[str, str]: + """Parse a Sphinx objects.inv file and return {qualified_name: url_path} for py:class entries.""" + with inv_path.open("rb") as f: + # Skip the 4 header 
lines + for _ in range(4): + f.readline() + data = zlib.decompress(f.read()).decode("utf-8").splitlines() + result: dict[str, str] = {} + for line in data: + parts = line.split(None, 4) + if len(parts) != 5: + continue + name, domain_role, _prio, location, _dispname = parts + if domain_role == "py:class": + # "$" in location means "use the name as anchor" + result[name] = location.replace("$", name) + return result + + +S3_DOC_URL = "http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com" +INVENTORY_CACHE_DIR = Path(__file__).parent / ".inventory_cache" +INVENTORY_TTL = datetime.timedelta(hours=12) + + +def fetch_provider_inventory(package_name: str, cache_dir: Path = INVENTORY_CACHE_DIR) -> Path | None: + """Download a provider's objects.inv from S3, caching locally with a 12-hour TTL. + + Returns the local cache path on success, or None if the fetch fails + (e.g. provider not yet published). + """ + cache_path = cache_dir / package_name / "objects.inv" + if cache_path.exists(): + age = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.datetime.fromtimestamp( + cache_path.stat().st_mtime, tz=datetime.timezone.utc + ) + if age < INVENTORY_TTL: + return cache_path + + url = f"{S3_DOC_URL}/docs/{package_name}/stable/objects.inv" + try: + with urllib.request.urlopen(url, timeout=10) as response: + content = response.read() + # Validate it's a Sphinx inventory + if not content.startswith(b"# Sphinx inventory version"): + print(f" Warning: Invalid inventory header for {package_name}") + return None + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_bytes(content) + return cache_path + except Exception as e: + print(f" Warning: Could not fetch inventory for {package_name}: {e}") + # On refetch failure, serve stale cache rather than nothing + if cache_path.exists(): + print(f" Using stale cache for {package_name}") + return cache_path + return None + + +# Base paths +AIRFLOW_ROOT = Path(__file__).parent.parent.parent +SCRIPT_DIR = 
Path(__file__).parent +PROVIDERS_DIR = AIRFLOW_ROOT / "providers" +REGISTRY_DIR = AIRFLOW_ROOT / "registry" +OUTPUT_DIR = REGISTRY_DIR / "src" / "_data" + + +@dataclass +class Category: + """Category within a provider.""" + + id: str + name: str + module_count: int = 0 + + +@dataclass +class Provider: + """Provider metadata.""" + + id: str + name: str + package_name: str + description: str + lifecycle: str = "production" # AIP-95: incubation, production, mature, deprecated + logo: str | None = None + version: str = "" + versions: list[str] = field(default_factory=list) + airflow_versions: list[str] = field(default_factory=list) + pypi_downloads: dict[str, int] = field(default_factory=lambda: {"weekly": 0, "monthly": 0, "total": 0}) + module_counts: dict[str, int] = field( + default_factory=lambda: { + "operator": 0, + "hook": 0, + "sensor": 0, + "trigger": 0, + "transfer": 0, + "notifier": 0, + "secret": 0, + "logging": 0, + "executor": 0, + "bundle": 0, + "decorator": 0, + } + ) + categories: list[dict] = field(default_factory=list) + connection_types: list[dict] = field(default_factory=list) # {conn_type, hook_class, docs_url} + requires_python: str = "" # e.g., ">=3.10" + dependencies: list[str] = field(default_factory=list) # from pyproject.toml + optional_extras: dict[str, list[str]] = field(default_factory=dict) # {extra_name: [deps]} + dependents: list[str] = field(default_factory=list) + related_providers: list[str] = field(default_factory=list) + docs_url: str = "" + source_url: str = "" + pypi_url: str = "" + first_released: str = "" + last_updated: str = "" + + +def parse_provider_yaml(yaml_path: Path) -> dict[str, Any]: + """Parse a provider.yaml file.""" + with open(yaml_path) as f: + return yaml.safe_load(f) + + +def parse_pyproject_toml(pyproject_path: Path) -> dict[str, Any]: + """Parse pyproject.toml and extract requires-python, dependencies, and optional extras.""" + result: dict[str, Any] = {"requires_python": "", "dependencies": [], 
"optional_extras": {}} + + if not pyproject_path.exists(): + return result + + try: + with open(pyproject_path, "rb") as f: + data = tomllib.load(f) + + project = data.get("project", {}) + + result["requires_python"] = project.get("requires-python", "") + result["dependencies"] = [d.strip() for d in project.get("dependencies", [])][:20] + + optional_deps = project.get("optional-dependencies", {}) + for extra_name, extra_deps in optional_deps.items(): + clean = [d.strip() for d in extra_deps if d.strip()] + if clean: + result["optional_extras"][extra_name] = clean[:5] + + except Exception as e: + print(f" Warning: Could not parse {pyproject_path}: {e}") + + return result + + +def extract_integrations_as_categories(provider_yaml: dict[str, Any]) -> list[Category]: + """Extract integrations from provider.yaml as categories.""" + categories: dict[str, Category] = {} + + for integration in provider_yaml.get("integrations", []): + name = integration.get("integration-name", "") + if not name: + continue + + # Create a slug for the category ID + cat_id = name.lower().replace(" ", "-").replace("(", "").replace(")", "") + cat_id = re.sub(r"[^a-z0-9-]", "", cat_id) + + if cat_id not in categories: + categories[cat_id] = Category(id=cat_id, name=name, module_count=0) + + return list(categories.values()) + + +def count_modules_by_type(provider_yaml: dict[str, Any]) -> dict[str, int]: + """Count modules by type from provider.yaml.""" + counts = { + "operator": 0, + "hook": 0, + "sensor": 0, + "trigger": 0, + "transfer": 0, + "notifier": 0, + "secret": 0, + "logging": 0, + "executor": 0, + "bundle": 0, + "decorator": 0, + } + Review Comment: I wonder whether we need to show Auth Manager, Queue, CLI, and all the rest of the "features" we support? When I searched for FAB and Keycloak on the staging site, it showed zero modules available for them. 
########## .github/workflows/registry-build.yml: ########## @@ -0,0 +1,232 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: Build & Publish Registry +on: # yamllint disable-line rule:truthy + workflow_dispatch: + inputs: + destination: + description: "Publish to live or staging S3 bucket" + required: true + type: choice + options: + - staging + - live + default: staging + provider: + description: "Provider ID(s) for incremental build (space-separated, empty = full build)" + required: false + type: string + default: "" + workflow_call: + inputs: + destination: + description: "Publish to live or staging S3 bucket" + required: false + type: string + default: staging + provider: + description: "Provider ID(s) for incremental build (space-separated, empty = full build)" + required: false + type: string + default: "" + secrets: + DOCS_AWS_ACCESS_KEY_ID: + required: true + DOCS_AWS_SECRET_ACCESS_KEY: + required: true + +permissions: + contents: read + +jobs: + build-and-publish-registry: + timeout-minutes: 30 + name: "Build & Publish Registry" + runs-on: ubuntu-latest + permissions: + contents: read + id-token: write + if: > + github.event_name == 'workflow_call' || + contains(fromJSON('[ + "ashb", + "bugraoz93", + 
"eladkal", + "ephraimbuddy", + "jedcunningham", + "jscheffl", + "kaxil", + "pierrejeambrun", + "shahar1", + "potiuk", + "utkarsharma2", + "vincbeck" + ]'), github.event.sender.login) + steps: + - name: "Checkout repository" + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + persist-credentials: false + + # --- Breeze setup --- + # All three extraction scripts run inside breeze so that + # extract_parameters.py and extract_connections.py can import provider + # classes at runtime. extract_metadata.py also runs in breeze for + # consistency — it writes to dev/registry/ (mounted) so the other two + # scripts can read providers.json / modules.json from there. + - name: "Install Breeze" + uses: ./.github/actions/breeze + with: + python-version: "3.12" + + - name: "Build CI image" + # Fallback to raw docker buildx when breeze cache is stale — same + # pattern as publish-docs-to-s3.yml. + run: > + breeze ci-image build --python 3.12 || + docker buildx build --load --builder default --progress=auto --pull + --build-arg AIRFLOW_EXTRAS=devel-ci --build-arg AIRFLOW_PRE_CACHED_PIP_PACKAGES=false + --build-arg AIRFLOW_USE_UV=true --build-arg BUILD_PROGRESS=auto + --build-arg INSTALL_MYSQL_CLIENT_TYPE=mariadb + --build-arg VERSION_SUFFIX_FOR_PYPI=dev0 + -t ghcr.io/apache/airflow/main/ci/python3.12:latest --target main . 
+ -f Dockerfile.ci --platform linux/amd64 + + - name: "Install AWS CLI v2" + run: | + curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o /tmp/awscliv2.zip + unzip -q /tmp/awscliv2.zip -d /tmp + rm /tmp/awscliv2.zip + sudo /tmp/aws/install --update + rm -rf /tmp/aws/ + + - name: "Configure AWS credentials" + uses: aws-actions/configure-aws-credentials@010d0da01d0b5a38af31e9c3470dbfdabdecca3a # v4.0.1 + with: + aws-access-key-id: ${{ secrets.DOCS_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.DOCS_AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-2 Review Comment: Non-blocking nit: The `Build CI image`, `Install AWS CLI v2` and `Configure AWS credentials` steps look similar or exactly the same as in `.github/workflows/publish-docs-to-s3.yml`. Maybe we could define reusable `actions` to ensure consistency. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
