This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ce5eebd004 Fix system test for MetastoreHivePartitionSensor (#32861)
ce5eebd004 is described below

commit ce5eebd00403beabc23b4f0b4bedba5b5c397c42
Author: max <[email protected]>
AuthorDate: Mon Jul 31 10:46:26 2023 +0200

    Fix system test for MetastoreHivePartitionSensor (#32861)
---
 airflow/providers/google/provider.yaml             |   2 +-
 generated/provider_dependencies.json               |   2 +-
 ...ple_dataproc_metastore_hive_partition_sensor.py | 200 +++++++++++++++++++--
 3 files changed, 190 insertions(+), 14 deletions(-)

diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 7d4805925c..1b96ca972a 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -94,7 +94,7 @@ dependencies:
   - google-cloud-dataform>=0.5.0
   - google-cloud-dataplex>=1.4.2
   - google-cloud-dataproc>=5.4.0
-  - google-cloud-dataproc-metastore>=1.10.0
+  - google-cloud-dataproc-metastore>=1.12.0
   - google-cloud-dlp>=3.12.0
   - google-cloud-kms>=2.15.0
   - google-cloud-language>=2.9.0
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index fadffea64a..4b2fc9a639 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -426,7 +426,7 @@
       "google-cloud-dataflow-client>=0.8.2",
       "google-cloud-dataform>=0.5.0",
       "google-cloud-dataplex>=1.4.2",
-      "google-cloud-dataproc-metastore>=1.10.0",
+      "google-cloud-dataproc-metastore>=1.12.0",
       "google-cloud-dataproc>=5.4.0",
       "google-cloud-dlp>=3.12.0",
       "google-cloud-kms>=2.15.0",
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
index bd5b9a3736..134caff6db 100644
--- a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
+++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
@@ -19,7 +19,7 @@
 Example Airflow DAG that shows how to check Hive partition existence
 using the Dataproc Metastore Sensor.
 
-Note that Metastore service must be configured to use gRPC endpoints,
+Note that Metastore service must be configured to use gRPC endpoints.
 """
 from __future__ import annotations
 
@@ -27,37 +27,213 @@ import datetime
 import os
 
 from airflow import models
+from airflow.decorators import task
+from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+from airflow.providers.google.cloud.operators.dataproc import (
+    DataprocCreateClusterOperator,
+    DataprocDeleteClusterOperator,
+    DataprocSubmitJobOperator,
+)
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+    DataprocMetastoreCreateServiceOperator,
+    DataprocMetastoreDeleteServiceOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
 from airflow.providers.google.cloud.sensors.dataproc_metastore import MetastoreHivePartitionSensor
+from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
-DAG_ID = "dataproc_metastore_hive_partition_sensor"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "hive_partition_sensor"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
+REGION = "us-central1"
+NETWORK = "default"
 
-SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-")
-REGION = "europe-west1"
-TABLE_NAME = "test_table"
-PARTITION_1 = "column1=value1"
-PARTITION_2 = "column2=value2/column3=value3"
+METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
+METASTORE_TIMEOUT = 2400
+METASTORE_SERVICE = {
+    "name": METASTORE_SERVICE_ID,
+    "hive_metastore_config": {
+        "endpoint_protocol": "GRPC",
+    },
+    "network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
+}
+METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
+DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
+DATAPROC_CLUSTER_CONFIG = {
+    "master_config": {
+        "num_instances": 1,
+        "machine_type_uri": "n1-standard-2",
+        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 
1024},
+    },
+    "worker_config": {
+        "num_instances": 2,
+        "machine_type_uri": "n1-standard-2",
+        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 
1024},
+    },
+    "metastore_config": {
+        "dataproc_metastore_service": METASTORE_SERVICE_QFN,
+    },
+    "gce_cluster_config": {
+        "service_account_scopes": [
+            "https://www.googleapis.com/auth/cloud-platform";,
+        ],
+    },
+}
 
+TABLE_NAME = "transactions_partitioned"
+COLUMN = "TransactionType"
+PARTITION_1 = f"{COLUMN}=credit".lower()
+PARTITION_2 = f"{COLUMN}=debit".lower()
+SOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+SOURCE_DATA_PATH = "dataproc/hive"
+SOURCE_DATA_FILE_NAME = "part-00000.parquet"
+EXTERNAL_TABLE_BUCKET = "{{task_instance.xcom_pull(task_ids='get_hive_warehouse_bucket_task', key='bucket')}}"
+QUERY_CREATE_EXTERNAL_TABLE = f"""
+CREATE EXTERNAL TABLE IF NOT EXISTS transactions
+(SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
+STORED AS PARQUET
+LOCATION 'gs://{EXTERNAL_TABLE_BUCKET}/{SOURCE_DATA_PATH}';
+"""
+QUERY_CREATE_PARTITIONED_TABLE = f"""
+CREATE EXTERNAL TABLE IF NOT EXISTS {TABLE_NAME}
+(SubmissionDate DATE, TransactionAmount DOUBLE)
+PARTITIONED BY ({COLUMN} STRING);
+"""
+QUERY_COPY_DATA_WITH_PARTITIONS = f"""
+SET hive.exec.dynamic.partition.mode=nonstrict;
+INSERT INTO TABLE {TABLE_NAME} PARTITION ({COLUMN})
+SELECT SubmissionDate,TransactionAmount,TransactionType FROM transactions;
+"""
 
 with models.DAG(
     DAG_ID,
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@once",
     catchup=False,
-    tags=["example", "dataproc", "metastore"],
+    tags=["example", "dataproc", "metastore", "partition", "hive", "sensor"],
 ) as dag:
 
+    create_metastore_service = DataprocMetastoreCreateServiceOperator(
+        task_id="create_metastore_service",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service=METASTORE_SERVICE,
+        service_id=METASTORE_SERVICE_ID,
+        timeout=METASTORE_TIMEOUT,
+    )
+
+    create_cluster = DataprocCreateClusterOperator(
+        task_id="create_cluster",
+        cluster_name=DATAPROC_CLUSTER_NAME,
+        project_id=PROJECT_ID,
+        cluster_config=DATAPROC_CLUSTER_CONFIG,
+        region=REGION,
+    )
+
+    @task(task_id="get_hive_warehouse_bucket_task")
+    def get_hive_warehouse_bucket(**kwargs):
+        """Returns Hive Metastore Warehouse GCS bucket name."""
+        ti = kwargs["ti"]
+        metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
+        config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
+        destination_dir: str = config_overrides["hive.metastore.warehouse.dir"]
+        bucket, _ = _parse_gcs_url(destination_dir)
+        ti.xcom_push(key="bucket", value=bucket)
+
+    get_hive_warehouse_bucket_task = get_hive_warehouse_bucket()
+
+    copy_source_data = GCSToGCSOperator(
+        task_id="copy_source_data",
+        source_bucket=SOURCE_DATA_BUCKET,
+        source_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}",
+        destination_bucket=EXTERNAL_TABLE_BUCKET,
+        destination_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}",
+    )
+
+    create_external_table = DataprocSubmitJobOperator(
+        task_id="create_external_table",
+        job={
+            "reference": {"project_id": PROJECT_ID},
+            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+            "hive_job": {"query_list": {"queries": 
[QUERY_CREATE_EXTERNAL_TABLE]}},
+        },
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
+    create_partitioned_table = DataprocSubmitJobOperator(
+        task_id="create_partitioned_table",
+        job={
+            "reference": {"project_id": PROJECT_ID},
+            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+            "hive_job": {"query_list": {"queries": 
[QUERY_CREATE_PARTITIONED_TABLE]}},
+        },
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
+    partition_data = DataprocSubmitJobOperator(
+        task_id="partition_data",
+        job={
+            "reference": {"project_id": PROJECT_ID},
+            "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+            "hive_job": {"query_list": {"queries": 
[QUERY_COPY_DATA_WITH_PARTITIONS]}},
+        },
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
     # [START how_to_cloud_dataproc_metastore_hive_partition_sensor]
-    sensor = MetastoreHivePartitionSensor(
+    hive_partition_sensor = MetastoreHivePartitionSensor(
         task_id="hive_partition_sensor",
-        service_id=SERVICE_ID,
+        service_id=METASTORE_SERVICE_ID,
         region=REGION,
         table=TABLE_NAME,
         partitions=[PARTITION_1, PARTITION_2],
     )
     # [END how_to_cloud_dataproc_metastore_hive_partition_sensor]
 
+    delete_dataproc_cluster = DataprocDeleteClusterOperator(
+        task_id="delete_dataproc_cluster",
+        cluster_name=DATAPROC_CLUSTER_NAME,
+        project_id=PROJECT_ID,
+        region=REGION,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
+        task_id="delete_metastore_service",
+        service_id=METASTORE_SERVICE_ID,
+        project_id=PROJECT_ID,
+        region=REGION,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_warehouse_bucket = GCSDeleteBucketOperator(
+        task_id="delete_warehouse_bucket",
+        bucket_name=EXTERNAL_TABLE_BUCKET,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # TEST SETUP
+    (
+        create_metastore_service
+        >> create_cluster
+        >> get_hive_warehouse_bucket_task
+        >> copy_source_data
+        >> create_external_table
+        >> create_partitioned_table
+        >> partition_data
+    )
+    (
+        create_metastore_service
+        # TEST BODY
+        >> hive_partition_sensor
+        # TEST TEARDOWN
+        >> [delete_dataproc_cluster, delete_metastore_service, 
delete_warehouse_bucket]
+    )
+
     from tests.system.utils.watcher import watcher
 
     # This test needs watcher in order to properly mark success/failure
