This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 19df65a156 Fix BigQuery system tests (#33254)
19df65a156 is described below
commit 19df65a156c2b3389194d87aa25629dc6371fd95
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Aug 9 13:52:04 2023 +0200
Fix BigQuery system tests (#33254)
---
.../bigquery/example_bigquery_queries_async.py | 55 +++++++++++-----------
.../cloud/bigquery/example_bigquery_sensors.py | 48 ++++++++++---------
2 files changed, 54 insertions(+), 49 deletions(-)
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
index 38828dcb04..cdedfb0bc2 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py
@@ -37,16 +37,16 @@ from airflow.providers.google.cloud.operators.bigquery import (
)
from airflow.utils.trigger_rule import TriggerRule
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "bigquery_queries_async"
+DAG_ID = "example_bigquery_queries_async"
-DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}".replace("-", "_")
LOCATION = "us"
-TABLE_1 = "table1"
-TABLE_2 = "table2"
+TABLE_NAME_1 = f"table_{DAG_ID}_{ENV_ID}_1".replace("-", "_")
+TABLE_NAME_2 = f"table_{DAG_ID}_{ENV_ID}_2".replace("-", "_")
SCHEMA = [
{"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
@@ -54,10 +54,9 @@ SCHEMA = [
{"name": "ds", "type": "STRING", "mode": "NULLABLE"},
]
-DATASET = DATASET_NAME
INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
INSERT_ROWS_QUERY = (
- f"INSERT {DATASET}.{TABLE_1} VALUES "
+ f"INSERT {DATASET_NAME}.{TABLE_NAME_1} VALUES "
f"(42, 'monthy python', '{INSERT_DATE}'), "
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
@@ -72,7 +71,8 @@ CONFIGURATION = {
DECLARE WAIT STRING;
SET success = FALSE;
- SELECT row_count = (SELECT row_count FROM {DATASET}.__TABLES__ WHERE table_id='NON_EXISTING_TABLE');
+ SELECT row_count = (SELECT row_count FROM {DATASET_NAME}.__TABLES__
+ WHERE table_id='NON_EXISTING_TABLE');
IF row_count > 0 THEN
SELECT 'Table Exists!' as message, retry_count as retries;
SET success = TRUE;
@@ -103,26 +103,26 @@ with DAG(
start_date=datetime(2022, 1, 1),
catchup=False,
default_args=default_args,
- tags=["example", "async", "bigquery"],
- user_defined_macros={"DATASET": DATASET, "TABLE": TABLE_1},
+ tags=["example", "bigquery", "deferrable"],
+ user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME_1},
) as dag:
create_dataset = BigQueryCreateEmptyDatasetOperator(
task_id="create_dataset",
- dataset_id=DATASET,
+ dataset_id=DATASET_NAME,
location=LOCATION,
)
create_table_1 = BigQueryCreateEmptyTableOperator(
task_id="create_table_1",
- dataset_id=DATASET,
- table_id=TABLE_1,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME_1,
schema_fields=SCHEMA,
location=LOCATION,
)
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset",
- dataset_id=DATASET,
+ dataset_id=DATASET_NAME,
delete_contents=True,
trigger_rule=TriggerRule.ALL_DONE,
)
@@ -159,7 +159,7 @@ with DAG(
# [START howto_operator_bigquery_value_check_async]
check_value = BigQueryValueCheckOperator(
task_id="check_value",
- sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
+ sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
pass_value=2,
use_legacy_sql=False,
location=LOCATION,
@@ -170,7 +170,7 @@ with DAG(
# [START howto_operator_bigquery_interval_check_async]
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
- table=f"{DATASET}.{TABLE_1}",
+ table=f"{DATASET_NAME}.{TABLE_NAME_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
@@ -185,8 +185,8 @@ with DAG(
configuration={
"query": {
"query": [
- f"SELECT * FROM {DATASET}.{TABLE_2}",
- f"SELECT COUNT(*) FROM {DATASET}.{TABLE_2}",
+ f"SELECT * FROM {DATASET_NAME}.{TABLE_NAME_2}",
+ f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_2}",
],
"useLegacySql": False,
}
@@ -199,10 +199,11 @@ with DAG(
# [START howto_operator_bigquery_get_data_async]
get_data = BigQueryGetDataOperator(
task_id="get_data",
- dataset_id=DATASET,
- table_id=TABLE_1,
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_NAME_1,
+ use_legacy_sql=False,
max_results=10,
- selected_fields="value,name",
+ selected_fields="value",
location=LOCATION,
deferrable=True,
)
@@ -217,7 +218,7 @@ with DAG(
# [START howto_operator_bigquery_check_async]
check_count = BigQueryCheckOperator(
task_id="check_count",
- sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
+ sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
@@ -229,12 +230,12 @@ with DAG(
task_id="execute_query_save",
configuration={
"query": {
- "query": f"SELECT * FROM {DATASET}.{TABLE_1}",
+ "query": f"SELECT * FROM {DATASET_NAME}.{TABLE_NAME_1}",
"useLegacySql": False,
"destinationTable": {
"projectId": PROJECT_ID,
- "datasetId": DATASET,
- "tableId": TABLE_2,
+ "datasetId": DATASET_NAME,
+ "tableId": TABLE_NAME_2,
},
}
},
@@ -257,8 +258,6 @@ with DAG(
insert_query_job >> execute_long_running_query >> check_value >> check_interval
[check_count, check_interval, bigquery_execute_multi_query, get_data_result] >> delete_dataset
- # ### Everything below this line is not part of example ###
- # ### Just for system tests purpose ###
from tests.system.utils.watcher import watcher
# This test needs watcher in order to properly mark success/failure
diff --git a/tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py b/tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
index ee5f5b1062..4f8d695787 100644
--- a/tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
+++ b/tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py
@@ -24,7 +24,6 @@ import os
from datetime import datetime
from airflow import models
-from airflow.models.baseoperator import BaseOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateEmptyTableOperator,
@@ -37,18 +36,16 @@ from airflow.providers.google.cloud.sensors.bigquery import (
BigQueryTableExistenceSensor,
BigQueryTablePartitionExistenceSensor,
)
-from airflow.sensors.base import BaseSensorOperator
from airflow.utils.trigger_rule import TriggerRule
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "bigquery_sensors"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "example_bigquery_sensors"
-DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "")
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}".replace("-", "_")
+TABLE_NAME = f"partitioned_table_{DAG_ID}_{ENV_ID}".replace("-", "_")
-TABLE_NAME = "partitioned_table"
INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
-
PARTITION_NAME = "{{ ds_nodash }}"
INSERT_ROWS_QUERY = f"INSERT {DATASET_NAME}.{TABLE_NAME} VALUES (42, '{{{{ ds }}}}')"
@@ -64,7 +61,7 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "bigquery"],
+ tags=["example", "bigquery", "sensors"],
user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME},
default_args={"project_id": PROJECT_ID},
) as dag:
@@ -83,15 +80,16 @@ with models.DAG(
"field": "ds",
},
)
+
# [START howto_sensor_bigquery_table]
- check_table_exists: BaseOperator = BigQueryTableExistenceSensor(
+ check_table_exists = BigQueryTableExistenceSensor(
task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)
# [END howto_sensor_bigquery_table]
# [START howto_sensor_bigquery_table_defered]
- check_table_exists: BaseOperator = BigQueryTableExistenceSensor(
- task_id="check_table_exists_defered",
+ check_table_exists_def = BigQueryTableExistenceSensor(
+ task_id="check_table_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
@@ -108,7 +106,7 @@ with models.DAG(
)
# [END howto_sensor_async_bigquery_table]
- execute_insert_query: BaseOperator = BigQueryInsertJobOperator(
+ execute_insert_query = BigQueryInsertJobOperator(
task_id="execute_insert_query",
configuration={
"query": {
@@ -119,7 +117,7 @@ with models.DAG(
)
# [START howto_sensor_bigquery_table_partition]
- check_table_partition_exists: BaseSensorOperator = BigQueryTablePartitionExistenceSensor(
+ check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
@@ -129,8 +127,8 @@ with models.DAG(
# [END howto_sensor_bigquery_table_partition]
# [START howto_sensor_bigquery_table_partition_defered]
- check_table_partition_exists: BaseSensorOperator = BigQueryTablePartitionExistenceSensor(
- task_id="check_table_partition_exists_defered",
+ check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
+ task_id="check_table_partition_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
@@ -140,7 +138,7 @@ with models.DAG(
# [END howto_sensor_bigquery_table_partition_defered]
# [START howto_sensor_bigquery_table_partition_async]
- check_table_partition_exists_async: BaseSensorOperator = BigQueryTableExistencePartitionAsyncSensor(
+ check_table_partition_exists_async = BigQueryTableExistencePartitionAsyncSensor(
task_id="check_table_partition_exists_async",
partition_id=PARTITION_NAME,
project_id=PROJECT_ID,
@@ -156,10 +154,18 @@ with models.DAG(
trigger_rule=TriggerRule.ALL_DONE,
)
- create_dataset >> create_table
- create_table >> [check_table_exists, execute_insert_query]
- execute_insert_query >> check_table_partition_exists
- [check_table_exists, check_table_exists_async, check_table_partition_exists] >> delete_dataset
+ (
+ create_dataset
+ >> create_table
+ >> [check_table_exists, check_table_exists_async, check_table_exists_def]
+ >> execute_insert_query
+ >> [
+ check_table_partition_exists,
+ check_table_partition_exists_async,
+ check_table_partition_exists_def,
+ ]
+ >> delete_dataset
+ )
from tests.system.utils.watcher import watcher