This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 03718194f4 Tests for provider code structure (#23351)
03718194f4 is described below
commit 03718194f4fa509f16fcaf3d41ff186dbae5d427
Author: Bartłomiej Hirsz <[email protected]>
AuthorDate: Sun May 8 21:32:26 2022 +0200
Tests for provider code structure (#23351)
Improved test for code structure that can be re-used among various
providers.
---
tests/always/test_project_structure.py | 351 +++++++++++++++++++++++----------
1 file changed, 248 insertions(+), 103 deletions(-)
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index 546c4920ce..83430f26cb 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -20,9 +20,9 @@ import itertools
import mmap
import os
import unittest
-from typing import List
+from typing import Dict, Set
-from parameterized import parameterized
+import pytest
ROOT_FOLDER = os.path.realpath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir,
os.pardir)
@@ -119,14 +119,14 @@ def get_imports_from_file(filepath: str):
with open(filepath) as py_file:
content = py_file.read()
doc_node = ast.parse(content, filepath)
- import_names: List[str] = []
+ import_names: Set[str] = set()
for current_node in ast.walk(doc_node):
if not isinstance(current_node, (ast.Import, ast.ImportFrom)):
continue
for alias in current_node.names:
name = alias.name
fullname = f'{current_node.module}.{name}' if
isinstance(current_node, ast.ImportFrom) else name
- import_names.append(fullname)
+ import_names.add(fullname)
return import_names
@@ -135,34 +135,143 @@ def filepath_to_module(filepath: str):
return filepath.replace("/", ".")[: -(len('.py'))]
-def get_classes_from_file(filepath: str):
- with open(filepath) as py_file:
- content = py_file.read()
- doc_node = ast.parse(content, filepath)
- module = filepath_to_module(filepath)
- results: List[str] = []
- for current_node in ast.walk(doc_node):
- if not isinstance(current_node, ast.ClassDef):
- continue
- name = current_node.name
- if not name.endswith("Operator") and not name.endswith("Sensor") and
not name.endswith("Operator"):
- continue
- results.append(f"{module}.{name}")
- return results
-
-
-class TestGoogleProviderProjectStructure(unittest.TestCase):
- MISSING_EXAMPLE_DAGS = {
- 'adls_to_gcs',
- 'sql_to_gcs',
- 'bigquery_to_mysql',
- 'cassandra_to_gcs',
- 'drive',
- 'ads_to_gcs',
- }
+def print_sorted(container: Set, indent: str = " ") -> None:
+ sorted_container = sorted(container)
+ print(f"{indent}" + f"\n{indent}".join(sorted_container))
+
+
+class ProjectStructureTest:
+ PROVIDER = "blank"
+ CLASS_DIRS = {"operators", "sensors", "transfers"}
+ CLASS_SUFFIXES = ["Operator", "Sensor"]
- # Those operators are deprecated and we do not need examples for them
- DEPRECATED_OPERATORS = {
+ def class_paths(self):
+ """Override this method if your classes are located under different
paths"""
+ for resource_type in self.CLASS_DIRS:
+ python_files = glob.glob(
+
f"{ROOT_FOLDER}/airflow/providers/{self.PROVIDER}/**/{resource_type}/**.py",
recursive=True
+ )
+ # Make path relative
+ resource_files = (os.path.relpath(f, ROOT_FOLDER) for f in
python_files)
+ resource_files = (f for f in resource_files if not
f.endswith("__init__.py"))
+ yield from resource_files
+
+ def list_of_classes(self):
+ classes = {}
+ for operator_file in self.class_paths():
+ operators_paths =
self.get_classes_from_file(f"{ROOT_FOLDER}/{operator_file}")
+ classes.update(operators_paths)
+ return classes
+
+ def get_classes_from_file(self, filepath: str):
+ with open(filepath) as py_file:
+ content = py_file.read()
+ doc_node = ast.parse(content, filepath)
+ module = filepath_to_module(filepath)
+ results: Dict = {}
+ for current_node in ast.walk(doc_node):
+ if not isinstance(current_node, ast.ClassDef):
+ continue
+ name = current_node.name
+ if not any(name.endswith(suffix) for suffix in
self.CLASS_SUFFIXES):
+ continue
+ results[f"{module}.{name}"] = current_node
+ return results
+
+
+class ExampleCoverageTest(ProjectStructureTest):
+ """Checks that every operator is covered by example"""
+
+ # Those operators are deprecated, so we do not need examples for them
+ DEPRECATED_CLASSES: Set = set()
+
+ # Those operators should not have examples as they are never used
standalone (they are abstract)
+ BASE_CLASSES: Set = set()
+
+ # Please add the examples to those operators at the earliest convenience :)
+ MISSING_EXAMPLES_FOR_CLASSES: Set = set()
+
+ def example_paths(self):
+ """Override this method if your example dags are located elsewhere"""
+ # old_design:
+ yield from glob.glob(
+
f"{ROOT_FOLDER}/airflow/providers/{self.PROVIDER}/**/example_dags/example_*.py",
recursive=True
+ )
+ # new_design:
+ yield from glob.glob(
+
f"{ROOT_FOLDER}/tests/system/providers/{self.PROVIDER}/**/example_*.py",
recursive=True
+ )
+
+ def test_missing_examples(self):
+ """
+ Assert that all operators defined under operators, sensors and
transfers directories
+ are used in any of the example dags
+ """
+ classes = self.list_of_classes()
+ assert 0 != len(classes), "Failed to retrieve operators, override
class_paths if needed"
+ classes = set(classes.keys())
+ for example in self.example_paths():
+ classes -= get_imports_from_file(example)
+
+ covered_but_omitted = self.MISSING_EXAMPLES_FOR_CLASSES - classes
+ classes -= self.MISSING_EXAMPLES_FOR_CLASSES
+ classes -= self.DEPRECATED_CLASSES
+ classes -= self.BASE_CLASSES
+ if set() != classes:
+ print("Classes with missing examples:")
+ print_sorted(classes)
+ pytest.fail(
+ "Not all classes are covered with example dags. Update
self.MISSING_EXAMPLES_FOR_CLASSES "
+ "if you want to skip this error"
+ )
+ if set() != covered_but_omitted:
+ print("Covered classes that are listed as missing:")
+ print_sorted(covered_but_omitted)
+ pytest.fail("Operator listed in missing examples but is used in
example dag")
+
+
+class AssetsCoverageTest(ProjectStructureTest):
+ """Checks that every operator have operator_extra_links attribute"""
+
+ # These operators should not have assets
+ ASSETS_NOT_REQUIRED: Set = set()
+
+ # Please add assets to following classes
+ MISSING_ASSETS_FOR_CLASSES: Set = set()
+
+ def test_missing_assets(self):
+ classes = self.list_of_classes()
+ assets, no_assets = set(), set()
+ for name, operator in classes.items():
+ for attr in operator.body:
+ if (
+ isinstance(attr, ast.Assign)
+ and attr.targets
+ and getattr(attr.targets[0], "id", "") ==
"operator_extra_links"
+ ):
+ assets.add(name)
+ break
+ else:
+ no_assets.add(name)
+
+ asset_should_be_missing = self.ASSETS_NOT_REQUIRED - no_assets
+ no_assets -= self.ASSETS_NOT_REQUIRED
+ no_assets -= self.MISSING_ASSETS_FOR_CLASSES
+ if set() != no_assets:
+ print("Classes with missing assets:")
+ print_sorted(no_assets)
+ pytest.fail("Some classes are missing assets")
+ if set() != asset_should_be_missing:
+ print("Classes that should not have assets:")
+ print_sorted(asset_should_be_missing)
+ pytest.fail("Class should not have assets")
+
+
+class TestGoogleProviderProjectStructure(ExampleCoverageTest,
AssetsCoverageTest):
+ PROVIDER = "google"
+ CLASS_DIRS = ProjectStructureTest.CLASS_DIRS | {"operators/vertex_ai"}
+
+ DEPRECATED_CLASSES = {
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
'.CloudDataTransferServiceS3ToGCSOperator',
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
@@ -182,16 +291,14 @@ class
TestGoogleProviderProjectStructure(unittest.TestCase):
'airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator',
}
- # Those operators should not have examples as they are never used
standalone (they are abstract)
- BASE_OPERATORS = {
+ BASE_CLASSES = {
'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator',
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator',
'airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator',
+
'airflow.providers.google.cloud.operators.vertex_ai.custom_job.CustomTrainingJobBaseOperator',
}
- # Please at the examples to those operators at the earliest convenience :)
- MISSING_EXAMPLES_FOR_OPERATORS = {
-
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
+ MISSING_EXAMPLES_FOR_CLASSES = {
'airflow.providers.google.cloud.operators.mlengine.MLEngineTrainingCancelJobOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetStoredInfoTypeOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPReidentifyContentOperator',
@@ -213,76 +320,114 @@ class
TestGoogleProviderProjectStructure(unittest.TestCase):
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDeidentifyTemplateOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPListDLPJobsOperator',
'airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator',
+
'airflow.providers.google.cloud.transfers.cassandra_to_gcs.CassandraToGCSOperator',
+
'airflow.providers.google.cloud.transfers.adls_to_gcs.ADLSToGCSOperator',
+
'airflow.providers.google.cloud.transfers.bigquery_to_mysql.BigQueryToMySqlOperator',
+
'airflow.providers.google.cloud.transfers.sql_to_gcs.BaseSQLToGCSOperator',
+
'airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.GetEndpointOperator',
+
'airflow.providers.google.cloud.operators.vertex_ai.auto_ml.AutoMLTrainingJobBaseOperator',
+
'airflow.providers.google.cloud.operators.vertex_ai.endpoint_service.UpdateEndpointOperator',
+
'airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job.'
+ 'GetBatchPredictionJobOperator',
}
- def test_missing_example_for_operator(self):
- """
- Assert that all operators defined under operators, sensors and
transfers directories
- are used in any of the example dags
- """
- all_operators = set()
- services = set()
- for resource_type in ["operators", "sensors", "transfers"]:
- operator_files = set(
- self.find_resource_files(top_level_directory="airflow",
resource_type=resource_type)
- )
- for filepath in operator_files:
- service_name = os.path.basename(filepath)[: -(len(".py"))]
- if service_name in self.MISSING_EXAMPLE_DAGS:
- continue
- services.add(service_name)
- operators_paths =
set(get_classes_from_file(f"{ROOT_FOLDER}/{filepath}"))
- all_operators.update(operators_paths)
-
- for service in services:
- example_dags = self.examples_for_service(service)
- example_paths = {
- path for example_dag in example_dags for path in
get_imports_from_file(example_dag)
- }
- all_operators -= example_paths
-
- all_operators -= self.MISSING_EXAMPLES_FOR_OPERATORS
- all_operators -= self.DEPRECATED_OPERATORS
- all_operators -= self.BASE_OPERATORS
- assert set() == all_operators
-
- @parameterized.expand(
- itertools.product(["_system.py", "_system_helper.py"], ["operators",
"sensors", "transfers"])
- )
- def test_detect_invalid_system_tests(self, resource_type, filename_suffix):
- operators_tests =
self.find_resource_files(top_level_directory="tests",
resource_type=resource_type)
- operators_files =
self.find_resource_files(top_level_directory="airflow",
resource_type=resource_type)
-
- files = {f for f in operators_tests if f.endswith(filename_suffix)}
-
- expected_files = (f"tests/{f[8:]}" for f in operators_files)
- expected_files = (f.replace(".py", filename_suffix).replace("/test_",
"/") for f in expected_files)
- expected_files =
{f'{f.rpartition("/")[0]}/test_{f.rpartition("/")[2]}' for f in expected_files}
-
- assert set() == files - expected_files
-
- @staticmethod
- def find_resource_files(
- top_level_directory: str = "airflow",
- department: str = "*",
- resource_type: str = "*",
- service: str = "*",
- ):
- python_files = glob.glob(
-
f"{ROOT_FOLDER}/{top_level_directory}/providers/google/{department}/{resource_type}/{service}.py"
- )
- # Make path relative
- resource_files = (os.path.relpath(f, ROOT_FOLDER) for f in
python_files)
- # Exclude __init__.py and pycache
- resource_files = (f for f in resource_files if not
f.endswith("__init__.py"))
- return resource_files
+ # These operators should not have assets
+ ASSETS_NOT_REQUIRED = {
+
'airflow.providers.google.cloud.operators.automl.AutoMLDeleteDatasetOperator',
+
'airflow.providers.google.cloud.operators.automl.AutoMLDeleteModelOperator',
+
'airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator',
+
'airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator',
+
'airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator',
+
'airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator',
+
'airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckOperator',
+
'airflow.providers.google.cloud.operators.bigquery_dts.BigQueryDeleteDataTransferConfigOperator',
+
'airflow.providers.google.cloud.operators.bigtable.BigtableDeleteInstanceOperator',
+
'airflow.providers.google.cloud.operators.bigtable.BigtableDeleteTableOperator',
+
'airflow.providers.google.cloud.operators.cloud_build.CloudBuildDeleteBuildTriggerOperator',
+
'airflow.providers.google.cloud.operators.cloud_memorystore.CloudMemorystoreDeleteInstanceOperator',
+ 'airflow.providers.google.cloud.operators.cloud_memorystore.'
+ 'CloudMemorystoreMemcachedDeleteInstanceOperator',
+
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator',
+
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLDeleteInstanceDatabaseOperator',
+
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLDeleteInstanceOperator',
+
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service.'
+ 'CloudDataTransferServiceDeleteJobOperator',
+
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service.'
+ 'CloudDataTransferServiceGetOperationOperator',
+
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service.'
+ 'CloudDataTransferServiceListOperationsOperator',
+
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service.'
+ 'CloudDataTransferServicePauseOperationOperator',
+
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service.'
+ 'CloudDataTransferServiceResumeOperationOperator',
+
'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator',
+
'airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryGroupOperator',
+
'airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryOperator',
+
'airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagOperator',
+
'airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateFieldOperator',
+
'airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateOperator',
+
'airflow.providers.google.cloud.operators.datafusion.CloudDataFusionDeleteInstanceOperator',
+
'airflow.providers.google.cloud.operators.datafusion.CloudDataFusionDeletePipelineOperator',
+
'airflow.providers.google.cloud.operators.dataproc.DataprocDeleteBatchOperator',
+
'airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator',
+
'airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteBackupOperator',
+
'airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteServiceOperator',
+
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreBeginTransactionOperator',
+
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator',
+
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator',
+
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreRollbackOperator',
+
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreRunQueryOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeidentifyContentOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDLPJobOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDeidentifyTemplateOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteInspectTemplateOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteJobTriggerOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteStoredInfoTypeOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPInspectContentOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPReidentifyContentOperator',
+
'airflow.providers.google.cloud.operators.functions.CloudFunctionDeleteFunctionOperator',
+ 'airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator',
+
'airflow.providers.google.cloud.operators.gcs.GCSDeleteObjectsOperator',
+
'airflow.providers.google.cloud.operators.kubernetes_engine.GKEDeleteClusterOperator',
+
'airflow.providers.google.cloud.operators.mlengine.MLEngineDeleteModelOperator',
+
'airflow.providers.google.cloud.operators.mlengine.MLEngineDeleteVersionOperator',
+
'airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator',
+
'airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator',
+
'airflow.providers.google.cloud.operators.spanner.SpannerDeleteDatabaseInstanceOperator',
+
'airflow.providers.google.cloud.operators.spanner.SpannerDeleteInstanceOperator',
+
'airflow.providers.google.cloud.operators.stackdriver.StackdriverDeleteAlertOperator',
+
'airflow.providers.google.cloud.operators.stackdriver.StackdriverDeleteNotificationChannelOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueDeleteOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskDeleteOperator',
+
'airflow.providers.google.cloud.operators.translate.CloudTranslateTextOperator',
+
'airflow.providers.google.cloud.operators.translate_speech.CloudTranslateSpeechOperator',
+
'airflow.providers.google.cloud.operators.vision.CloudVisionDeleteProductOperator',
+
'airflow.providers.google.cloud.operators.vision.CloudVisionDeleteProductSetOperator',
+
'airflow.providers.google.cloud.operators.vision.CloudVisionDeleteReferenceImageOperator',
+
'airflow.providers.google.cloud.operators.workflows.WorkflowsDeleteWorkflowOperator',
+ 'airflow.providers.google.marketing_platform.sensors.campaign_manager.'
+ 'GoogleCampaignManagerReportSensor',
+ 'airflow.providers.google.marketing_platform.sensors.display_video.'
+ 'GoogleDisplayVideo360GetSDFDownloadOperationSensor',
+ 'airflow.providers.google.marketing_platform.sensors.display_video.'
+ 'GoogleDisplayVideo360ReportSensor',
+
'airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsReportSensor',
+ }
- @staticmethod
- def examples_for_service(service_name):
- yield from glob.glob(
-
f"{ROOT_FOLDER}/airflow/providers/google/*/example_dags/example_{service_name}*.py"
- )
- yield from
glob.glob(f"{ROOT_FOLDER}/tests/system/providers/google/{service_name}/example_*.py")
+ @pytest.mark.xfail(reason="We did not reach full coverage yet")
+ def test_missing_assets(self):
+ super().test_missing_assets()
+
+
+class TestElasticsearchProviderProjectStructure(ExampleCoverageTest):
+ PROVIDER = "elasticsearch"
+ CLASS_DIRS = {"hooks"}
+ CLASS_SUFFIXES = ["Hook"]
+
+
+class TestDockerProviderProjectStructure(ExampleCoverageTest):
+ PROVIDER = "docker"
class TestOperatorsHooks(unittest.TestCase):