This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 58d61826a3 Migrate Dataproc Metastore system tests according to AIP-47 (#26858)
58d61826a3 is described below
commit 58d61826a3f47a071c1f0ed4d5b8a5bd01131acb
Author: Bartłomiej Hirsz <[email protected]>
AuthorDate: Thu Oct 27 10:17:09 2022 +0200
Migrate Dataproc Metastore system tests according to AIP-47 (#26858)
---
.../google/cloud/hooks/dataproc_metastore.py | 2 +-
.../providers/google/cloud/operators/dataproc.py | 2 +-
.../google/cloud/operators/dataproc_metastore.py | 2 +-
.../operators/cloud/dataproc_metastore.rst | 28 ++--
.../operators/test_dataproc_metastore_system.py | 42 ------
.../google/cloud/dataproc_metastore/__init__.py | 16 ++
.../example_dataproc_metastore.py | 165 +++++++++------------
.../example_dataproc_metastore_backup.py | 136 +++++++++++++++++
8 files changed, 239 insertions(+), 154 deletions(-)
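
For orientation: the migration moves the example DAG out of airflow/providers/google/cloud/example_dags/ and into tests/system/providers/google/cloud/, following the AIP-47 layout. Stripped of the Dataproc Metastore specifics, the two added DAG files share roughly the skeleton sketched below. This is assembled from the diff that follows, not an authoritative template; the DAG id is a placeholder.

from __future__ import annotations

import datetime
import os

from airflow import models

# Resources are namespaced by these two variables so concurrent test runs do not collide.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
DAG_ID = "some_system_test"  # placeholder

with models.DAG(
    DAG_ID,
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@once",
    catchup=False,
    tags=["example"],
) as dag:
    # setup tasks >> operators under test >> teardown tasks,
    # with teardown tasks using trigger_rule=TriggerRule.ALL_DONE
    ...

    from tests.system.utils.watcher import watcher

    # The watcher marks the run failed even when only a teardown task fails.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Lets pytest pick up and execute the DAG (see tests/system/README.md#run_via_pytest).
test_run = get_test_run(dag)
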
diff --git a/airflow/providers/google/cloud/hooks/dataproc_metastore.py b/airflow/providers/google/cloud/hooks/dataproc_metastore.py
index 669ad12cd7..c7dcebee9f 100644
--- a/airflow/providers/google/cloud/hooks/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/hooks/dataproc_metastore.py
@@ -116,7 +116,7 @@ class DataprocMetastoreHook(GoogleBaseHook):
project_id: str,
region: str,
service_id: str,
- metadata_import: MetadataImport,
+ metadata_import: dict | MetadataImport,
metadata_import_id: str,
request_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index b42d1245de..9242877669 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -2128,7 +2128,7 @@ class DataprocCreateBatchOperator(BaseOperator):
metadata=self.metadata,
)
- # The existing batch may be a in a number of states other than 'SUCCEEDED'
+ # The existing batch may be a number of states other than 'SUCCEEDED'
if result.state != Batch.State.SUCCEEDED:
if result.state == Batch.State.FAILED or result.state == Batch.State.CANCELLED:
raise AirflowException(
diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py
index 6e6e9fcfe3..e0585805cc 100644
--- a/airflow/providers/google/cloud/operators/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -311,7 +311,7 @@ class DataprocMetastoreCreateMetadataImportOperator(BaseOperator):
project_id: str,
region: str,
service_id: str,
- metadata_import: MetadataImport,
+ metadata_import: dict | MetadataImport,
metadata_import_id: str,
request_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
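
With the type hint widened from MetadataImport to dict | MetadataImport in both the hook and the operator, the metadata import can now be supplied as a plain dict, which is what the migrated system test does. A minimal sketch of the dict form follows; the project, region, service id, import id, and bucket are illustrative placeholders, not values from this commit.

from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreCreateMetadataImportOperator,
)

# A plain dict is accepted in place of a google.cloud.metastore_v1.MetadataImport object.
metadata_import = {
    "name": "test-metadata-import",
    "database_dump": {
        "gcs_uri": "gs://example-bucket/data/hive.sql",  # placeholder bucket
        "database_type": "MYSQL",
    },
}

import_metadata = DataprocMetastoreCreateMetadataImportOperator(
    task_id="import_metadata",
    project_id="example-project",  # placeholder
    region="europe-west1",
    service_id="example-service",  # placeholder
    metadata_import=metadata_import,
    metadata_import_id="example-import",  # placeholder
)
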
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
index c7ff5305c9..564ddac527 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
@@ -33,7 +33,7 @@ For more information about the available fields to pass when creating a service,
A simple service configuration can look as followed:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 0
:start-after: [START how_to_cloud_dataproc_metastore_create_service]
@@ -42,7 +42,7 @@ A simple service configuration can look as followed:
With this configuration we can create the service:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateServiceOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_create_service_operator]
@@ -55,7 +55,7 @@ To get a service you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreGetServiceOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_get_service_operator]
@@ -69,7 +69,7 @@ For more information on updateMask and other parameters take a look at `Dataproc
An example of a new service config and the updateMask:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 0
:start-after: [START how_to_cloud_dataproc_metastore_update_service]
@@ -78,7 +78,7 @@ An example of a new service config and the updateMask:
To update a service you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreUpdateServiceOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_update_service_operator]
@@ -91,7 +91,7 @@ To delete a service you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteServiceOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_delete_service_operator]
@@ -104,7 +104,7 @@ To export metadata you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreExportMetadataOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_export_metadata_operator]
@@ -117,7 +117,7 @@ To restore a service you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreRestoreServiceOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_restore_service_operator]
@@ -131,7 +131,7 @@ For more information about the available fields to pass when creating a metadata
A simple metadata import configuration can look as followed:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 0
:start-after: [START how_to_cloud_dataproc_metastore_create_metadata_import]
@@ -140,7 +140,7 @@ A simple metadata import configuration can look as followed:
To create a metadata import you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateMetadataImportOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_create_metadata_import_operator]
@@ -154,7 +154,7 @@ For more information about the available fields to pass when creating a backup,
A simple backup configuration can look as followed:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
:language: python
:dedent: 0
:start-after: [START how_to_cloud_dataproc_metastore_create_backup]
@@ -163,7 +163,7 @@ A simple backup configuration can look as followed:
With this configuration we can create the backup:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateBackupOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_create_backup_operator]
@@ -176,7 +176,7 @@ To delete a backup you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteBackupOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_delete_backup_operator]
@@ -189,7 +189,7 @@ To list backups you can use:
:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreListBackupsOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_metastore_list_backups_operator]
diff --git a/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py b/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py
deleted file mode 100644
index 2db8f62223..0000000000
--- a/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_dataproc_metastore import BUCKET
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DATAPROC_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_DATAPROC_KEY)
-class DataprocMetastoreExampleDagsTest(GoogleSystemTest):
- @provide_gcp_context(GCP_DATAPROC_KEY)
- def setUp(self):
- super().setUp()
- self.create_gcs_bucket(BUCKET)
-
- @provide_gcp_context(GCP_DATAPROC_KEY)
- def tearDown(self):
- self.delete_gcs_bucket(BUCKET)
- super().tearDown()
-
- @provide_gcp_context(GCP_DATAPROC_KEY)
- def test_run_example_dag(self):
- self.run_dag(dag_id="example_gcp_dataproc_metastore", dag_folder=CLOUD_DAG_FOLDER)
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/__init__.py b/tests/system/providers/google/cloud/dataproc_metastore/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataproc_metastore/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
similarity index 58%
rename from airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
rename to tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
index c1208510a2..1b6341a73a 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py
@@ -23,36 +23,39 @@ from __future__ import annotations
import datetime
import os
+from pathlib import Path
-from google.cloud.metastore_v1 import MetadataImport
from google.protobuf.field_mask_pb2 import FieldMask
from airflow import models
-from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataproc_metastore import (
- DataprocMetastoreCreateBackupOperator,
DataprocMetastoreCreateMetadataImportOperator,
DataprocMetastoreCreateServiceOperator,
- DataprocMetastoreDeleteBackupOperator,
DataprocMetastoreDeleteServiceOperator,
DataprocMetastoreExportMetadataOperator,
DataprocMetastoreGetServiceOperator,
- DataprocMetastoreListBackupsOperator,
- DataprocMetastoreRestoreServiceOperator,
DataprocMetastoreUpdateServiceOperator,
)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<PROJECT_ID>")
-SERVICE_ID = os.environ.get("GCP_DATAPROC_METASTORE_SERVICE_ID", "dataproc-metastore-system-tests-service-1")
-BACKUP_ID = os.environ.get("GCP_DATAPROC_METASTORE_BACKUP_ID", "dataproc-metastore-system-tests-backup-1")
-REGION = os.environ.get("GCP_REGION", "<REGION>")
-BUCKET = os.environ.get("GCP_DATAPROC_METASTORE_BUCKET", "INVALID BUCKET NAME")
-METADATA_IMPORT_FILE = os.environ.get("GCS_METADATA_IMPORT_FILE", None)
-GCS_URI = os.environ.get("GCS_URI", f"gs://{BUCKET}/data/hive.sql")
-METADATA_IMPORT_ID = "dataproc-metastore-system-tests-metadata-import-1"
-TIMEOUT = 1200
+DAG_ID = "dataproc_metastore"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+
+SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-")
+METADATA_IMPORT_ID = f"{DAG_ID}-metadata-{ENV_ID}".replace("_", "-")
+
+REGION = "europe-west1"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+TIMEOUT = 2400
DB_TYPE = "MYSQL"
-DESTINATION_GCS_FOLDER = f"gs://{BUCKET}/>"
+DESTINATION_GCS_FOLDER = f"gs://{BUCKET_NAME}/>"
+
+HIVE_FILE_SRC = str(Path(__file__).parent.parent / "dataproc" / "resources" / "hive.sql")
+HIVE_FILE = "data/hive.sql"
+GCS_URI = f"gs://{BUCKET_NAME}/data/hive.sql"
# Service definition
# Docs: https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services#Service
@@ -62,6 +65,16 @@ SERVICE = {
}
# [END how_to_cloud_dataproc_metastore_create_service]
+# [START how_to_cloud_dataproc_metastore_create_metadata_import]
+METADATA_IMPORT = {
+ "name": "test-metadata-import",
+ "database_dump": {
+ "gcs_uri": GCS_URI,
+ "database_type": DB_TYPE,
+ },
+}
+# [END how_to_cloud_dataproc_metastore_create_metadata_import]
+
# Update service
# [START how_to_cloud_dataproc_metastore_update_service]
SERVICE_TO_UPDATE = {
@@ -73,31 +86,24 @@ SERVICE_TO_UPDATE = {
UPDATE_MASK = FieldMask(paths=["labels"])
# [END how_to_cloud_dataproc_metastore_update_service]
-# Backup definition
-# [START how_to_cloud_dataproc_metastore_create_backup]
-BACKUP = {
- "name": "test-backup",
-}
-# [END how_to_cloud_dataproc_metastore_create_backup]
-
-# Metadata import definition
-# [START how_to_cloud_dataproc_metastore_create_metadata_import]
-METADATA_IMPORT = MetadataImport(
- {
- "name": "test-metadata-import",
- "database_dump": {
- "gcs_uri": GCS_URI,
- "database_type": DB_TYPE,
- },
- }
-)
-# [END how_to_cloud_dataproc_metastore_create_metadata_import]
-
-
with models.DAG(
- dag_id="example_gcp_dataproc_metastore",
+ DAG_ID,
start_date=datetime.datetime(2021, 1, 1),
+ schedule="@once",
+ catchup=False,
+ tags=["example", "dataproc", "metastore"],
) as dag:
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+ )
+
+ upload_file = LocalFilesystemToGCSOperator(
+ task_id="upload_file",
+ src=HIVE_FILE_SRC,
+ dst=HIVE_FILE,
+ bucket=BUCKET_NAME,
+ )
+
# [START how_to_cloud_dataproc_metastore_create_service_operator]
create_service = DataprocMetastoreCreateServiceOperator(
task_id="create_service",
@@ -153,52 +159,6 @@ with models.DAG(
)
# [END how_to_cloud_dataproc_metastore_export_metadata_operator]
- # [START how_to_cloud_dataproc_metastore_create_backup_operator]
- backup_service = DataprocMetastoreCreateBackupOperator(
- task_id="create_backup",
- project_id=PROJECT_ID,
- region=REGION,
- service_id=SERVICE_ID,
- backup=BACKUP,
- backup_id=BACKUP_ID,
- timeout=TIMEOUT,
- )
- # [END how_to_cloud_dataproc_metastore_create_backup_operator]
-
- # [START how_to_cloud_dataproc_metastore_list_backups_operator]
- list_backups = DataprocMetastoreListBackupsOperator(
- task_id="list_backups",
- project_id=PROJECT_ID,
- region=REGION,
- service_id=SERVICE_ID,
- )
- # [END how_to_cloud_dataproc_metastore_list_backups_operator]
-
- # [START how_to_cloud_dataproc_metastore_delete_backup_operator]
- delete_backup = DataprocMetastoreDeleteBackupOperator(
- task_id="delete_backup",
- project_id=PROJECT_ID,
- region=REGION,
- service_id=SERVICE_ID,
- backup_id=BACKUP_ID,
- timeout=TIMEOUT,
- )
- # [END how_to_cloud_dataproc_metastore_delete_backup_operator]
-
- # [START how_to_cloud_dataproc_metastore_restore_service_operator]
- restore_service = DataprocMetastoreRestoreServiceOperator(
- task_id="restore_metastore",
- region=REGION,
- project_id=PROJECT_ID,
- service_id=SERVICE_ID,
- backup_id=BACKUP_ID,
- backup_region=REGION,
- backup_project_id=PROJECT_ID,
- backup_service_id=SERVICE_ID,
- timeout=TIMEOUT,
- )
- # [END how_to_cloud_dataproc_metastore_restore_service_operator]
-
# [START how_to_cloud_dataproc_metastore_delete_service_operator]
delete_service = DataprocMetastoreDeleteServiceOperator(
task_id="delete_service",
@@ -209,15 +169,30 @@ with models.DAG(
)
# [END how_to_cloud_dataproc_metastore_delete_service_operator]
- chain(
- create_service,
- update_service,
- get_service_details,
- backup_service,
- list_backups,
- restore_service,
- delete_backup,
- export_metadata,
- import_metadata,
- delete_service,
+ delete_service.trigger_rule = TriggerRule.ALL_DONE
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ create_bucket
+ >> create_service
+ >> get_service_details
+ >> update_service
+ >> import_metadata
+ >> export_metadata
+ >> delete_service
+ >> delete_bucket
)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "teardown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
new file mode 100644
index 0000000000..5351d9df7d
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Airflow System Test DAG that verifies Dataproc Metastore
+operators for managing backups.
+"""
+from __future__ import annotations
+
+import datetime
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+ DataprocMetastoreCreateBackupOperator,
+ DataprocMetastoreCreateServiceOperator,
+ DataprocMetastoreDeleteBackupOperator,
+ DataprocMetastoreDeleteServiceOperator,
+ DataprocMetastoreListBackupsOperator,
+ DataprocMetastoreRestoreServiceOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+DAG_ID = "dataproc_metastore_backup"
+
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+
+SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-")
+BACKUP_ID = f"{DAG_ID}-backup-{ENV_ID}".replace("_", "-")
+REGION = "europe-west1"
+TIMEOUT = 1200
+# Service definition
+SERVICE = {
+ "name": "test-service",
+}
+# Backup definition
+# [START how_to_cloud_dataproc_metastore_create_backup]
+BACKUP = {
+ "name": "test-backup",
+}
+# [END how_to_cloud_dataproc_metastore_create_backup]
+
+with models.DAG(
+ DAG_ID,
+ start_date=datetime.datetime(2021, 1, 1),
+ schedule="@once",
+ catchup=False,
+ tags=["example", "dataproc", "metastore"],
+) as dag:
+ create_service = DataprocMetastoreCreateServiceOperator(
+ task_id="create_service",
+ region=REGION,
+ project_id=PROJECT_ID,
+ service=SERVICE,
+ service_id=SERVICE_ID,
+ timeout=TIMEOUT,
+ )
+ # [START how_to_cloud_dataproc_metastore_create_backup_operator]
+ backup_service = DataprocMetastoreCreateBackupOperator(
+ task_id="create_backup",
+ project_id=PROJECT_ID,
+ region=REGION,
+ service_id=SERVICE_ID,
+ backup=BACKUP,
+ backup_id=BACKUP_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_create_backup_operator]
+ # [START how_to_cloud_dataproc_metastore_list_backups_operator]
+ list_backups = DataprocMetastoreListBackupsOperator(
+ task_id="list_backups",
+ project_id=PROJECT_ID,
+ region=REGION,
+ service_id=SERVICE_ID,
+ )
+ # [END how_to_cloud_dataproc_metastore_list_backups_operator]
+ # [START how_to_cloud_dataproc_metastore_delete_backup_operator]
+ delete_backup = DataprocMetastoreDeleteBackupOperator(
+ task_id="delete_backup",
+ project_id=PROJECT_ID,
+ region=REGION,
+ service_id=SERVICE_ID,
+ backup_id=BACKUP_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_delete_backup_operator]
+ delete_backup.trigger_rule = TriggerRule.ALL_DONE
+ # [START how_to_cloud_dataproc_metastore_restore_service_operator]
+ restore_service = DataprocMetastoreRestoreServiceOperator(
+ task_id="restore_metastore",
+ region=REGION,
+ project_id=PROJECT_ID,
+ service_id=SERVICE_ID,
+ backup_id=BACKUP_ID,
+ backup_region=REGION,
+ backup_project_id=PROJECT_ID,
+ backup_service_id=SERVICE_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_restore_service_operator]
+ delete_service = DataprocMetastoreDeleteServiceOperator(
+ task_id="delete_service",
+ region=REGION,
+ project_id=PROJECT_ID,
+ service_id=SERVICE_ID,
+ timeout=TIMEOUT,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ (create_service >> backup_service >> list_backups >> restore_service >> delete_backup >> delete_service)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "teardown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
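
As the closing comments note, each migrated file doubles as a pytest test via get_test_run. Below is a hedged sketch of running the new backup DAG programmatically; it assumes GCP credentials plus the SYSTEM_TESTS_GCP_PROJECT and SYSTEM_TESTS_ENV_ID variables are already configured, and is equivalent to invoking pytest on the file from the command line. The environment values shown are placeholders.

import os

import pytest

# Illustrative placeholders; real runs need a project with Dataproc Metastore enabled.
os.environ.setdefault("SYSTEM_TESTS_GCP_PROJECT", "example-project")
os.environ.setdefault("SYSTEM_TESTS_ENV_ID", "dev1")

pytest.main(
    [
        "tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py",
        "-s",
    ]
)
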