This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f7708acab9 Fix hive_partition_sensor system test (#40023)
f7708acab9 is described below
commit f7708acab91a4728131e4a4ccabe22819cb45885
Author: Eugene <[email protected]>
AuthorDate: Mon Jun 10 10:58:50 2024 +0000
Fix hive_partition_sensor system test (#40023)
---
.../google/cloud/operators/dataproc_metastore.py | 23 ++++++++++++++++------
...ple_dataproc_metastore_hive_partition_sensor.py | 16 ++++++---------
2 files changed, 23 insertions(+), 16 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py
index 10056cdff5..996d31fad5 100644
--- a/airflow/providers/google/cloud/operators/dataproc_metastore.py
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -431,7 +431,7 @@ class DataprocMetastoreCreateServiceOperator(GoogleCloudBaseOperator):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain
)
- self.log.info("Creating Dataproc Metastore service: %s",
self.project_id)
+ self.log.info("Creating Dataproc Metastore service: %s",
self.service_id)
try:
operation = hook.create_service(
region=self.region,
@@ -548,13 +548,24 @@ class DataprocMetastoreDeleteBackupOperator(GoogleCloudBaseOperator):
class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
"""Delete a single service.
- :param request: The request object. Request message for
- [DataprocMetastore.DeleteService][google.cloud.metastore.v1.DataprocMetastore.DeleteService].
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
- :param gcp_conn_id:
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
"""
template_fields: Sequence[str] = (
@@ -589,7 +600,7 @@ class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain
)
- self.log.info("Deleting Dataproc Metastore service: %s",
self.project_id)
+ self.log.info("Deleting Dataproc Metastore service: %s",
self.service_id)
operation = hook.delete_service(
region=self.region,
project_id=self.project_id,
@@ -599,7 +610,7 @@ class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
metadata=self.metadata,
)
hook.wait_for_operation(self.timeout, operation)
- self.log.info("Service %s deleted successfully", self.project_id)
+ self.log.info("Service %s deleted successfully", self.service_id)
class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):
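
For context, the docstring added above documents the identifiers that DataprocMetastoreDeleteServiceOperator expects. A minimal usage sketch follows; the region, project, and service values are illustrative placeholders, not taken from this commit.

    from airflow.providers.google.cloud.operators.dataproc_metastore import (
        DataprocMetastoreDeleteServiceOperator,
    )

    # Illustrative placeholder values; a real DAG supplies its own identifiers.
    delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
        task_id="delete_metastore_service",
        region="europe-west1",
        project_id="demo-project",
        service_id="metastore-demo-service",
    )
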
diff --git a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
index 4058a5544f..5c2be86bd8 100644
--- a/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
+++ b/tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py
@@ -16,8 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example Airflow DAG that show how to check Hive partitions existence
-using Dataproc Metastore Sensor.
+Example Airflow DAG that shows how to check Hive partitions existence with Dataproc Metastore Sensor.
Note that Metastore service must be configured to use gRPC endpoints.
"""
@@ -47,7 +46,7 @@ from airflow.utils.trigger_rule import TriggerRule
DAG_ID = "hive_partition_sensor"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
-REGION = "us-central1"
+REGION = "europe-west1"
NETWORK = "default"
METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
@@ -60,7 +59,7 @@ METASTORE_SERVICE = {
"network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
}
METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
-DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
+DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}-{ENV_ID}".replace("_", "-")
DATAPROC_CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
@@ -133,7 +132,7 @@ with DAG(
@task(task_id="get_hive_warehouse_bucket_task")
def get_hive_warehouse_bucket(**kwargs):
- """Returns Hive Metastore Warehouse GCS bucket name."""
+ """Return Hive Metastore Warehouse GCS bucket name."""
ti = kwargs["ti"]
metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
@@ -216,19 +215,16 @@ with DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- # TEST SETUP
(
+ # TEST SETUP
create_metastore_service
>> create_cluster
>> get_hive_warehouse_bucket_task
>> copy_source_data
>> create_external_table
>> create_partitioned_table
- >> partition_data
- )
- (
- create_metastore_service
# TEST BODY
+ >> partition_data
>> hive_partition_sensor
# TEST TEARDOWN
>> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket]
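
For reference, the "TEST BODY" task in the rearranged chain above is the Dataproc Metastore Hive partition sensor. A rough sketch of how such a sensor task might be declared is shown below; the parameter names follow my reading of the provider's MetastoreHivePartitionSensor, and the table and partition values are placeholders rather than excerpts from this test.

    from airflow.providers.google.cloud.sensors.dataproc_metastore import (
        MetastoreHivePartitionSensor,
    )

    # Placeholder identifiers; the system test derives its own from DAG_ID and ENV_ID.
    hive_partition_sensor = MetastoreHivePartitionSensor(
        task_id="hive_partition_sensor",
        service_id="metastore-demo-service",
        region="europe-west1",
        table="transactions_partitioned",
        partitions=["month=Jan"],
    )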