This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 388736b  Detect partial examples DAGs for Google (#12277)
388736b is described below

commit 388736bf97a4313f81aadbeecbb99e5fcb145c31
Author: Kamil BreguĊ‚a <[email protected]>
AuthorDate: Wed Nov 11 19:26:30 2020 +0100

    Detect partial examples DAGs for Google (#12277)
---
 tests/always/test_project_structure.py | 149 +++++++++++++++++++++++++++++++--
 1 file changed, 140 insertions(+), 9 deletions(-)

diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index bcde9bb..25aa752 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -14,12 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import ast
 import glob
 import itertools
 import mmap
 import os
 import unittest
+from typing import List
 
 from parameterized import parameterized
 
@@ -114,6 +115,42 @@ class TestProjectStructure(unittest.TestCase):
                 )
 
 
def get_imports_from_file(filepath: str):
    """Return the dotted names of every import statement in the Python file at *filepath*.

    Plain ``import a.b`` contributes ``"a.b"``; ``from a import b`` contributes ``"a.b"``.
    Names appear in the order ``ast.walk`` visits the import nodes.
    """
    with open(filepath) as source:
        tree = ast.parse(source.read(), filepath)
    found: List[str] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom):
            # Qualify each imported name with the source module.
            found.extend(f'{node.module}.{alias.name}' for alias in node.names)
        elif isinstance(node, ast.Import):
            found.extend(alias.name for alias in node.names)
    return found
+
+
def filepath_to_module(filepath: str):
    """Convert the path of a ``.py`` file under ``ROOT_FOLDER`` to its dotted module name.

    :param filepath: Path (absolute or relative) to a Python source file.
    :return: Dotted module path, e.g. ``airflow/providers/x.py`` -> ``airflow.providers.x``.
    """
    relpath = os.path.relpath(os.path.abspath(filepath), ROOT_FOLDER)
    # os.path.relpath returns native-separator paths; the previous version only
    # replaced "/", which silently produced wrong module names on Windows.
    # splitext also only strips a real extension instead of blindly cutting 3 chars.
    return os.path.splitext(relpath)[0].replace(os.path.sep, ".")
+
def get_classes_from_file(filepath: str):
    """Return fully-qualified names of Operator/Sensor classes defined in *filepath*.

    :param filepath: Path to a Python source file under ``ROOT_FOLDER``.
    :return: List of ``module.ClassName`` strings for every top-level or nested
        class whose name ends with ``Operator`` or ``Sensor``.
    """
    with open(filepath) as py_file:
        content = py_file.read()
    doc_node = ast.parse(content, filepath)
    module = filepath_to_module(filepath)
    results: List[str] = []
    for current_node in ast.walk(doc_node):
        if not isinstance(current_node, ast.ClassDef):
            continue
        name = current_node.name
        # The original condition tested endswith("Operator") twice; deduplicated
        # into a single tuple check. NOTE(review): the duplicate may have been
        # intended as a third suffix (e.g. "Hook") -- confirm with the author.
        if not name.endswith(("Operator", "Sensor")):
            continue
        results.append(f"{module}.{name}")
    return results
+
+
 class TestGoogleProviderProjectStructure(unittest.TestCase):
     MISSING_EXAMPLE_DAGS = {
         ('cloud', 'adls_to_gcs'),
@@ -124,6 +161,79 @@ class 
TestGoogleProviderProjectStructure(unittest.TestCase):
         ('ads', 'ads_to_gcs'),
     }
 
+    MISSING_EXAMPLES_FOR_OPERATORS = {
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueDeleteOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueResumeOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuePauseOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuePurgeOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskGetOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksTasksListOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksTaskDeleteOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueGetOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueueUpdateOperator',
+        
'airflow.providers.google.cloud.operators.tasks.CloudTasksQueuesListOperator',
+        # Deprecated operator. Ignore it.
+        
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
+        '.CloudDataTransferServiceS3ToGCSOperator',
+        # Deprecated operator. Ignore it.
+        
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
+        '.CloudDataTransferServiceGCSToGCSOperator',
+        # Base operator. Ignore it.
+        
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHadoopJobOperator',
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator',
+        # Base operator. Ignore it
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkJobOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkSqlJobOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHiveJobOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitPigJobOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitPySparkJobOperator',
+        
'airflow.providers.google.cloud.operators.mlengine.MLEngineTrainingCancelJobOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.mlengine.MLEngineManageModelOperator',
+        # Deprecated operator. Ignore it
+        
'airflow.providers.google.cloud.operators.mlengine.MLEngineManageVersionOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetStoredInfoTypeOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPReidentifyContentOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDeidentifyTemplateOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDLPJobOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPUpdateDeidentifyTemplateOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeidentifyContentOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDLPJobTriggerOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPListDeidentifyTemplatesOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDeidentifyTemplateOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPListInspectTemplatesOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPListStoredInfoTypesOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPUpdateInspectTemplateOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDLPJobOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPListJobTriggersOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPCancelDLPJobOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDLPJobOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetInspectTemplateOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPListInfoTypesOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPDeleteDeidentifyTemplateOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPListDLPJobsOperator',
+        
'airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator',
+        
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator',
+        
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator',
+        # Base operator. Ignore it
+        
'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator',
+        'airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor',
+        'airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor',
+        
'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWtihPrefixExistenceSensor',
+        
'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor',
+    }
+
     def test_example_dags(self):
         operators_modules = itertools.chain(
             *[self.find_resource_files(resource_type=d) for d in ["operators", 
"sensors", "transfers"]]
@@ -174,15 +284,36 @@ class 
TestGoogleProviderProjectStructure(unittest.TestCase):
                     "Can you remove it from the list of missing example, 
please?"
                 )
 
-    @parameterized.expand(
-        [
-            (
-                resource_type,
-                suffix,
+    def test_missing_example_for_operator(self):
+        missing_operators = []
+
+        for resource_type in ["operators", "sensors", "tranfers"]:
+            operator_files = set(
+                self.find_resource_files(top_level_directory="airflow", 
resource_type=resource_type)
             )
-            for suffix in ["_system.py", "_system_helper.py"]
-            for resource_type in ["operators", "sensors", "tranfers"]
-        ]
+            for filepath in operator_files:
+                service_name = os.path.basename(filepath)[: -(len(".py"))]
+                example_dags = list(
+                    glob.glob(
+                        
f"{ROOT_FOLDER}/airflow/providers/google/*/example_dags/example_{service_name}*.py"
+                    )
+                )
+                if not example_dags:
+                    # Ignore. We have separate tests that detect this.
+                    continue
+                example_paths = {
+                    path for example_dag in example_dags for path in 
get_imports_from_file(example_dag)
+                }
+                example_paths = {
+                    path for path in example_paths if 
f'.{resource_type}.{service_name}.' in path
+                }
+                print("example_paths=", example_paths)
+                operators_paths = 
set(get_classes_from_file(f"{ROOT_FOLDER}/{filepath}"))
+                missing_operators.extend(operators_paths - example_paths)
+        self.assertEqual(set(missing_operators), 
self.MISSING_EXAMPLES_FOR_OPERATORS)
+
+    @parameterized.expand(
+        itertools.product(["_system.py", "_system_helper.py"], ["operators", 
"sensors", "tranfers"])
     )
     def test_detect_invalid_system_tests(self, resource_type, filename_suffix):
         operators_tests = 
self.find_resource_files(top_level_directory="tests", 
resource_type=resource_type)

Reply via email to