bhirsz commented on code in PR #23351:
URL: https://github.com/apache/airflow/pull/23351#discussion_r863806713
##########
tests/always/test_project_structure.py:
##########
@@ -135,33 +133,123 @@ def filepath_to_module(filepath: str):
return filepath.replace("/", ".")[: -(len('.py'))]
-def get_classes_from_file(filepath: str):
- with open(filepath) as py_file:
- content = py_file.read()
- doc_node = ast.parse(content, filepath)
- module = filepath_to_module(filepath)
- results: List[str] = []
- for current_node in ast.walk(doc_node):
- if not isinstance(current_node, ast.ClassDef):
- continue
- name = current_node.name
- if not name.endswith("Operator") and not name.endswith("Sensor") and
not name.endswith("Operator"):
- continue
- results.append(f"{module}.{name}")
- return results
-
-
-class TestGoogleProviderProjectStructure(unittest.TestCase):
- MISSING_EXAMPLE_DAGS = {
- 'adls_to_gcs',
- 'sql_to_gcs',
- 'bigquery_to_mysql',
- 'cassandra_to_gcs',
- 'drive',
- 'ads_to_gcs',
- }
+class ProjectStructureTest:
+ PROVIDER = "dummy"
+ OPERATOR_DIRS = {"operators", "sensors", "transfers"}
+ CLASS_SUFFIXES = ["Operator", "Sensor"]
+
+ def operator_paths(self):
+ """Override this method if your operators are located under different
paths"""
+ for resource_type in self.OPERATOR_DIRS:
+ python_files = glob.glob(
+
f"{ROOT_FOLDER}/airflow/providers/{self.PROVIDER}/**/{resource_type}/**.py",
recursive=True
+ )
+ # Make path relative
+ resource_files = (os.path.relpath(f, ROOT_FOLDER) for f in
python_files)
+ # Exclude __init__.py and pycache
+ resource_files = (f for f in resource_files if not
f.endswith("__init__.py"))
+ yield from resource_files
+
+ def list_of_operators(self):
+ all_operators = {}
+ for operator_file in self.operator_paths():
+ operators_paths =
self.get_classes_from_file(f"{ROOT_FOLDER}/{operator_file}")
+ all_operators.update(operators_paths)
+ return all_operators
+
+ def get_classes_from_file(self, filepath: str):
+ with open(filepath) as py_file:
+ content = py_file.read()
+ doc_node = ast.parse(content, filepath)
+ module = filepath_to_module(filepath)
+ results: Dict = {}
+ for current_node in ast.walk(doc_node):
+ if not isinstance(current_node, ast.ClassDef):
+ continue
+ name = current_node.name
+ if not any(name.endswith(suffix) for suffix in
self.CLASS_SUFFIXES):
+ continue
+ results[f"{module}.{name}"] = current_node
+ return results
+
+
+class ExampleCoverageTest(ProjectStructureTest):
+ """Checks that every operator is covered by example"""
+
+ # Those operators are deprecated, so we do not need examples for them
+ DEPRECATED_OPERATORS: Set = set()
+
+ # Those operators should not have examples as they are never used
standalone (they are abstract)
+ BASE_OPERATORS: Set = set()
+
+ # Please add the examples to those operators at the earliest convenience :)
+ MISSING_EXAMPLES_FOR_OPERATORS: Set = set()
+
+ def example_paths(self):
+ """Override this method if your example dags are located elsewhere"""
+ # old_design:
+ yield from glob.glob(
+
f"{ROOT_FOLDER}/airflow/providers/{self.PROVIDER}/**/example_dags/example_*.py",
recursive=True
+ )
+ # new_design:
+ yield from glob.glob(
+
f"{ROOT_FOLDER}/tests/system/providers/{self.PROVIDER}/**/example_*.py",
recursive=True
+ )
+
+ def test_missing_example_for_operator(self):
+ """
+ Assert that all operators defined under operators, sensors and
transfers directories
+ are used in any of the example dags
+ """
+ all_operators = self.list_of_operators()
+ assert 0 != len(all_operators), "Failed to retrieve operators,
override operator_paths if needed"
+ all_operators = set(all_operators.keys())
+ for example in self.example_paths():
+ all_operators -= get_imports_from_file(example)
+
+ covered_but_omitted = self.MISSING_EXAMPLES_FOR_OPERATORS -
all_operators
+ all_operators -= self.MISSING_EXAMPLES_FOR_OPERATORS
+ all_operators -= self.DEPRECATED_OPERATORS
+ all_operators -= self.BASE_OPERATORS
+ assert set() == all_operators, (
+ "Not all operators are covered with example dags. "
+ "Update self.MISSING_EXAMPLES_FOR_OPERATORS if you want to skip
this error"
+ )
+ assert set() == covered_but_omitted, "Operator listed in missing
examples but is used in example dag"
+
+
+class AssetsCoverageTest(ProjectStructureTest):
+ """Checks that every operator have operator_extra_links attribute"""
+
+ # These operators should not have assets
+ ASSETS_NOT_REQUIRED: Set = set()
+
+ def test_missing_assets_for_operator(self):
+ all_operators = self.list_of_operators()
+ assets, no_assets = set(), set()
+ for name, operator in all_operators.items():
+ for attr in operator.body:
+ if (
+ isinstance(attr, ast.Assign)
+ and attr.targets
+ and getattr(attr.targets[0], "id", "") ==
"operator_extra_links"
+ ):
+ assets.add(name)
+ break
+ else:
+ no_assets.add(name)
+
+ asset_should_be_missing = self.ASSETS_NOT_REQUIRED - no_assets
+ no_assets -= self.ASSETS_NOT_REQUIRED
+ # TODO: (bhirsz): uncomment when we reach full coverage
+ # assert set() == no_assets, "Operator is missing assets"
Review Comment:
I've added ``MISSING_ASSETS_FOR_OPERATORS`` so that it's possible to define a list
of missing assets, but it's also possible to temporarily expect the test to fail:
```
@pytest.mark.xfail(reason="We did not reach full coverage yet")
def test_missing_assets_for_operator(self):
super().test_missing_assets_for_operator()
```
(example from the Google provider). Once we decrease the number of missing
assets, I will override ``MISSING_ASSETS_FOR_OPERATORS`` with the remaining
missing assets and expect the test to pass.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]