potiuk commented on code in PR #28516: URL: https://github.com/apache/airflow/pull/28516#discussion_r1054654657
########## scripts/in_container/run_provider_yaml_files_check.py: ########## @@ -0,0 +1,482 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import importlib +import inspect +import json +import os +import pathlib +import platform +import sys +import textwrap +from collections import Counter +from enum import Enum +from itertools import chain, product +from typing import Any, Iterable + +import jsonschema +import yaml +from jsonpath_ng.ext import parse +from rich.console import Console +from tabulate import tabulate + +from airflow.cli.commands.info_command import Architecture + +try: + from yaml import CSafeLoader as SafeLoader +except ImportError: + from yaml import SafeLoader # type: ignore + +if __name__ != "__main__": + raise Exception( + "This file is intended to be executed as an executable program. You cannot use it as a module." 
+ ) + +ROOT_DIR = pathlib.Path(__file__).resolve().parents[2] +DOCS_DIR = ROOT_DIR.joinpath("docs") +PROVIDER_DATA_SCHEMA_PATH = ROOT_DIR.joinpath("airflow", "provider.yaml.schema.json") +PROVIDER_ISSUE_TEMPLATE_PATH = ROOT_DIR.joinpath( + ".github", "ISSUE_TEMPLATE", "airflow_providers_bug_report.yml" +) +CORE_INTEGRATIONS = ["SQL", "Local"] + +errors = [] + +console = Console(width=400, color_system="standard") + + +def _filepath_to_module(filepath: pathlib.Path) -> str: + p = filepath.resolve().relative_to(ROOT_DIR).as_posix() + if p.endswith(".py"): + p = p[:-3] + return p.replace("/", ".") + + +def _load_schema() -> dict[str, Any]: + with PROVIDER_DATA_SCHEMA_PATH.open() as schema_file: + content = json.load(schema_file) + return content + + +def _load_package_data(package_paths: Iterable[str]): + schema = _load_schema() + result = {} + for provider_yaml_path in package_paths: + with open(provider_yaml_path) as yaml_file: + provider = yaml.load(yaml_file, SafeLoader) + rel_path = pathlib.Path(provider_yaml_path).relative_to(ROOT_DIR).as_posix() + try: + jsonschema.validate(provider, schema=schema) + except jsonschema.ValidationError: + raise Exception(f"Unable to parse: {rel_path}.") + result[rel_path] = provider + return result + + +def get_all_integration_names(yaml_files) -> list[str]: + all_integrations = [ + i["integration-name"] for f in yaml_files.values() if "integrations" in f for i in f["integrations"] + ] + all_integrations += ["Local"] + return all_integrations + + +def check_integration_duplicates(yaml_files: dict[str, dict]): + """Integration names must be globally unique.""" + print("Checking integration duplicates") + all_integrations = get_all_integration_names(yaml_files) + + duplicates = [(k, v) for (k, v) in Counter(all_integrations).items() if v > 1] + + if duplicates: + print( + "Duplicate integration names found. Integration names must be globally unique. " + "Please delete duplicates." 
+ ) + print(tabulate(duplicates, headers=["Integration name", "Number of occurrences"])) + sys.exit(3) + + +def assert_sets_equal(set1, set2): + try: + difference1 = set1.difference(set2) + except TypeError as e: + raise AssertionError(f"invalid type when attempting set difference: {e}") + except AttributeError as e: + raise AssertionError(f"first argument does not support set difference: {e}") + + try: + difference2 = set2.difference(set1) + except TypeError as e: + raise AssertionError(f"invalid type when attempting set difference: {e}") + except AttributeError as e: + raise AssertionError(f"second argument does not support set difference: {e}") + + if not (difference1 or difference2): + return + + lines = [] + if difference1: + lines.append(" -- Items in the left set but not the right:") + for item in sorted(difference1): + lines.append(f" {item!r}") + if difference2: + lines.append(" -- Items in the right set but not the left:") + for item in sorted(difference2): + lines.append(f" {item!r}") + + standard_msg = "\n".join(lines) + raise AssertionError(standard_msg) + + +class ObjectType(Enum): + MODULE = "module" + CLASS = "class" + + +def check_if_object_exist(object_name: str, resource_type: str, yaml_file_path: str, object_type: ObjectType): + try: + if object_type == ObjectType.CLASS: + module_name, object_name = object_name.rsplit(".", maxsplit=1) + the_class = getattr(importlib.import_module(module_name), object_name) + if the_class and inspect.isclass(the_class): + return + elif object_type == ObjectType.MODULE: + module = importlib.import_module(object_name) + if inspect.ismodule(module): + return + else: + raise RuntimeError(f"Wrong enum {object_type}???") + except Exception as e: + if architecture == Architecture.ARM: + if "pymssql" in str(e) or "MySQLdb" in str(e): + console.print( + f"[yellow]The imports fail on ARM: {object_name} in {resource_type} {e}, " + f"but it is expected.[/]" + ) + return + errors.append( + f"The `{object_name}` object in 
{resource_type} list in {yaml_file_path} does not exist " + f"or is not a class: {e}" + ) + else: + errors.append( + f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not exist " + f"or is not a {object_type.value}." + ) + + +def check_if_objects_exist_and_belong_to_package( + object_names: set[str], + provider_package: str, + yaml_file_path: str, + resource_type: str, + object_type: ObjectType, +): + for object_name in object_names: + if os.environ.get("VERBOSE"): + console.print( + f"[bright_blue]Checking if {object_name} of {resource_type} " + f"in {yaml_file_path} is {object_type.value} and belongs to {provider_package} package" + ) + if not object_name.startswith(provider_package): + errors.append( + f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not start" + f" with the expected {provider_package}." + ) + check_if_object_exist(object_name, resource_type, yaml_file_path, object_type) + + +def parse_module_data(provider_data, resource_type, yaml_file_path): + package_dir = ROOT_DIR.joinpath(yaml_file_path).parent + provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".") + py_files = chain( + package_dir.glob(f"**/{resource_type}/*.py"), + package_dir.glob(f"{resource_type}/*.py"), + package_dir.glob(f"**/{resource_type}/**/*.py"), + package_dir.glob(f"{resource_type}/**/*.py"), + ) + expected_modules = {_filepath_to_module(f) for f in py_files if f.name != "__init__.py"} + resource_data = provider_data.get(resource_type, []) + return expected_modules, provider_package, resource_data + + +def check_correctness_of_list_of_sensors_operators_hook_modules(yaml_files: dict[str, dict]): + print("Checking completeness of list of {sensors, hooks, operators}") + print(" -- {sensors, hooks, operators} - Expected modules (left) : Current modules (right)") + for (yaml_file_path, provider_data), resource_type in product( + yaml_files.items(), ["sensors", "operators", "hooks"] + ): + 
expected_modules, provider_package, resource_data = parse_module_data( + provider_data, resource_type, yaml_file_path + ) + + current_modules = {str(i) for r in resource_data for i in r.get("python-modules", [])} + check_if_objects_exist_and_belong_to_package( + current_modules, provider_package, yaml_file_path, resource_type, ObjectType.MODULE + ) + try: + assert_sets_equal(set(expected_modules), set(current_modules)) + except AssertionError as ex: + nested_error = textwrap.indent(str(ex), " ") + errors.append( + f"Incorrect content of key '{resource_type}/python-modules' " + f"in file: {yaml_file_path}\n{nested_error}" + ) + + +def check_duplicates_in_integrations_names_of_hooks_sensors_operators(yaml_files: dict[str, dict]): + print("Checking for duplicates in list of {sensors, hooks, operators}") + for (yaml_file_path, provider_data), resource_type in product( + yaml_files.items(), ["sensors", "operators", "hooks"] + ): + resource_data = provider_data.get(resource_type, []) + current_integrations = [r.get("integration-name", "") for r in resource_data] + if len(current_integrations) != len(set(current_integrations)): + for integration in current_integrations: + if current_integrations.count(integration) > 1: + errors.append( + f"Duplicated content of '{resource_type}/integration-name/{integration}' " + f"in file: {yaml_file_path}" + ) + + +def check_completeness_of_list_of_transfers(yaml_files: dict[str, dict]): + print("Checking completeness of list of transfers") + resource_type = "transfers" + + print(" -- Expected transfers modules(Left): Current transfers Modules(Right)") + for yaml_file_path, provider_data in yaml_files.items(): + expected_modules, provider_package, resource_data = parse_module_data( + provider_data, resource_type, yaml_file_path + ) + + current_modules = {r.get("python-module") for r in resource_data} + check_if_objects_exist_and_belong_to_package( + current_modules, provider_package, yaml_file_path, resource_type, ObjectType.MODULE + ) 
+ try: + assert_sets_equal(set(expected_modules), set(current_modules)) + except AssertionError as ex: + nested_error = textwrap.indent(str(ex), " ") + errors.append( + f"Incorrect content of key '{resource_type}/python-module' " + f"in file: {yaml_file_path}\n{nested_error}" + ) + + +def check_hook_classes(yaml_files: dict[str, dict]): + print("Checking connection classes belong to package, exist and are classes") + resource_type = "hook-class-names" + for yaml_file_path, provider_data in yaml_files.items(): + provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".") + hook_class_names = provider_data.get(resource_type) + if hook_class_names: + check_if_objects_exist_and_belong_to_package( + hook_class_names, provider_package, yaml_file_path, resource_type, ObjectType.CLASS + ) + + +def check_extra_link_classes(yaml_files: dict[str, dict]): + print("Checking extra-links belong to package, exist and are classes") + resource_type = "extra-links" + for yaml_file_path, provider_data in yaml_files.items(): + provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".") + extra_links = provider_data.get(resource_type) + if extra_links: + check_if_objects_exist_and_belong_to_package( + extra_links, provider_package, yaml_file_path, resource_type, ObjectType.CLASS + ) + + +def check_duplicates_in_list_of_transfers(yaml_files: dict[str, dict]): + print("Checking for duplicates in list of transfers") + errors = [] + resource_type = "transfers" + for yaml_file_path, provider_data in yaml_files.items(): + resource_data = provider_data.get(resource_type, []) + + source_target_integrations = [ + (r.get("source-integration-name", ""), r.get("target-integration-name", "")) + for r in resource_data + ] + if len(source_target_integrations) != len(set(source_target_integrations)): + for integration_couple in source_target_integrations: + if source_target_integrations.count(integration_couple) > 1: + errors.append( + f"Duplicated 
content of \n" + f" '{resource_type}/source-integration-name/{integration_couple[0]}' " + f" '{resource_type}/target-integration-name/{integration_couple[1]}' " + f"in file: {yaml_file_path}" + ) + + +def check_invalid_integration(yaml_files: dict[str, dict]): + print("Detect unregistered integrations") + all_integration_names = set(get_all_integration_names(yaml_files)) + + for (yaml_file_path, provider_data), resource_type in product( + yaml_files.items(), ["sensors", "operators", "hooks"] + ): + resource_data = provider_data.get(resource_type, []) + current_names = {r["integration-name"] for r in resource_data} + invalid_names = current_names - all_integration_names + if invalid_names: + errors.append( + f"Incorrect content of key '{resource_type}/integration-name' in file: {yaml_file_path}. " + f"Invalid values: {invalid_names}" + ) + + for (yaml_file_path, provider_data), key in product( + yaml_files.items(), ["source-integration-name", "target-integration-name"] + ): + resource_data = provider_data.get("transfers", []) + current_names = {r[key] for r in resource_data} + invalid_names = current_names - all_integration_names + if invalid_names: + errors.append( + f"Incorrect content of key 'transfers/{key}' in file: {yaml_file_path}. 
" + f"Invalid values: {invalid_names}" + ) + + +def check_doc_files(yaml_files: dict[str, dict]): + print("Checking doc files") + current_doc_urls: list[str] = [] + current_logo_urls: list[str] = [] + for provider in yaml_files.values(): + if "integrations" in provider: + current_doc_urls.extend( + guide + for guides in provider["integrations"] + if "how-to-guide" in guides + for guide in guides["how-to-guide"] + ) + current_logo_urls.extend( + integration["logo"] for integration in provider["integrations"] if "logo" in integration + ) + if "transfers" in provider: + current_doc_urls.extend( + op["how-to-guide"] for op in provider["transfers"] if "how-to-guide" in op + ) + + expected_doc_urls = { + f"/docs/{f.relative_to(DOCS_DIR).as_posix()}" + for f in DOCS_DIR.glob("apache-airflow-providers-*/operators/**/*.rst") + if f.name != "index.rst" and "_partials" not in f.parts + } | { + f"/docs/{f.relative_to(DOCS_DIR).as_posix()}" + for f in DOCS_DIR.glob("apache-airflow-providers-*/operators.rst") + } + expected_logo_urls = { + f"/{f.relative_to(DOCS_DIR).as_posix()}" + for f in DOCS_DIR.glob("integration-logos/**/*") + if f.is_file() + } + + try: + print(" -- Checking document urls: expected (left), current (right)") + assert_sets_equal(set(expected_doc_urls), set(current_doc_urls)) + + print(" -- Checking logo urls: expected (left), current (right)") + assert_sets_equal(set(expected_logo_urls), set(current_logo_urls)) + except AssertionError as ex: + print(ex) + sys.exit(1) + + +def check_unique_provider_name(yaml_files: dict[str, dict]): + provider_names = [d["name"] for d in yaml_files.values()] + duplicates = {x for x in provider_names if provider_names.count(x) > 1} + if duplicates: + errors.append(f"Provider name must be unique. 
Duplicates: {duplicates}") + + +def check_providers_are_mentioned_in_issue_template(yaml_files: dict[str, dict]): + prefix_len = len("apache-airflow-providers-") + short_provider_names = [d["package-name"][prefix_len:] for d in yaml_files.values()] + # exclude deprecated provider that shouldn't be in issue template + deprecated_providers: list[str] = [] + for item in deprecated_providers: + short_provider_names.remove(item) + jsonpath_expr = parse('$.body[?(@.attributes.label == "Apache Airflow Provider(s)")]..options[*]') + with PROVIDER_ISSUE_TEMPLATE_PATH.open() as issue_file: + issue_template = yaml.safe_load(issue_file) + all_mentioned_providers = [match.value for match in jsonpath_expr.find(issue_template)] + try: + print( + f" -- Checking providers: present in code (left), " + f"mentioned in {PROVIDER_ISSUE_TEMPLATE_PATH} (right)" + ) + assert_sets_equal(set(short_provider_names), set(all_mentioned_providers)) + except AssertionError as ex: + print(ex) + sys.exit(1) + + +def check_providers_have_all_documentation_files(yaml_files: dict[str, dict]): + expected_files = ["commits.rst", "index.rst", "installing-providers-from-sources.rst"] + for package_info in yaml_files.values(): + package_name = package_info["package-name"] + provider_dir = DOCS_DIR.joinpath(package_name) + for file in expected_files: + if not provider_dir.joinpath(file).is_file(): + errors.append( + f"The provider {package_name} misses `{file}` in documentation. " + f"Please add the file to {provider_dir}" + ) + + +if __name__ == "__main__": + architecture = Architecture().get_current() + console.print(f"Verifying packages on {architecture} architecture. 
Platform: {platform.machine()}.") + provider_files_pattern = pathlib.Path(ROOT_DIR).glob("airflow/providers/**/provider.yaml") + all_provider_files = sorted(str(path) for path in provider_files_pattern) + + if len(sys.argv) > 1: + paths = sorted(sys.argv[1:]) + else: + paths = all_provider_files + + all_parsed_yaml_files: dict[str, dict] = _load_package_data(paths) + + all_files_loaded = len(all_provider_files) == len(paths) + check_integration_duplicates(all_parsed_yaml_files) + + check_duplicates_in_integrations_names_of_hooks_sensors_operators(all_parsed_yaml_files) + + check_completeness_of_list_of_transfers(all_parsed_yaml_files) + check_duplicates_in_list_of_transfers(all_parsed_yaml_files) + check_hook_classes(all_parsed_yaml_files) + check_extra_link_classes(all_parsed_yaml_files) + check_correctness_of_list_of_sensors_operators_hook_modules(all_parsed_yaml_files) + check_unique_provider_name(all_parsed_yaml_files) + check_providers_are_mentioned_in_issue_template(all_parsed_yaml_files) + check_providers_have_all_documentation_files(all_parsed_yaml_files) Review Comment: I think those tests are a bit different - they are more for correctness of "provider.yaml" - i.e. what is exposed to the outside world, whereas the tests in #28469 are more about naming and structure of the provider packages (though I agree they should be trimmed a bit). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
