This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ce5eebd004 Fix system test for MetastoreHivePartitionSensor (#32861)
ce5eebd004 is described below
commit ce5eebd00403beabc23b4f0b4bedba5b5c397c42
Author: max <[email protected]>
AuthorDate: Mon Jul 31 10:46:26 2023 +0200
Fix system test for MetastoreHivePartitionSensor (#32861)
---
airflow/providers/google/provider.yaml | 2 +-
generated/provider_dependencies.json | 2 +-
...ple_dataproc_metastore_hive_partition_sensor.py | 200 +++++++++++++++++++--
3 files changed, 190 insertions(+), 14 deletions(-)
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 7d4805925c..1b96ca972a 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -94,7 +94,7 @@ dependencies:
- google-cloud-dataform>=0.5.0
- google-cloud-dataplex>=1.4.2
- google-cloud-dataproc>=5.4.0
- - google-cloud-dataproc-metastore>=1.10.0
+ - google-cloud-dataproc-metastore>=1.12.0
- google-cloud-dlp>=3.12.0
- google-cloud-kms>=2.15.0
- google-cloud-language>=2.9.0
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index fadffea64a..4b2fc9a639 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -426,7 +426,7 @@
"google-cloud-dataflow-client>=0.8.2",
"google-cloud-dataform>=0.5.0",
"google-cloud-dataplex>=1.4.2",
- "google-cloud-dataproc-metastore>=1.10.0",
+ "google-cloud-dataproc-metastore>=1.12.0",
"google-cloud-dataproc>=5.4.0",
"google-cloud-dlp>=3.12.0",
"google-cloud-kms>=2.15.0",
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
index bd5b9a3736..134caff6db 100644
--- a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
+++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
@@ -19,7 +19,7 @@
Example Airflow DAG that shows how to check Hive partition existence
using Dataproc Metastore Sensor.
-Note that Metastore service must be configured to use gRPC endpoints,
+Note that Metastore service must be configured to use gRPC endpoints.
"""
from __future__ import annotations
@@ -27,37 +27,213 @@ import datetime
import os
from airflow import models
+from airflow.decorators import task
+from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
+from airflow.providers.google.cloud.operators.dataproc import (
+ DataprocCreateClusterOperator,
+ DataprocDeleteClusterOperator,
+ DataprocSubmitJobOperator,
+)
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+ DataprocMetastoreCreateServiceOperator,
+ DataprocMetastoreDeleteServiceOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.dataproc_metastore import MetastoreHivePartitionSensor
+from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
-DAG_ID = "dataproc_metastore_hive_partition_sensor"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "hive_partition_sensor"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
+REGION = "us-central1"
+NETWORK = "default"
-SERVICE_ID = f"{DAG_ID}-service-{ENV_ID}".replace("_", "-")
-REGION = "europe-west1"
-TABLE_NAME = "test_table"
-PARTITION_1 = "column1=value1"
-PARTITION_2 = "column2=value2/column3=value3"
+METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
+METASTORE_TIMEOUT = 2400
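+# The metastore is created with a gRPC endpoint (endpoint_protocol below), which the sensor requires per the docstring above.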
+METASTORE_SERVICE = {
+ "name": METASTORE_SERVICE_ID,
+ "hive_metastore_config": {
+ "endpoint_protocol": "GRPC",
+ },
+ "network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
+}
+METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
+DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
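+# The cluster is attached to the metastore service above, so Hive jobs submitted to it use that metastore.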
+DATAPROC_CLUSTER_CONFIG = {
+ "master_config": {
+ "num_instances": 1,
+ "machine_type_uri": "n1-standard-2",
+ "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb":
1024},
+ },
+ "worker_config": {
+ "num_instances": 2,
+ "machine_type_uri": "n1-standard-2",
+ "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb":
1024},
+ },
+ "metastore_config": {
+ "dataproc_metastore_service": METASTORE_SERVICE_QFN,
+ },
+ "gce_cluster_config": {
+ "service_account_scopes": [
+ "https://www.googleapis.com/auth/cloud-platform",
+ ],
+ },
+}
+TABLE_NAME = "transactions_partitioned"
+COLUMN = "TransactionType"
+PARTITION_1 = f"{COLUMN}=credit".lower()
+PARTITION_2 = f"{COLUMN}=debit".lower()
+SOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+SOURCE_DATA_PATH = "dataproc/hive"
+SOURCE_DATA_FILE_NAME = "part-00000.parquet"
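+# Jinja template resolved at runtime: the Hive warehouse bucket name pushed to XCom by get_hive_warehouse_bucket_task.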
+EXTERNAL_TABLE_BUCKET = "{{task_instance.xcom_pull(task_ids='get_hive_warehouse_bucket_task', key='bucket')}}"
+QUERY_CREATE_EXTERNAL_TABLE = f"""
+CREATE EXTERNAL TABLE IF NOT EXISTS transactions
+(SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
+STORED AS PARQUET
+LOCATION 'gs://{EXTERNAL_TABLE_BUCKET}/{SOURCE_DATA_PATH}';
+"""
+QUERY_CREATE_PARTITIONED_TABLE = f"""
+CREATE EXTERNAL TABLE IF NOT EXISTS {TABLE_NAME}
+(SubmissionDate DATE, TransactionAmount DOUBLE)
+PARTITIONED BY ({COLUMN} STRING);
+"""
+QUERY_COPY_DATA_WITH_PARTITIONS = f"""
+SET hive.exec.dynamic.partition.mode=nonstrict;
+INSERT INTO TABLE {TABLE_NAME} PARTITION ({COLUMN})
+SELECT SubmissionDate,TransactionAmount,TransactionType FROM transactions;
+"""
with models.DAG(
DAG_ID,
start_date=datetime.datetime(2021, 1, 1),
schedule="@once",
catchup=False,
- tags=["example", "dataproc", "metastore"],
+ tags=["example", "dataproc", "metastore", "partition", "hive", "sensor"],
) as dag:
+ create_metastore_service = DataprocMetastoreCreateServiceOperator(
+ task_id="create_metastore_service",
+ region=REGION,
+ project_id=PROJECT_ID,
+ service=METASTORE_SERVICE,
+ service_id=METASTORE_SERVICE_ID,
+ timeout=METASTORE_TIMEOUT,
+ )
+
+ create_cluster = DataprocCreateClusterOperator(
+ task_id="create_cluster",
+ cluster_name=DATAPROC_CLUSTER_NAME,
+ project_id=PROJECT_ID,
+ cluster_config=DATAPROC_CLUSTER_CONFIG,
+ region=REGION,
+ )
+
+ @task(task_id="get_hive_warehouse_bucket_task")
+ def get_hive_warehouse_bucket(**kwargs):
+ """Returns Hive Metastore Warehouse GCS bucket name."""
+ ti = kwargs["ti"]
+ metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
+ config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
+ destination_dir: str = config_overrides["hive.metastore.warehouse.dir"]
+ bucket, _ = _parse_gcs_url(destination_dir)
+ ti.xcom_push(key="bucket", value=bucket)
+
+ get_hive_warehouse_bucket_task = get_hive_warehouse_bucket()
+
+ copy_source_data = GCSToGCSOperator(
+ task_id="copy_source_data",
+ source_bucket=SOURCE_DATA_BUCKET,
+ source_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}",
+ destination_bucket=EXTERNAL_TABLE_BUCKET,
+ destination_object=f"{SOURCE_DATA_PATH}/{SOURCE_DATA_FILE_NAME}",
+ )
+
+ create_external_table = DataprocSubmitJobOperator(
+ task_id="create_external_table",
+ job={
+ "reference": {"project_id": PROJECT_ID},
+ "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+ "hive_job": {"query_list": {"queries":
[QUERY_CREATE_EXTERNAL_TABLE]}},
+ },
+ region=REGION,
+ project_id=PROJECT_ID,
+ )
+
+ create_partitioned_table = DataprocSubmitJobOperator(
+ task_id="create_partitioned_table",
+ job={
+ "reference": {"project_id": PROJECT_ID},
+ "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+ "hive_job": {"query_list": {"queries":
[QUERY_CREATE_PARTITIONED_TABLE]}},
+ },
+ region=REGION,
+ project_id=PROJECT_ID,
+ )
+
+ partition_data = DataprocSubmitJobOperator(
+ task_id="partition_data",
+ job={
+ "reference": {"project_id": PROJECT_ID},
+ "placement": {"cluster_name": DATAPROC_CLUSTER_NAME},
+ "hive_job": {"query_list": {"queries":
[QUERY_COPY_DATA_WITH_PARTITIONS]}},
+ },
+ region=REGION,
+ project_id=PROJECT_ID,
+ )
+
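+ # The sensor succeeds once both partitions are registered in the metastore.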
# [START how_to_cloud_dataproc_metastore_hive_partition_sensor]
- sensor = MetastoreHivePartitionSensor(
+ hive_partition_sensor = MetastoreHivePartitionSensor(
task_id="hive_partition_sensor",
- service_id=SERVICE_ID,
+ service_id=METASTORE_SERVICE_ID,
region=REGION,
table=TABLE_NAME,
partitions=[PARTITION_1, PARTITION_2],
)
# [END how_to_cloud_dataproc_metastore_hive_partition_sensor]
+ delete_dataproc_cluster = DataprocDeleteClusterOperator(
+ task_id="delete_dataproc_cluster",
+ cluster_name=DATAPROC_CLUSTER_NAME,
+ project_id=PROJECT_ID,
+ region=REGION,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
+ task_id="delete_metastore_service",
+ service_id=METASTORE_SERVICE_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ delete_warehouse_bucket = GCSDeleteBucketOperator(
+ task_id="delete_warehouse_bucket",
+ bucket_name=EXTERNAL_TABLE_BUCKET,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ # TEST SETUP
+ (
+ create_metastore_service
+ >> create_cluster
+ >> get_hive_warehouse_bucket_task
+ >> copy_source_data
+ >> create_external_table
+ >> create_partitioned_table
+ >> partition_data
+ )
+ (
+ create_metastore_service
+ # TEST BODY
+ >> hive_partition_sensor
+ # TEST TEARDOWN
+ >> [delete_dataproc_cluster, delete_metastore_service,
delete_warehouse_bucket]
+ )
+
from tests.system.utils.watcher import watcher
# This test needs watcher in order to properly mark success/failure