VladaZakharova commented on code in PR #32256: URL: https://github.com/apache/airflow/pull/32256#discussion_r1275145442
########## tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py: ########## @@ -0,0 +1,308 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG that shows how to use Dataplex Scan Data. +""" +from __future__ import annotations + +import os +from datetime import datetime + +from google.cloud import dataplex_v1 + +from airflow import models +from airflow.models.baseoperator import chain +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryCreateEmptyTableOperator, + BigQueryInsertJobOperator, +) +from airflow.providers.google.cloud.operators.dataplex import ( + DataplexCreateAssetOperator, + DataplexCreateDataQualityScanOperator, + DataplexCreateLakeOperator, + DataplexCreateZoneOperator, + DataplexDeleteAssetOperator, + DataplexDeleteDataQualityOperator, + DataplexDeleteLakeOperator, + DataplexDeleteZoneOperator, + DataplexExecuteDataQualityScanOperator, + DataplexGetDataQualityScanResultOperator, + DataplexRunDataQualityScanOperator, +) +from airflow.providers.google.cloud.sensors.dataplex import DataplexDataQualityJobStatusSensor +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") 
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "example_dataplex_data_quality" + +LAKE_ID = f"test-lake-{ENV_ID}" +REGION = "us-central1" + +DATASET_NAME = f"dataset_bq_{ENV_ID}" + +TABLE_1 = "table0" +TABLE_2 = "table1" + +SCHEMA = [ + {"name": "value", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "dt", "type": "STRING", "mode": "NULLABLE"}, +] + +DATASET = DATASET_NAME +INSERT_DATE = datetime.now().strftime("%Y-%m-%d") +INSERT_ROWS_QUERY = f"INSERT {DATASET}.{TABLE_1} VALUES (1, 'test test2', '{INSERT_DATE}');" +LOCATION = "us" + +TRIGGER_SPEC_TYPE = "ON_DEMAND" + +ZONE_ID = "test-zone-id" +DATA_SCAN_ID = "test-data-scan-id" + +EXAMPLE_LAKE_BODY = { + "display_name": "test_display_name", + "labels": [], + "description": "test_description", + "metastore": {"service": ""}, +} + +# [START howto_dataplex_zone_configuration] +EXAMPLE_ZONE = { + "type_": "RAW", + "resource_spec": {"location_type": "SINGLE_REGION"}, +} +# [END howto_dataplex_zone_configuration] + +ASSET_ID = "test-asset-id" + +# [START howto_dataplex_asset_configuration] +EXAMPLE_ASSET = { + "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET_NAME}", "type_": "BIGQUERY_DATASET"}, + "discovery_spec": {"enabled": True}, +} +# [END howto_dataplex_asset_configuration] + +# [START howto_dataplex_data_quality_configuration] +EXAMPLE_DATA_SCAN = dataplex_v1.DataScan() +EXAMPLE_DATA_SCAN.data.entity = ( + f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}" +) +EXAMPLE_DATA_SCAN.data.resource = ( + f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}" +) +EXAMPLE_DATA_SCAN.data_quality_spec = { + "rules": [ + { + "range_expectation": { + "min_value": "0", + "max_value": "10000", + }, + "column": "value", + "dimension": "VALIDITY", + } + ], +} +# [END howto_dataplex_data_quality_configuration] + + +with models.DAG( + DAG_ID, + 
start_date=datetime(2021, 1, 1), + schedule="@once", + tags=["example", "dataplex", "data_quality"], +) as dag: + create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME) + create_table_1 = BigQueryCreateEmptyTableOperator( + task_id="create_table_1", + dataset_id=DATASET_NAME, + table_id=TABLE_1, + schema_fields=SCHEMA, + location=LOCATION, + ) + create_table_2 = BigQueryCreateEmptyTableOperator( + task_id="create_table_2", + dataset_id=DATASET_NAME, + table_id=TABLE_2, + schema_fields=SCHEMA, + location=LOCATION, + ) + insert_query_job = BigQueryInsertJobOperator( + task_id="insert_query_job", + configuration={ + "query": { + "query": INSERT_ROWS_QUERY, + "useLegacySql": False, + } + }, + ) + create_lake = DataplexCreateLakeOperator( + task_id="create_lake", project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID + ) + # [START howto_dataplex_create_zone_operator] + create_zone = DataplexCreateZoneOperator( + task_id="create_zone", + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + zone=EXAMPLE_ZONE, + zone_id=ZONE_ID, + ) + # [END howto_dataplex_create_zone_operator] + # [START howto_dataplex_create_asset_operator] + create_asset = DataplexCreateAssetOperator( + task_id="create_asset", + project_id=PROJECT_ID, + region=REGION, + asset=EXAMPLE_ASSET, + lake_id=LAKE_ID, + zone_id=ZONE_ID, + asset_id=ASSET_ID, + ) + # [END howto_dataplex_create_asset_operator] + # [START howto_dataplex_create_data_quality_operator] + create_data_scan = DataplexCreateDataQualityScanOperator( + task_id="create_data_scan", + project_id=PROJECT_ID, + region=REGION, + data_scan=EXAMPLE_DATA_SCAN, + data_scan_id=DATA_SCAN_ID, + ) + # [END howto_dataplex_create_data_quality_operator] + # [START howto_dataplex_execute_data_quality_operator] + execute_data_scan = DataplexExecuteDataQualityScanOperator( + task_id="execute_data_scan", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + ) + # 
[END howto_dataplex_execute_data_quality_operator] + # [START howto_dataplex_execute_data_quality_def_operator] + execute_data_scan_def = DataplexExecuteDataQualityScanOperator( + task_id="execute_data_scan_def", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + deferrable=True, + ) + # [END howto_dataplex_execute_data_quality_def_operator] + # [START howto_dataplex_run_data_quality_operator] + run_data_scan = DataplexRunDataQualityScanOperator( + task_id="run_data_scan", project_id=PROJECT_ID, region=REGION, data_scan_id=DATA_SCAN_ID + ) + # [END howto_dataplex_run_data_quality_operator] + job_id = run_data_scan.output + # [START howto_dataplex_data_scan_job_state_sensor] + get_data_scan_job_status = DataplexDataQualityJobStatusSensor( + task_id="get_data_scan_job_status", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + job_id=str(job_id), + ) + # [END howto_dataplex_data_scan_job_state_sensor] + # [START howto_dataplex_get_data_quality_job_operator] + get_data_scan_job_result = DataplexGetDataQualityScanResultOperator( + task_id="get_data_scan_job_result", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + ) + # [END howto_dataplex_get_data_quality_job_operator] + # [START howto_dataplex_get_data_quality_job_def_operator] + get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator( + task_id="get_data_scan_job_result_def", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + deferrable=True, + ) + # [END howto_dataplex_get_data_quality_job_def_operator] + # [START howto_dataplex_delete_asset_operator] + delete_asset = DataplexDeleteAssetOperator( + task_id="delete_asset", + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + zone_id=ZONE_ID, + asset_id=ASSET_ID, + ) + # [END howto_dataplex_delete_asset_operator] + delete_asset.trigger_rule = TriggerRule.ALL_DONE + # [START howto_dataplex_delete_zone_operator] + delete_zone = DataplexDeleteZoneOperator( 
+ task_id="delete_zone", + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + zone_id=ZONE_ID, + ) + # [END howto_dataplex_delete_zone_operator] + delete_zone.trigger_rule = TriggerRule.ALL_DONE + # [START howto_dataplex_delete_data_quality_operator] + delete_data_scan = DataplexDeleteDataQualityOperator( + task_id="delete_data_scan", + project_id=PROJECT_ID, + region=REGION, + data_scan_id=DATA_SCAN_ID, + ) + # [END howto_dataplex_delete_data_quality_operator] + delete_data_scan.trigger_rule = TriggerRule.ALL_DONE + delete_lake = DataplexDeleteLakeOperator( + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + task_id="delete_lake", + trigger_rule=TriggerRule.ALL_DONE, + ) + + chain( + # TEST SETUP + create_dataset + >> [create_table_1, create_table_2] + >> insert_query_job + >> create_lake + >> create_zone + >> create_asset + # TEST BODY + >> create_data_scan + >> execute_data_scan + >> execute_data_scan_def + >> run_data_scan + >> get_data_scan_job_status + >> get_data_scan_job_result + >> get_data_scan_job_result_def + # TEST TEARDOWN + >> delete_asset + >> delete_zone + >> delete_data_scan + >> delete_lake + ) Review Comment: I can see that there is a step for creating 2 tables (`create_table_1` and `create_table_2`), but there is no step for deleting them in the teardown. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
