This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 58d61826a3 Migrate Dataproc Metastore system tests according to AIP-47 
(#26858)
58d61826a3 is described below

commit 58d61826a3f47a071c1f0ed4d5b8a5bd01131acb
Author: Bartłomiej Hirsz <[email protected]>
AuthorDate: Thu Oct 27 10:17:09 2022 +0200

    Migrate Dataproc Metastore system tests according to AIP-47 (#26858)
---
 .../google/cloud/hooks/dataproc_metastore.py       |   2 +-
 .../providers/google/cloud/operators/dataproc.py   |   2 +-
 .../google/cloud/operators/dataproc_metastore.py   |   2 +-
 .../operators/cloud/dataproc_metastore.rst         |  28 ++--
 .../operators/test_dataproc_metastore_system.py    |  42 ------
 .../google/cloud/dataproc_metastore/__init__.py    |  16 ++
 .../example_dataproc_metastore.py                  | 165 +++++++++------------
 .../example_dataproc_metastore_backup.py           | 136 +++++++++++++++++
 8 files changed, 239 insertions(+), 154 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataproc_metastore.py 
b/airflow/providers/google/cloud/hooks/dataproc_metastore.py
index 669ad12cd7..c7dcebee9f 100644
--- a/airflow/providers/google/cloud/hooks/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/hooks/dataproc_metastore.py
@@ -116,7 +116,7 @@ class DataprocMetastoreHook(GoogleBaseHook):
         project_id: str,
         region: str,
         service_id: str,
-        metadata_import: MetadataImport,
+        metadata_import: dict | MetadataImport,
         metadata_import_id: str,
         request_id: str | None = None,
         retry: Retry | _MethodDefault = DEFAULT,
diff --git a/airflow/providers/google/cloud/operators/dataproc.py 
b/airflow/providers/google/cloud/operators/dataproc.py
index b42d1245de..9242877669 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -2128,7 +2128,7 @@ class DataprocCreateBatchOperator(BaseOperator):
                 metadata=self.metadata,
             )
 
-            # The existing batch may be a in a number of states other than 
'SUCCEEDED'
+            # The existing batch may be a number of states other than 
'SUCCEEDED'
             if result.state != Batch.State.SUCCEEDED:
                 if result.state == Batch.State.FAILED or result.state == 
Batch.State.CANCELLED:
                     raise AirflowException(
diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py 
b/airflow/providers/google/cloud/operators/dataproc_metastore.py
index 6e6e9fcfe3..e0585805cc 100644
--- a/airflow/providers/google/cloud/operators/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -311,7 +311,7 @@ class 
DataprocMetastoreCreateMetadataImportOperator(BaseOperator):
         project_id: str,
         region: str,
         service_id: str,
-        metadata_import: MetadataImport,
+        metadata_import: dict | MetadataImport,
         metadata_import_id: str,
         request_id: str | None = None,
         retry: Retry | _MethodDefault = DEFAULT,
diff --git 
a/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst 
b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
index c7ff5305c9..564ddac527 100644
--- 
a/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
+++ 
b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
@@ -33,7 +33,7 @@ For more information about the available fields to pass when 
creating a service,
 
 A simple service configuration can look as followed:
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 0
     :start-after: [START how_to_cloud_dataproc_metastore_create_service]
@@ -42,7 +42,7 @@ A simple service configuration can look as followed:
 With this configuration we can create the service:
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateServiceOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 4
     :start-after: [START 
how_to_cloud_dataproc_metastore_create_service_operator]
@@ -55,7 +55,7 @@ To get a service you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreGetServiceOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_dataproc_metastore_get_service_operator]
@@ -69,7 +69,7 @@ For more information on updateMask and other parameters take 
a look at `Dataproc
 
 An example of a new service config and the updateMask:
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 0
     :start-after: [START how_to_cloud_dataproc_metastore_update_service]
@@ -78,7 +78,7 @@ An example of a new service config and the updateMask:
 To update a service you can use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreUpdateServiceOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 4
     :start-after: [START 
how_to_cloud_dataproc_metastore_update_service_operator]
@@ -91,7 +91,7 @@ To delete a service you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteServiceOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 4
     :start-after: [START 
how_to_cloud_dataproc_metastore_delete_service_operator]
@@ -104,7 +104,7 @@ To export metadata you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreExportMetadataOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 4
     :start-after: [START 
how_to_cloud_dataproc_metastore_export_metadata_operator]
@@ -117,7 +117,7 @@ To restore a service you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreRestoreServiceOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
     :language: python
     :dedent: 4
     :start-after: [START 
how_to_cloud_dataproc_metastore_restore_service_operator]
@@ -131,7 +131,7 @@ For more information about the available fields to pass 
when creating a metadata
 
 A simple metadata import configuration can look as followed:
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 0
     :start-after: [START 
how_to_cloud_dataproc_metastore_create_metadata_import]
@@ -140,7 +140,7 @@ A simple metadata import configuration can look as followed:
 To create a metadata import you can use:
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateMetadataImportOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
     :language: python
     :dedent: 4
     :start-after: [START 
how_to_cloud_dataproc_metastore_create_metadata_import_operator]
@@ -154,7 +154,7 @@ For more information about the available fields to pass 
when creating a backup,
 
 A simple backup configuration can look as followed:
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
     :language: python
     :dedent: 0
     :start-after: [START how_to_cloud_dataproc_metastore_create_backup]
@@ -163,7 +163,7 @@ A simple backup configuration can look as followed:
 With this configuration we can create the backup:
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateBackupOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
     :language: python
     :dedent: 4
     :start-after: [START 
how_to_cloud_dataproc_metastore_create_backup_operator]
@@ -176,7 +176,7 @@ To delete a backup you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteBackupOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
     :language: python
     :dedent: 4
     :start-after: [START 
how_to_cloud_dataproc_metastore_delete_backup_operator]
@@ -189,7 +189,7 @@ To list backups you can use:
 
 
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreListBackupsOperator`
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_dataproc_metastore_list_backups_operator]
diff --git 
a/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py 
b/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py
deleted file mode 100644
index 2db8f62223..0000000000
--- a/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_dataproc_metastore 
import BUCKET
-from tests.providers.google.cloud.utils.gcp_authenticator import 
GCP_DATAPROC_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, 
GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_DATAPROC_KEY)
-class DataprocMetastoreExampleDagsTest(GoogleSystemTest):
-    @provide_gcp_context(GCP_DATAPROC_KEY)
-    def setUp(self):
-        super().setUp()
-        self.create_gcs_bucket(BUCKET)
-
-    @provide_gcp_context(GCP_DATAPROC_KEY)
-    def tearDown(self):
-        self.delete_gcs_bucket(BUCKET)
-        super().tearDown()
-
-    @provide_gcp_context(GCP_DATAPROC_KEY)
-    def test_run_example_dag(self):
-        self.run_dag(dag_id="example_gcp_dataproc_metastore", 
dag_folder=CLOUD_DAG_FOLDER)
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/__init__.py 
b/tests/system/providers/google/cloud/dataproc_metastore/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataproc_metastore/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py 
b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
similarity index 58%
rename from 
airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
rename to 
tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
index c1208510a2..1b6341a73a 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+++ 
b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
@@ -23,36 +23,39 @@ from __future__ import annotations
 
 import datetime
 import os
+from pathlib import Path
 
-from google.cloud.metastore_v1 import MetadataImport
 from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow import models
-from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.dataproc_metastore import (
-    DataprocMetastoreCreateBackupOperator,
     DataprocMetastoreCreateMetadataImportOperator,
     DataprocMetastoreCreateServiceOperator,
-    DataprocMetastoreDeleteBackupOperator,
     DataprocMetastoreDeleteServiceOperator,
     DataprocMetastoreExportMetadataOperator,
     DataprocMetastoreGetServiceOperator,
-    DataprocMetastoreListBackupsOperator,
-    DataprocMetastoreRestoreServiceOperator,
     DataprocMetastoreUpdateServiceOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import 
LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<PROJECT_ID>")
-SERVICE_ID = os.environ.get("GCP_DATAPROC_METASTORE_SERVICE_ID", 
"dataproc-metastore-system-tests-service-1")
-BACKUP_ID = os.environ.get("GCP_DATAPROC_METASTORE_BACKUP_ID", 
"dataproc-metastore-system-tests-backup-1")
-REGION = os.environ.get("GCP_REGION", "<REGION>")
-BUCKET = os.environ.get("GCP_DATAPROC_METASTORE_BUCKET", "INVALID BUCKET NAME")
-METADATA_IMPORT_FILE = os.environ.get("GCS_METADATA_IMPORT_FILE", None)
-GCS_URI = os.environ.get("GCS_URI", f"gs://{BUCKET}/data/hive.sql")
-METADATA_IMPORT_ID = "dataproc-metastore-system-tests-metadata-import-1"
-TIMEOUT = 1200
+DAG_ID = "dataproc_metastore"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+
+SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-")
+METADATA_IMPORT_ID = f"{DAG_ID}-metadata-{ENV_ID}".replace("_", "-")
+
+REGION = "europe-west1"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+TIMEOUT = 2400
 DB_TYPE = "MYSQL"
-DESTINATION_GCS_FOLDER = f"gs://{BUCKET}/>"
+DESTINATION_GCS_FOLDER = f"gs://{BUCKET_NAME}/>"
+
+HIVE_FILE_SRC = str(Path(__file__).parent.parent / "dataproc" / "resources" / 
"hive.sql")
+HIVE_FILE = "data/hive.sql"
+GCS_URI = f"gs://{BUCKET_NAME}/data/hive.sql"
 
 # Service definition
 # Docs: 
https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services#Service
@@ -62,6 +65,16 @@ SERVICE = {
 }
 # [END how_to_cloud_dataproc_metastore_create_service]
 
+# [START how_to_cloud_dataproc_metastore_create_metadata_import]
+METADATA_IMPORT = {
+    "name": "test-metadata-import",
+    "database_dump": {
+        "gcs_uri": GCS_URI,
+        "database_type": DB_TYPE,
+    },
+}
+# [END how_to_cloud_dataproc_metastore_create_metadata_import]
+
 # Update service
 # [START how_to_cloud_dataproc_metastore_update_service]
 SERVICE_TO_UPDATE = {
@@ -73,31 +86,24 @@ SERVICE_TO_UPDATE = {
 UPDATE_MASK = FieldMask(paths=["labels"])
 # [END how_to_cloud_dataproc_metastore_update_service]
 
-# Backup definition
-# [START how_to_cloud_dataproc_metastore_create_backup]
-BACKUP = {
-    "name": "test-backup",
-}
-# [END how_to_cloud_dataproc_metastore_create_backup]
-
-# Metadata import definition
-# [START how_to_cloud_dataproc_metastore_create_metadata_import]
-METADATA_IMPORT = MetadataImport(
-    {
-        "name": "test-metadata-import",
-        "database_dump": {
-            "gcs_uri": GCS_URI,
-            "database_type": DB_TYPE,
-        },
-    }
-)
-# [END how_to_cloud_dataproc_metastore_create_metadata_import]
-
-
 with models.DAG(
-    dag_id="example_gcp_dataproc_metastore",
+    DAG_ID,
     start_date=datetime.datetime(2021, 1, 1),
+    schedule="@once",
+    catchup=False,
+    tags=["example", "dataproc", "metastore"],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
+    upload_file = LocalFilesystemToGCSOperator(
+        task_id="upload_file",
+        src=HIVE_FILE_SRC,
+        dst=HIVE_FILE,
+        bucket=BUCKET_NAME,
+    )
+
     # [START how_to_cloud_dataproc_metastore_create_service_operator]
     create_service = DataprocMetastoreCreateServiceOperator(
         task_id="create_service",
@@ -153,52 +159,6 @@ with models.DAG(
     )
     # [END how_to_cloud_dataproc_metastore_export_metadata_operator]
 
-    # [START how_to_cloud_dataproc_metastore_create_backup_operator]
-    backup_service = DataprocMetastoreCreateBackupOperator(
-        task_id="create_backup",
-        project_id=PROJECT_ID,
-        region=REGION,
-        service_id=SERVICE_ID,
-        backup=BACKUP,
-        backup_id=BACKUP_ID,
-        timeout=TIMEOUT,
-    )
-    # [END how_to_cloud_dataproc_metastore_create_backup_operator]
-
-    # [START how_to_cloud_dataproc_metastore_list_backups_operator]
-    list_backups = DataprocMetastoreListBackupsOperator(
-        task_id="list_backups",
-        project_id=PROJECT_ID,
-        region=REGION,
-        service_id=SERVICE_ID,
-    )
-    # [END how_to_cloud_dataproc_metastore_list_backups_operator]
-
-    # [START how_to_cloud_dataproc_metastore_delete_backup_operator]
-    delete_backup = DataprocMetastoreDeleteBackupOperator(
-        task_id="delete_backup",
-        project_id=PROJECT_ID,
-        region=REGION,
-        service_id=SERVICE_ID,
-        backup_id=BACKUP_ID,
-        timeout=TIMEOUT,
-    )
-    # [END how_to_cloud_dataproc_metastore_delete_backup_operator]
-
-    # [START how_to_cloud_dataproc_metastore_restore_service_operator]
-    restore_service = DataprocMetastoreRestoreServiceOperator(
-        task_id="restore_metastore",
-        region=REGION,
-        project_id=PROJECT_ID,
-        service_id=SERVICE_ID,
-        backup_id=BACKUP_ID,
-        backup_region=REGION,
-        backup_project_id=PROJECT_ID,
-        backup_service_id=SERVICE_ID,
-        timeout=TIMEOUT,
-    )
-    # [END how_to_cloud_dataproc_metastore_restore_service_operator]
-
     # [START how_to_cloud_dataproc_metastore_delete_service_operator]
     delete_service = DataprocMetastoreDeleteServiceOperator(
         task_id="delete_service",
@@ -209,15 +169,30 @@ with models.DAG(
     )
     # [END how_to_cloud_dataproc_metastore_delete_service_operator]
 
-    chain(
-        create_service,
-        update_service,
-        get_service_details,
-        backup_service,
-        list_backups,
-        restore_service,
-        delete_backup,
-        export_metadata,
-        import_metadata,
-        delete_service,
+    delete_service.trigger_rule = TriggerRule.ALL_DONE
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, 
trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        create_bucket
+        >> create_service
+        >> get_service_details
+        >> update_service
+        >> import_metadata
+        >> export_metadata
+        >> delete_service
+        >> delete_bucket
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git 
a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
 
b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
new file mode 100644
index 0000000000..5351d9df7d
--- /dev/null
+++ 
b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Airflow System Test DAG that verifies Dataproc Metastore
+operators for managing backups.
+"""
+from __future__ import annotations
+
+import datetime
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+    DataprocMetastoreCreateBackupOperator,
+    DataprocMetastoreCreateServiceOperator,
+    DataprocMetastoreDeleteBackupOperator,
+    DataprocMetastoreDeleteServiceOperator,
+    DataprocMetastoreListBackupsOperator,
+    DataprocMetastoreRestoreServiceOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+DAG_ID = "dataproc_metastore_backup"
+
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+
+SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-")
+BACKUP_ID = f"{DAG_ID}-backup-{ENV_ID}".replace("_", "-")
+REGION = "europe-west1"
+TIMEOUT = 1200
+# Service definition
+SERVICE = {
+    "name": "test-service",
+}
+# Backup definition
+# [START how_to_cloud_dataproc_metastore_create_backup]
+BACKUP = {
+    "name": "test-backup",
+}
+# [END how_to_cloud_dataproc_metastore_create_backup]
+
+with models.DAG(
+    DAG_ID,
+    start_date=datetime.datetime(2021, 1, 1),
+    schedule="@once",
+    catchup=False,
+    tags=["example", "dataproc", "metastore"],
+) as dag:
+    create_service = DataprocMetastoreCreateServiceOperator(
+        task_id="create_service",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service=SERVICE,
+        service_id=SERVICE_ID,
+        timeout=TIMEOUT,
+    )
+    # [START how_to_cloud_dataproc_metastore_create_backup_operator]
+    backup_service = DataprocMetastoreCreateBackupOperator(
+        task_id="create_backup",
+        project_id=PROJECT_ID,
+        region=REGION,
+        service_id=SERVICE_ID,
+        backup=BACKUP,
+        backup_id=BACKUP_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_create_backup_operator]
+    # [START how_to_cloud_dataproc_metastore_list_backups_operator]
+    list_backups = DataprocMetastoreListBackupsOperator(
+        task_id="list_backups",
+        project_id=PROJECT_ID,
+        region=REGION,
+        service_id=SERVICE_ID,
+    )
+    # [END how_to_cloud_dataproc_metastore_list_backups_operator]
+    # [START how_to_cloud_dataproc_metastore_delete_backup_operator]
+    delete_backup = DataprocMetastoreDeleteBackupOperator(
+        task_id="delete_backup",
+        project_id=PROJECT_ID,
+        region=REGION,
+        service_id=SERVICE_ID,
+        backup_id=BACKUP_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_delete_backup_operator]
+    delete_backup.trigger_rule = TriggerRule.ALL_DONE
+    # [START how_to_cloud_dataproc_metastore_restore_service_operator]
+    restore_service = DataprocMetastoreRestoreServiceOperator(
+        task_id="restore_metastore",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service_id=SERVICE_ID,
+        backup_id=BACKUP_ID,
+        backup_region=REGION,
+        backup_project_id=PROJECT_ID,
+        backup_service_id=SERVICE_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_restore_service_operator]
+    delete_service = DataprocMetastoreDeleteServiceOperator(
+        task_id="delete_service",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service_id=SERVICE_ID,
+        timeout=TIMEOUT,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    (create_service >> backup_service >> list_backups >> restore_service >> 
delete_backup >> delete_service)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to