This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 388736b Detect partial examples DAGs for Google (#12277)
388736b is described below
commit 388736bf97a4313f81aadbeecbb99e5fcb145c31
Author: Kamil Breguła <[email protected]>
AuthorDate: Wed Nov 11 19:26:30 2020 +0100
Detect partial examples DAGs for Google (#12277)
---
tests/always/test_project_structure.py | 149 +++++++++++++++++++++++++++++++--
1 file changed, 140 insertions(+), 9 deletions(-)
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index bcde9bb..25aa752 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -14,12 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import ast
import glob
import itertools
import mmap
import os
import unittest
+from typing import List
from parameterized import parameterized
@@ -114,6 +115,42 @@ class TestProjectStructure(unittest.TestCase):
)
+def get_imports_from_file(filepath: str):
+ with open(filepath) as py_file:
+ content = py_file.read()
+ doc_node = ast.parse(content, filepath)
+ import_names: List[str] = []
+ for current_node in ast.walk(doc_node):
+ if not isinstance(current_node, (ast.Import, ast.ImportFrom)):
+ continue
+ for alias in current_node.names:
+ name = alias.name
+ fullname = f'{current_node.module}.{name}' if
isinstance(current_node, ast.ImportFrom) else name
+ import_names.append(fullname)
+ return import_names
+
+
+def filepath_to_module(filepath: str):
+ filepath = os.path.relpath(os.path.abspath(filepath), ROOT_FOLDER)
+ return filepath.replace("/", ".")[: -(len('.py'))]
+
+
+def get_classes_from_file(filepath: str):
+ with open(filepath) as py_file:
+ content = py_file.read()
+ doc_node = ast.parse(content, filepath)
+ module = filepath_to_module(filepath)
+ results: List[str] = []
+ for current_node in ast.walk(doc_node):
+ if not isinstance(current_node, ast.ClassDef):
+ continue
+ name = current_node.name
+ if not name.endswith("Operator") and not name.endswith("Sensor") and
not name.endswith("Operator"):
+ continue
+ results.append(f"{module}.{name}")
+ return results
+
+
class TestGoogleProviderProjectStructure(unittest.TestCase):
MISSING_EXAMPLE_DAGS = {
('cloud', 'adls_to_gcs'),
@@ -124,6 +161,79 @@ class
TestGoogleProviderProjectStructure(unittest.TestCase):
('ads', 'ads_to_gcs'),
}
+ MISSING_EXAMPLES_FOR_OPERATORS = {
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueDeleteOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueResumeOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuePauseOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuePurgeOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskGetOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksTasksListOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskDeleteOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueGetOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueUpdateOperator',
+
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuesListOperator',
+ # Deprecated operator. Ignore it.
+
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
+ '.CloudDataTransferServiceS3ToGCSOperator',
+ # Deprecated operator. Ignore it.
+
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
+ '.CloudDataTransferServiceGCSToGCSOperator',
+ # Base operator. Ignore it.
+
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHadoopJobOperator',
+
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
+
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator',
+ # Base operator. Ignore it
+
'airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkJobOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkSqlJobOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHiveJobOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitPigJobOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitPySparkJobOperator',
+
'airflow.providers.google.cloud.operators.mlengine.MLEngineTrainingCancelJobOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.mlengine.MLEngineManageModelOperator',
+ # Deprecated operator. Ignore it
+
'airflow.providers.google.cloud.operators.mlengine.MLEngineManageVersionOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetStoredInfoTypeOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPReidentifyContentOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDeidentifyTemplateOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDLPJobOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPUpdateDeidentifyTemplateOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeidentifyContentOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDLPJobTriggerOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPListDeidentifyTemplatesOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDeidentifyTemplateOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPListInspectTemplatesOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPListStoredInfoTypesOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPUpdateInspectTemplateOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDLPJobOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPListJobTriggersOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPCancelDLPJobOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDLPJobOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetInspectTemplateOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPListInfoTypesOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDeidentifyTemplateOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPListDLPJobsOperator',
+
'airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator',
+
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator',
+
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator',
+ # Base operator. Ignore it
+
'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator',
+ 'airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor',
+ 'airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor',
+
'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWtihPrefixExistenceSensor',
+
'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor',
+ }
+
def test_example_dags(self):
operators_modules = itertools.chain(
*[self.find_resource_files(resource_type=d) for d in ["operators",
"sensors", "transfers"]]
@@ -174,15 +284,36 @@ class
TestGoogleProviderProjectStructure(unittest.TestCase):
"Can you remove it from the list of missing example,
please?"
)
- @parameterized.expand(
- [
- (
- resource_type,
- suffix,
+ def test_missing_example_for_operator(self):
+ missing_operators = []
+
+ for resource_type in ["operators", "sensors", "tranfers"]:
+ operator_files = set(
+ self.find_resource_files(top_level_directory="airflow",
resource_type=resource_type)
)
- for suffix in ["_system.py", "_system_helper.py"]
- for resource_type in ["operators", "sensors", "tranfers"]
- ]
+ for filepath in operator_files:
+ service_name = os.path.basename(filepath)[: -(len(".py"))]
+ example_dags = list(
+ glob.glob(
+
f"{ROOT_FOLDER}/airflow/providers/google/*/example_dags/example_{service_name}*.py"
+ )
+ )
+ if not example_dags:
+ # Ignore. We have separate tests that detect this.
+ continue
+ example_paths = {
+ path for example_dag in example_dags for path in
get_imports_from_file(example_dag)
+ }
+ example_paths = {
+ path for path in example_paths if
f'.{resource_type}.{service_name}.' in path
+ }
+ print("example_paths=", example_paths)
+ operators_paths =
set(get_classes_from_file(f"{ROOT_FOLDER}/{filepath}"))
+ missing_operators.extend(operators_paths - example_paths)
+ self.assertEqual(set(missing_operators),
self.MISSING_EXAMPLES_FOR_OPERATORS)
+
+ @parameterized.expand(
+ itertools.product(["_system.py", "_system_helper.py"], ["operators",
"sensors", "tranfers"])
)
def test_detect_invalid_system_tests(self, resource_type, filename_suffix):
operators_tests =
self.find_resource_files(top_level_directory="tests",
resource_type=resource_type)