This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f7708acab9 Fix hive_partition_sensor system test (#40023)
f7708acab9 is described below

commit f7708acab91a4728131e4a4ccabe22819cb45885
Author: Eugene <[email protected]>
AuthorDate: Mon Jun 10 10:58:50 2024 +0000

    Fix hive_partition_sensor system test (#40023)
---
 .../google/cloud/operators/dataproc_metastore.py   | 23 ++++++++++++++++------
 ...ple_dataproc_metastore_hive_partition_sensor.py | 16 ++++++---------
 2 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py
index 10056cdff5..996d31fad5 100644
--- a/airflow/providers/google/cloud/operators/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -431,7 +431,7 @@ class DataprocMetastoreCreateServiceOperator(GoogleCloudBaseOperator):
         hook = DataprocMetastoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        self.log.info("Creating Dataproc Metastore service: %s", 
self.project_id)
+        self.log.info("Creating Dataproc Metastore service: %s", 
self.service_id)
         try:
             operation = hook.create_service(
                 region=self.region,
@@ -548,13 +548,24 @@ class DataprocMetastoreDeleteBackupOperator(GoogleCloudBaseOperator):
 class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
     """Delete a single service.
 
-    :param request:  The request object. Request message for
-        [DataprocMetastore.DeleteService][google.cloud.metastore.v1.DataprocMetastore.DeleteService].
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
     :param retry: Designation of what errors, if any, should be retried.
     :param timeout: The timeout for this request.
    :param metadata: Strings which should be sent along with the request as metadata.
-    :param gcp_conn_id:
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
     """
 
     template_fields: Sequence[str] = (
@@ -589,7 +600,7 @@ class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
         hook = DataprocMetastoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        self.log.info("Deleting Dataproc Metastore service: %s", 
self.project_id)
+        self.log.info("Deleting Dataproc Metastore service: %s", 
self.service_id)
         operation = hook.delete_service(
             region=self.region,
             project_id=self.project_id,
@@ -599,7 +610,7 @@ class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
             metadata=self.metadata,
         )
         hook.wait_for_operation(self.timeout, operation)
-        self.log.info("Service %s deleted successfully", self.project_id)
+        self.log.info("Service %s deleted successfully", self.service_id)
 
 
 class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
index 4058a5544f..5c2be86bd8 100644
--- a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
+++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
@@ -16,8 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that show how to check Hive partitions existence
-using Dataproc Metastore Sensor.
+Example Airflow DAG that shows how to check Hive partitions existence with Dataproc Metastore Sensor.
 
 Note that Metastore service must be configured to use gRPC endpoints.
 """
@@ -47,7 +46,7 @@ from airflow.utils.trigger_rule import TriggerRule
 DAG_ID = "hive_partition_sensor"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
-REGION = "us-central1"
+REGION = "europe-west1"
 NETWORK = "default"
 
 METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
@@ -60,7 +59,7 @@ METASTORE_SERVICE = {
     "network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
 }
 METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
-DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
+DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}-{ENV_ID}".replace("_", "-")
 DATAPROC_CLUSTER_CONFIG = {
     "master_config": {
         "num_instances": 1,
@@ -133,7 +132,7 @@ with DAG(
 
     @task(task_id="get_hive_warehouse_bucket_task")
     def get_hive_warehouse_bucket(**kwargs):
-        """Returns Hive Metastore Warehouse GCS bucket name."""
+        """Return Hive Metastore Warehouse GCS bucket name."""
         ti = kwargs["ti"]
         metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
         config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
@@ -216,19 +215,16 @@ with DAG(
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    # TEST SETUP
     (
+        # TEST SETUP
         create_metastore_service
         >> create_cluster
         >> get_hive_warehouse_bucket_task
         >> copy_source_data
         >> create_external_table
         >> create_partitioned_table
-        >> partition_data
-    )
-    (
-        create_metastore_service
         # TEST BODY
+        >> partition_data
         >> hive_partition_sensor
         # TEST TEARDOWN
         >> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket]
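
For readers following the docstring additions above, here is a minimal sketch of how the now-documented DataprocMetastoreDeleteServiceOperator parameters fit together in a DAG; it is illustrative only, and the project, region, and service IDs are placeholders rather than values taken from this patch:

    from airflow.providers.google.cloud.operators.dataproc_metastore import (
        DataprocMetastoreDeleteServiceOperator,
    )

    # Placeholder IDs -- substitute your own project, region, and service ID.
    delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
        task_id="delete_metastore_service",
        project_id="demo-project",    # Google Cloud project the service belongs to
        region="europe-west1",        # Google Cloud region the service belongs to
        service_id="metastore-demo",  # 2-63 chars, begins with a letter, letters/digits/hyphens
    )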
