This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 930db714f7 Resolve deprecations in `BigQuery` operators (#40182)
930db714f7 is described below
commit 930db714f7a11b5506402b286411d16aaaaedda7
Author: Bora Berke Sahin <[email protected]>
AuthorDate: Wed Jun 12 17:43:25 2024 +0300
Resolve deprecations in `BigQuery` operators (#40182)
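For context, a minimal sketch of the two patterns this change applies
(all resource names below are illustrative placeholders, not values from
the test suite):

    import pytest

    from airflow.exceptions import AirflowProviderDeprecationWarning
    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryCreateExternalTableOperator,
        BigQueryExecuteQueryOperator,
    )

    # Pattern 1: describe the external table with an explicit
    # ``table_resource`` dict instead of the deprecated per-field
    # arguments (schema_fields, source_format, autodetect, ...).
    create_table = BigQueryCreateExternalTableOperator(
        task_id="create_external_table",
        bucket="example-bucket",
        source_objects=["data/input.csv"],
        table_resource={
            "tableReference": {
                "projectId": "example-project",
                "datasetId": "example_dataset",
                "tableId": "example_table",
            },
            "externalDataConfiguration": {
                "source_uris": ["gs://example-bucket/data/input.csv"],
                "source_format": "CSV",
                "autodetect": True,
            },
        },
    )

    # Pattern 2: tests that still construct a deprecated operator now
    # assert the deprecation warning explicitly instead of suppressing
    # it via tests/deprecations_ignore.yml.
    with pytest.warns(AirflowProviderDeprecationWarning, match="deprecated"):
        BigQueryExecuteQueryOperator(task_id="legacy_query", sql="SELECT 1")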
---
tests/deprecations_ignore.yml | 12 -
.../google/cloud/operators/test_bigquery.py | 379 +++++++++++----------
2 files changed, 197 insertions(+), 194 deletions(-)
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index bed0c8dcbf..a8debe085b 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -191,19 +191,7 @@
- tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_delete_pipeline_job
- tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_get_pipeline_job
- tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_list_pipeline_jobs
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryCreateExternalTableOperator::test_execute_with_csv_format
- tests/providers/google/cloud/operators/test_automl.py::TestAutoMLTrainModelOperator::test_execute
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryCreateExternalTableOperator::test_execute_with_parquet_format
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_defaults
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_missing_job_id
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_multiple_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_single_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_serialized_field_when_multiple_queries
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_serialized_field_when_single_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute_bad_type
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute_list
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryPatchDatasetOperator::test_execute
- tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_job_not_running_exec
- tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_job_running_exec
- tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_multiple_job_exec
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 3fa3446761..6a92fe8089 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -29,7 +29,13 @@ from google.cloud.exceptions import Conflict
from openlineage.client.facet import ErrorMessageRunFacet, ExternalQueryRunFacet, SqlJobFacet
from openlineage.client.run import Dataset
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout, TaskDeferred
+from airflow.exceptions import (
+ AirflowException,
+ AirflowProviderDeprecationWarning,
+ AirflowSkipException,
+ AirflowTaskTimeout,
+ TaskDeferred,
+)
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator,
BigQueryColumnCheckOperator,
@@ -249,15 +255,38 @@ class TestBigQueryCreateEmptyTableOperator:
class TestBigQueryCreateExternalTableOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_with_csv_format(self, mock_hook):
+ table_resource = {
+ "tableReference": {
+ "projectId": TEST_GCP_PROJECT_ID,
+ "datasetId": TEST_DATASET,
+ "tableId": TEST_TABLE_ID,
+ },
+ "labels": None,
+ "schema": {"fields": []},
+ "externalDataConfiguration": {
+ "source_uris": [
+ f"gs://{TEST_GCS_BUCKET}/{source_object}" for
source_object in TEST_GCS_CSV_DATA
+ ],
+ "source_format": TEST_SOURCE_CSV_FORMAT,
+ "maxBadRecords": 0,
+ "autodetect": True,
+ "compression": "NONE",
+ "csvOptions": {
+ "fieldDelimiter": ",",
+ "skipLeadingRows": 0,
+ "quote": None,
+ "allowQuotedNewlines": False,
+ "allowJaggedRows": False,
+ },
+ },
+ "location": None,
+ "encryptionConfiguration": None,
+ }
operator = BigQueryCreateExternalTableOperator(
task_id=TASK_ID,
- destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
- schema_fields=[],
bucket=TEST_GCS_BUCKET,
- gcs_schema_bucket=TEST_GCS_BUCKET,
source_objects=TEST_GCS_CSV_DATA,
- source_format=TEST_SOURCE_CSV_FORMAT,
- autodetect=True,
+ table_resource=table_resource,
)
mock_hook.return_value.split_tablename.return_value = (
@@ -267,47 +296,35 @@ class TestBigQueryCreateExternalTableOperator:
)
operator.execute(context=MagicMock())
- mock_hook.return_value.create_empty_table.assert_called_once_with(
- table_resource={
- "tableReference": {
- "projectId": TEST_GCP_PROJECT_ID,
- "datasetId": TEST_DATASET,
- "tableId": TEST_TABLE_ID,
- },
- "labels": None,
- "schema": {"fields": []},
- "externalDataConfiguration": {
- "source_uris": [
- f"gs://{TEST_GCS_BUCKET}/{source_object}" for
source_object in TEST_GCS_CSV_DATA
- ],
- "source_format": TEST_SOURCE_CSV_FORMAT,
- "maxBadRecords": 0,
- "autodetect": True,
- "compression": "NONE",
- "csvOptions": {
- "fieldDelimiter": ",",
- "skipLeadingRows": 0,
- "quote": None,
- "allowQuotedNewlines": False,
- "allowJaggedRows": False,
- },
- },
- "location": None,
- "encryptionConfiguration": None,
- }
- )
+ mock_hook.return_value.create_empty_table.assert_called_once_with(table_resource=table_resource)
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_with_parquet_format(self, mock_hook):
+ table_resource = {
+ "tableReference": {
+ "projectId": TEST_GCP_PROJECT_ID,
+ "datasetId": TEST_DATASET,
+ "tableId": TEST_TABLE_ID,
+ },
+ "labels": None,
+ "schema": {"fields": []},
+ "externalDataConfiguration": {
+ "source_uris": [
+ f"gs://{TEST_GCS_BUCKET}/{source_object}" for
source_object in TEST_GCS_PARQUET_DATA
+ ],
+ "source_format": TEST_SOURCE_PARQUET_FORMAT,
+ "maxBadRecords": 0,
+ "autodetect": True,
+ "compression": "NONE",
+ },
+ "location": None,
+ "encryptionConfiguration": None,
+ }
operator = BigQueryCreateExternalTableOperator(
task_id=TASK_ID,
- destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
- schema_fields=[],
bucket=TEST_GCS_BUCKET,
- gcs_schema_bucket=TEST_GCS_BUCKET,
source_objects=TEST_GCS_PARQUET_DATA,
- source_format=TEST_SOURCE_PARQUET_FORMAT,
- autodetect=True,
+ table_resource=table_resource,
)
mock_hook.return_value.split_tablename.return_value = (
@@ -317,28 +334,7 @@ class TestBigQueryCreateExternalTableOperator:
)
operator.execute(context=MagicMock())
- mock_hook.return_value.create_empty_table.assert_called_once_with(
- table_resource={
- "tableReference": {
- "projectId": TEST_GCP_PROJECT_ID,
- "datasetId": TEST_DATASET,
- "tableId": TEST_TABLE_ID,
- },
- "labels": None,
- "schema": {"fields": []},
- "externalDataConfiguration": {
- "source_uris": [
- f"gs://{TEST_GCS_BUCKET}/{source_object}" for
source_object in TEST_GCS_PARQUET_DATA
- ],
- "source_format": TEST_SOURCE_PARQUET_FORMAT,
- "maxBadRecords": 0,
- "autodetect": True,
- "compression": "NONE",
- },
- "location": None,
- "encryptionConfiguration": None,
- }
- )
+ mock_hook.return_value.create_empty_table.assert_called_once_with(table_resource=table_resource)
class TestBigQueryDeleteDatasetOperator:
@@ -478,12 +474,17 @@ class TestBigQueryPatchDatasetOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute(self, mock_hook):
dataset_resource = {"friendlyName": "Test DS"}
- operator = BigQueryPatchDatasetOperator(
- dataset_resource=dataset_resource,
- task_id=TASK_ID,
- dataset_id=TEST_DATASET,
- project_id=TEST_GCP_PROJECT_ID,
- )
+ deprecation_message = (
+ r"Call to deprecated class BigQueryPatchDatasetOperator\. "
+ r"\(This operator is deprecated\. Please use
BigQueryUpdateDatasetOperator\.\)"
+ )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=deprecation_message):
+ operator = BigQueryPatchDatasetOperator(
+ dataset_resource=dataset_resource,
+ task_id=TASK_ID,
+ dataset_id=TEST_DATASET,
+ project_id=TEST_GCP_PROJECT_ID,
+ )
operator.execute(None)
mock_hook.return_value.patch_dataset.assert_called_once_with(
@@ -513,6 +514,11 @@ class TestBigQueryUpdateDatasetOperator:
@pytest.mark.db_test
class TestBigQueryOperator:
+ deprecation_message = (
+ r"Call to deprecated class BigQueryExecuteQueryOperator\. "
+ r"\(This operator is deprecated\. Please use
`BigQueryInsertJobOperator`\.\)"
+ )
+
def teardown_method(self):
clear_db_xcom()
clear_db_runs()
@@ -523,33 +529,34 @@ class TestBigQueryOperator:
def test_execute(self, mock_hook):
encryption_configuration = {"key": "kk"}
- operator = BigQueryExecuteQueryOperator(
- task_id=TASK_ID,
- sql="Select * from test_table",
- destination_dataset_table=None,
- write_disposition="WRITE_EMPTY",
- allow_large_results=False,
- flatten_results=None,
- gcp_conn_id="google_cloud_default",
- udf_config=None,
- use_legacy_sql=True,
- maximum_billing_tier=None,
- maximum_bytes_billed=None,
- create_disposition="CREATE_IF_NEEDED",
- schema_update_options=(),
- query_params=None,
- labels=None,
- priority="INTERACTIVE",
- time_partitioning=None,
- api_resource_configs=None,
- cluster_fields=None,
- encryption_configuration=encryption_configuration,
- impersonation_chain=["[email protected]"],
- impersonation_scopes=[
- "https://www.googleapis.com/auth/cloud-platform",
- "https://www.googleapis.com/auth/drive",
- ],
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ operator = BigQueryExecuteQueryOperator(
+ task_id=TASK_ID,
+ sql="Select * from test_table",
+ destination_dataset_table=None,
+ write_disposition="WRITE_EMPTY",
+ allow_large_results=False,
+ flatten_results=None,
+ gcp_conn_id="google_cloud_default",
+ udf_config=None,
+ use_legacy_sql=True,
+ maximum_billing_tier=None,
+ maximum_bytes_billed=None,
+ create_disposition="CREATE_IF_NEEDED",
+ schema_update_options=(),
+ query_params=None,
+ labels=None,
+ priority="INTERACTIVE",
+ time_partitioning=None,
+ api_resource_configs=None,
+ cluster_fields=None,
+ encryption_configuration=encryption_configuration,
+ impersonation_chain=["[email protected]"],
+ impersonation_scopes=[
+ "https://www.googleapis.com/auth/cloud-platform",
+ "https://www.googleapis.com/auth/drive",
+ ],
+ )
operator.execute(MagicMock())
mock_hook.assert_called_with(
@@ -584,31 +591,32 @@ class TestBigQueryOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_list(self, mock_hook):
- operator = BigQueryExecuteQueryOperator(
- task_id=TASK_ID,
- sql=[
- "Select * from test_table",
- "Select * from other_test_table",
- ],
- destination_dataset_table=None,
- write_disposition="WRITE_EMPTY",
- allow_large_results=False,
- flatten_results=None,
- gcp_conn_id="google_cloud_default",
- udf_config=None,
- use_legacy_sql=True,
- maximum_billing_tier=None,
- maximum_bytes_billed=None,
- create_disposition="CREATE_IF_NEEDED",
- schema_update_options=(),
- query_params=None,
- labels=None,
- priority="INTERACTIVE",
- time_partitioning=None,
- api_resource_configs=None,
- cluster_fields=None,
- encryption_configuration=None,
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ operator = BigQueryExecuteQueryOperator(
+ task_id=TASK_ID,
+ sql=[
+ "Select * from test_table",
+ "Select * from other_test_table",
+ ],
+ destination_dataset_table=None,
+ write_disposition="WRITE_EMPTY",
+ allow_large_results=False,
+ flatten_results=None,
+ gcp_conn_id="google_cloud_default",
+ udf_config=None,
+ use_legacy_sql=True,
+ maximum_billing_tier=None,
+ maximum_bytes_billed=None,
+ create_disposition="CREATE_IF_NEEDED",
+ schema_update_options=(),
+ query_params=None,
+ labels=None,
+ priority="INTERACTIVE",
+ time_partitioning=None,
+ api_resource_configs=None,
+ cluster_fields=None,
+ encryption_configuration=None,
+ )
operator.execute(MagicMock())
mock_hook.return_value.run_query.assert_has_calls(
@@ -656,40 +664,42 @@ class TestBigQueryOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_bad_type(self, mock_hook):
- operator = BigQueryExecuteQueryOperator(
- task_id=TASK_ID,
- sql=1,
- destination_dataset_table=None,
- write_disposition="WRITE_EMPTY",
- allow_large_results=False,
- flatten_results=None,
- gcp_conn_id="google_cloud_default",
- udf_config=None,
- use_legacy_sql=True,
- maximum_billing_tier=None,
- maximum_bytes_billed=None,
- create_disposition="CREATE_IF_NEEDED",
- schema_update_options=(),
- query_params=None,
- labels=None,
- priority="INTERACTIVE",
- time_partitioning=None,
- api_resource_configs=None,
- cluster_fields=None,
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ operator = BigQueryExecuteQueryOperator(
+ task_id=TASK_ID,
+ sql=1,
+ destination_dataset_table=None,
+ write_disposition="WRITE_EMPTY",
+ allow_large_results=False,
+ flatten_results=None,
+ gcp_conn_id="google_cloud_default",
+ udf_config=None,
+ use_legacy_sql=True,
+ maximum_billing_tier=None,
+ maximum_bytes_billed=None,
+ create_disposition="CREATE_IF_NEEDED",
+ schema_update_options=(),
+ query_params=None,
+ labels=None,
+ priority="INTERACTIVE",
+ time_partitioning=None,
+ api_resource_configs=None,
+ cluster_fields=None,
+ )
with pytest.raises(AirflowException):
operator.execute(MagicMock())
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_defaults(self, mock_hook, create_task_instance_of_operator):
- ti = create_task_instance_of_operator(
- BigQueryExecuteQueryOperator,
- dag_id=TEST_DAG_ID,
- task_id=TASK_ID,
- sql="Select * from test_table",
- schema_update_options=None,
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ ti = create_task_instance_of_operator(
+ BigQueryExecuteQueryOperator,
+ dag_id=TEST_DAG_ID,
+ task_id=TASK_ID,
+ sql="Select * from test_table",
+ schema_update_options=None,
+ )
operator = ti.task
operator.execute(MagicMock())
@@ -722,13 +732,14 @@ class TestBigQueryOperator:
dag_maker,
create_task_instance_of_operator,
):
- ti = create_task_instance_of_operator(
- BigQueryExecuteQueryOperator,
- dag_id=TEST_DAG_ID,
- execution_date=DEFAULT_DATE,
- task_id=TASK_ID,
- sql="SELECT * FROM test_table",
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ ti = create_task_instance_of_operator(
+ BigQueryExecuteQueryOperator,
+ dag_id=TEST_DAG_ID,
+ execution_date=DEFAULT_DATE,
+ task_id=TASK_ID,
+ sql="SELECT * FROM test_table",
+ )
serialized_dag = dag_maker.get_serialized_data()
deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
assert hasattr(deserialized_dag.tasks[0], "sql")
@@ -757,13 +768,14 @@ class TestBigQueryOperator:
dag_maker,
create_task_instance_of_operator,
):
- ti = create_task_instance_of_operator(
- BigQueryExecuteQueryOperator,
- dag_id=TEST_DAG_ID,
- execution_date=DEFAULT_DATE,
- task_id=TASK_ID,
- sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ ti = create_task_instance_of_operator(
+ BigQueryExecuteQueryOperator,
+ dag_id=TEST_DAG_ID,
+ execution_date=DEFAULT_DATE,
+ task_id=TASK_ID,
+ sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
+ )
serialized_dag = dag_maker.get_serialized_data()
deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
assert hasattr(deserialized_dag.tasks[0], "sql")
@@ -801,12 +813,13 @@ class TestBigQueryOperator:
def test_bigquery_operator_extra_link_when_missing_job_id(
self, mock_hook, create_task_instance_of_operator
):
- ti = create_task_instance_of_operator(
- BigQueryExecuteQueryOperator,
- dag_id=TEST_DAG_ID,
- task_id=TASK_ID,
- sql="SELECT * FROM test_table",
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ ti = create_task_instance_of_operator(
+ BigQueryExecuteQueryOperator,
+ dag_id=TEST_DAG_ID,
+ task_id=TASK_ID,
+ sql="SELECT * FROM test_table",
+ )
bigquery_task = ti.task
assert "" == bigquery_task.get_extra_links(ti,
BigQueryConsoleLink.name)
@@ -817,13 +830,14 @@ class TestBigQueryOperator:
mock_hook,
create_task_instance_of_operator,
):
- ti = create_task_instance_of_operator(
- BigQueryExecuteQueryOperator,
- dag_id=TEST_DAG_ID,
- execution_date=DEFAULT_DATE,
- task_id=TASK_ID,
- sql="SELECT * FROM test_table",
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ ti = create_task_instance_of_operator(
+ BigQueryExecuteQueryOperator,
+ dag_id=TEST_DAG_ID,
+ execution_date=DEFAULT_DATE,
+ task_id=TASK_ID,
+ sql="SELECT * FROM test_table",
+ )
bigquery_task = ti.task
ti.xcom_push(key="job_id_path", value=TEST_FULL_JOB_ID)
@@ -837,13 +851,14 @@ class TestBigQueryOperator:
def test_bigquery_operator_extra_link_when_multiple_query(
self, mock_hook, create_task_instance_of_operator
):
- ti = create_task_instance_of_operator(
- BigQueryExecuteQueryOperator,
- dag_id=TEST_DAG_ID,
- execution_date=DEFAULT_DATE,
- task_id=TASK_ID,
- sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
- )
+ with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+ ti = create_task_instance_of_operator(
+ BigQueryExecuteQueryOperator,
+ dag_id=TEST_DAG_ID,
+ execution_date=DEFAULT_DATE,
+ task_id=TASK_ID,
+ sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
+ )
bigquery_task = ti.task
ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])