This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 19df65a156 Fix BigQuery system tests (#33254)
19df65a156 is described below

commit 19df65a156c2b3389194d87aa25629dc6371fd95
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Aug 9 13:52:04 2023 +0200

    Fix BigQuery system tests (#33254)
---
 .../bigquery/example_bigquery_queries_async.py     | 55 +++++++++++-----------
 .../cloud/bigquery/example_bigquery_sensors.py     | 48 ++++++++++---------
 2 files changed, 54 insertions(+), 49 deletions(-)

diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
index 38828dcb04..cdedfb0bc2 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
@@ -37,16 +37,16 @@ from airflow.providers.google.cloud.operators.bigquery import (
 )
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 
-DAG_ID = "bigquery_queries_async"
+DAG_ID = "example_bigquery_queries_async"
 
-DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}".replace("-", "_")
 LOCATION = "us"
 
-TABLE_1 = "table1"
-TABLE_2 = "table2"
+TABLE_NAME_1 = f"table_{DAG_ID}_{ENV_ID}_1".replace("-", "_")
+TABLE_NAME_2 = f"table_{DAG_ID}_{ENV_ID}_2".replace("-", "_")
 
 SCHEMA = [
     {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
@@ -54,10 +54,9 @@ SCHEMA = [
     {"name": "ds", "type": "STRING", "mode": "NULLABLE"},
 ]
 
-DATASET = DATASET_NAME
 INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
 INSERT_ROWS_QUERY = (
-    f"INSERT {DATASET}.{TABLE_1} VALUES "
+    f"INSERT {DATASET_NAME}.{TABLE_NAME_1} VALUES "
     f"(42, 'monthy python', '{INSERT_DATE}'), "
     f"(42, 'fishy fish', '{INSERT_DATE}');"
 )
@@ -72,7 +71,8 @@ CONFIGURATION = {
         DECLARE WAIT STRING;
         SET success = FALSE;
 
-        SELECT row_count = (SELECT row_count FROM {DATASET}.__TABLES__ WHERE table_id='NON_EXISTING_TABLE');
+        SELECT row_count = (SELECT row_count FROM {DATASET_NAME}.__TABLES__
+        WHERE table_id='NON_EXISTING_TABLE');
         IF row_count > 0  THEN
             SELECT 'Table Exists!' as message, retry_count as retries;
             SET success = TRUE;
@@ -103,26 +103,26 @@ with DAG(
     start_date=datetime(2022, 1, 1),
     catchup=False,
     default_args=default_args,
-    tags=["example", "async", "bigquery"],
-    user_defined_macros={"DATASET": DATASET, "TABLE": TABLE_1},
+    tags=["example", "bigquery", "deferrable"],
+    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME_1},
 ) as dag:
     create_dataset = BigQueryCreateEmptyDatasetOperator(
         task_id="create_dataset",
-        dataset_id=DATASET,
+        dataset_id=DATASET_NAME,
         location=LOCATION,
     )
 
     create_table_1 = BigQueryCreateEmptyTableOperator(
         task_id="create_table_1",
-        dataset_id=DATASET,
-        table_id=TABLE_1,
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_NAME_1,
         schema_fields=SCHEMA,
         location=LOCATION,
     )
 
     delete_dataset = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset",
-        dataset_id=DATASET,
+        dataset_id=DATASET_NAME,
         delete_contents=True,
         trigger_rule=TriggerRule.ALL_DONE,
     )
@@ -159,7 +159,7 @@ with DAG(
     # [START howto_operator_bigquery_value_check_async]
     check_value = BigQueryValueCheckOperator(
         task_id="check_value",
-        sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
+        sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
         pass_value=2,
         use_legacy_sql=False,
         location=LOCATION,
@@ -170,7 +170,7 @@ with DAG(
     # [START howto_operator_bigquery_interval_check_async]
     check_interval = BigQueryIntervalCheckOperator(
         task_id="check_interval",
-        table=f"{DATASET}.{TABLE_1}",
+        table=f"{DATASET_NAME}.{TABLE_NAME_1}",
         days_back=1,
         metrics_thresholds={"COUNT(*)": 1.5},
         use_legacy_sql=False,
@@ -185,8 +185,8 @@ with DAG(
         configuration={
             "query": {
                 "query": [
-                    f"SELECT * FROM {DATASET}.{TABLE_2}",
-                    f"SELECT COUNT(*) FROM {DATASET}.{TABLE_2}",
+                    f"SELECT * FROM {DATASET_NAME}.{TABLE_NAME_2}",
+                    f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_2}",
                 ],
                 "useLegacySql": False,
             }
@@ -199,10 +199,11 @@ with DAG(
     # [START howto_operator_bigquery_get_data_async]
     get_data = BigQueryGetDataOperator(
         task_id="get_data",
-        dataset_id=DATASET,
-        table_id=TABLE_1,
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_NAME_1,
+        use_legacy_sql=False,
         max_results=10,
-        selected_fields="value,name",
+        selected_fields="value",
         location=LOCATION,
         deferrable=True,
     )
@@ -217,7 +218,7 @@ with DAG(
     # [START howto_operator_bigquery_check_async]
     check_count = BigQueryCheckOperator(
         task_id="check_count",
-        sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
+        sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
         use_legacy_sql=False,
         location=LOCATION,
         deferrable=True,
@@ -229,12 +230,12 @@ with DAG(
         task_id="execute_query_save",
         configuration={
             "query": {
-                "query": f"SELECT * FROM {DATASET}.{TABLE_1}",
+                "query": f"SELECT * FROM {DATASET_NAME}.{TABLE_NAME_1}",
                 "useLegacySql": False,
                 "destinationTable": {
                     "projectId": PROJECT_ID,
-                    "datasetId": DATASET,
-                    "tableId": TABLE_2,
+                    "datasetId": DATASET_NAME,
+                    "tableId": TABLE_NAME_2,
                 },
             }
         },
@@ -257,8 +258,6 @@ with DAG(
     insert_query_job >> execute_long_running_query >> check_value >> check_interval
     [check_count, check_interval, bigquery_execute_multi_query, get_data_result] >> delete_dataset
 
-    # ### Everything below this line is not part of example ###
-    # ### Just for system tests purpose ###
     from tests.system.utils.watcher import watcher
 
     # This test needs watcher in order to properly mark success/failure
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
index ee5f5b1062..4f8d695787 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
@@ -24,7 +24,6 @@ import os
 from datetime import datetime
 
 from airflow import models
-from airflow.models.baseoperator import BaseOperator
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateEmptyTableOperator,
@@ -37,18 +36,16 @@ from airflow.providers.google.cloud.sensors.bigquery import (
     BigQueryTableExistenceSensor,
     BigQueryTablePartitionExistenceSensor,
 )
-from airflow.sensors.base import BaseSensorOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "bigquery_sensors"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "example_bigquery_sensors"
 
-DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}".replace("-", "_")
+TABLE_NAME = f"partitioned_table_{DAG_ID}_{ENV_ID}".replace("-", "_")
 
-TABLE_NAME = "partitioned_table"
 INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
-
 PARTITION_NAME = "{{ ds_nodash }}"
 
 INSERT_ROWS_QUERY = f"INSERT {DATASET_NAME}.{TABLE_NAME} VALUES (42, '{{{{ ds }}}}')"
@@ -64,7 +61,7 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "bigquery"],
+    tags=["example", "bigquery", "sensors"],
     user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},
     default_args={"project_id": PROJECT_ID},
 ) as dag:
@@ -83,15 +80,16 @@ with models.DAG(
             "field": "ds",
         },
     )
+
     # [START howto_sensor_bigquery_table]
-    check_table_exists: BaseOperator = BigQueryTableExistenceSensor(
+    check_table_exists = BigQueryTableExistenceSensor(
         task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
     )
     # [END howto_sensor_bigquery_table]
 
     # [START howto_sensor_bigquery_table_defered]
-    check_table_exists: BaseOperator = BigQueryTableExistenceSensor(
-        task_id="check_table_exists_defered",
+    check_table_exists_def = BigQueryTableExistenceSensor(
+        task_id="check_table_exists_def",
         project_id=PROJECT_ID,
         dataset_id=DATASET_NAME,
         table_id=TABLE_NAME,
@@ -108,7 +106,7 @@ with models.DAG(
     )
     # [END howto_sensor_async_bigquery_table]
 
-    execute_insert_query: BaseOperator = BigQueryInsertJobOperator(
+    execute_insert_query = BigQueryInsertJobOperator(
         task_id="execute_insert_query",
         configuration={
             "query": {
@@ -119,7 +117,7 @@ with models.DAG(
     )
 
     # [START howto_sensor_bigquery_table_partition]
-    check_table_partition_exists: BaseSensorOperator = BigQueryTablePartitionExistenceSensor(
+    check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
         task_id="check_table_partition_exists",
         project_id=PROJECT_ID,
         dataset_id=DATASET_NAME,
@@ -129,8 +127,8 @@ with models.DAG(
     # [END howto_sensor_bigquery_table_partition]
 
     # [START howto_sensor_bigquery_table_partition_defered]
-    check_table_partition_exists: BaseSensorOperator = BigQueryTablePartitionExistenceSensor(
-        task_id="check_table_partition_exists_defered",
+    check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
+        task_id="check_table_partition_exists_def",
         project_id=PROJECT_ID,
         dataset_id=DATASET_NAME,
         table_id=TABLE_NAME,
@@ -140,7 +138,7 @@ with models.DAG(
     # [END howto_sensor_bigquery_table_partition_defered]
 
     # [START howto_sensor_bigquery_table_partition_async]
-    check_table_partition_exists_async: BaseSensorOperator = BigQueryTableExistencePartitionAsyncSensor(
+    check_table_partition_exists_async = BigQueryTableExistencePartitionAsyncSensor(
         task_id="check_table_partition_exists_async",
         partition_id=PARTITION_NAME,
         project_id=PROJECT_ID,
@@ -156,10 +154,18 @@ with models.DAG(
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    create_dataset >> create_table
-    create_table >> [check_table_exists, execute_insert_query]
-    execute_insert_query >> check_table_partition_exists
-    [check_table_exists, check_table_exists_async, check_table_partition_exists] >> delete_dataset
+    (
+        create_dataset
+        >> create_table
+        >> [check_table_exists, check_table_exists_async, check_table_exists_def]
+        >> execute_insert_query
+        >> [
+            check_table_partition_exists,
+            check_table_partition_exists_async,
+            check_table_partition_exists_def,
+        ]
+        >> delete_dataset
+    )
 
     from tests.system.utils.watcher import watcher
 

Reply via email to