This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 930db714f7 Resolve deprecations in `BigQuery` operators (#40182)
930db714f7 is described below

commit 930db714f7a11b5506402b286411d16aaaaedda7
Author: Bora Berke Sahin <[email protected]>
AuthorDate: Wed Jun 12 17:43:25 2024 +0300

    Resolve deprecations in `BigQuery` operators (#40182)
---
 tests/deprecations_ignore.yml                      |  12 -
 .../google/cloud/operators/test_bigquery.py        | 379 +++++++++++----------
 2 files changed, 197 insertions(+), 194 deletions(-)

diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index bed0c8dcbf..a8debe085b 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -191,19 +191,7 @@
 - tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_delete_pipeline_job
 - tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_get_pipeline_job
 - tests/providers/google/cloud/hooks/vertex_ai/test_custom_job.py::TestCustomJobWithoutDefaultProjectIdHook::test_list_pipeline_jobs
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryCreateExternalTableOperator::test_execute_with_csv_format
 - tests/providers/google/cloud/operators/test_automl.py::TestAutoMLTrainModelOperator::test_execute
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryCreateExternalTableOperator::test_execute_with_parquet_format
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_defaults
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_missing_job_id
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_multiple_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_link_when_single_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_serialized_field_when_multiple_queries
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_bigquery_operator_extra_serialized_field_when_single_query
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute_bad_type
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryOperator::test_execute_list
-- tests/providers/google/cloud/operators/test_bigquery.py::TestBigQueryPatchDatasetOperator::test_execute
 - tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_job_not_running_exec
 - tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_job_running_exec
 - tests/providers/google/cloud/operators/test_dataflow.py::TestDataflowCreateJavaJobOperator::test_check_multiple_job_exec
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 3fa3446761..6a92fe8089 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -29,7 +29,13 @@ from google.cloud.exceptions import Conflict
 from openlineage.client.facet import ErrorMessageRunFacet, ExternalQueryRunFacet, SqlJobFacet
 from openlineage.client.run import Dataset
 
-from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout, TaskDeferred
+from airflow.exceptions import (
+    AirflowException,
+    AirflowProviderDeprecationWarning,
+    AirflowSkipException,
+    AirflowTaskTimeout,
+    TaskDeferred,
+)
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCheckOperator,
     BigQueryColumnCheckOperator,
@@ -249,15 +255,38 @@ class TestBigQueryCreateEmptyTableOperator:
 class TestBigQueryCreateExternalTableOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_with_csv_format(self, mock_hook):
+        table_resource = {
+            "tableReference": {
+                "projectId": TEST_GCP_PROJECT_ID,
+                "datasetId": TEST_DATASET,
+                "tableId": TEST_TABLE_ID,
+            },
+            "labels": None,
+            "schema": {"fields": []},
+            "externalDataConfiguration": {
+                "source_uris": [
+                    f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_CSV_DATA
+                ],
+                "source_format": TEST_SOURCE_CSV_FORMAT,
+                "maxBadRecords": 0,
+                "autodetect": True,
+                "compression": "NONE",
+                "csvOptions": {
+                    "fieldDelimiter": ",",
+                    "skipLeadingRows": 0,
+                    "quote": None,
+                    "allowQuotedNewlines": False,
+                    "allowJaggedRows": False,
+                },
+            },
+            "location": None,
+            "encryptionConfiguration": None,
+        }
         operator = BigQueryCreateExternalTableOperator(
             task_id=TASK_ID,
-            destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
-            schema_fields=[],
             bucket=TEST_GCS_BUCKET,
-            gcs_schema_bucket=TEST_GCS_BUCKET,
             source_objects=TEST_GCS_CSV_DATA,
-            source_format=TEST_SOURCE_CSV_FORMAT,
-            autodetect=True,
+            table_resource=table_resource,
         )
 
         mock_hook.return_value.split_tablename.return_value = (
@@ -267,47 +296,35 @@ class TestBigQueryCreateExternalTableOperator:
         )
 
         operator.execute(context=MagicMock())
-        mock_hook.return_value.create_empty_table.assert_called_once_with(
-            table_resource={
-                "tableReference": {
-                    "projectId": TEST_GCP_PROJECT_ID,
-                    "datasetId": TEST_DATASET,
-                    "tableId": TEST_TABLE_ID,
-                },
-                "labels": None,
-                "schema": {"fields": []},
-                "externalDataConfiguration": {
-                    "source_uris": [
-                        f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_CSV_DATA
-                    ],
-                    "source_format": TEST_SOURCE_CSV_FORMAT,
-                    "maxBadRecords": 0,
-                    "autodetect": True,
-                    "compression": "NONE",
-                    "csvOptions": {
-                        "fieldDelimiter": ",",
-                        "skipLeadingRows": 0,
-                        "quote": None,
-                        "allowQuotedNewlines": False,
-                        "allowJaggedRows": False,
-                    },
-                },
-                "location": None,
-                "encryptionConfiguration": None,
-            }
-        )
+        mock_hook.return_value.create_empty_table.assert_called_once_with(table_resource=table_resource)
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_with_parquet_format(self, mock_hook):
+        table_resource = {
+            "tableReference": {
+                "projectId": TEST_GCP_PROJECT_ID,
+                "datasetId": TEST_DATASET,
+                "tableId": TEST_TABLE_ID,
+            },
+            "labels": None,
+            "schema": {"fields": []},
+            "externalDataConfiguration": {
+                "source_uris": [
+                    f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_PARQUET_DATA
+                ],
+                "source_format": TEST_SOURCE_PARQUET_FORMAT,
+                "maxBadRecords": 0,
+                "autodetect": True,
+                "compression": "NONE",
+            },
+            "location": None,
+            "encryptionConfiguration": None,
+        }
         operator = BigQueryCreateExternalTableOperator(
             task_id=TASK_ID,
-            destination_project_dataset_table=f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
-            schema_fields=[],
             bucket=TEST_GCS_BUCKET,
-            gcs_schema_bucket=TEST_GCS_BUCKET,
             source_objects=TEST_GCS_PARQUET_DATA,
-            source_format=TEST_SOURCE_PARQUET_FORMAT,
-            autodetect=True,
+            table_resource=table_resource,
         )
 
         mock_hook.return_value.split_tablename.return_value = (
@@ -317,28 +334,7 @@ class TestBigQueryCreateExternalTableOperator:
         )
 
         operator.execute(context=MagicMock())
-        mock_hook.return_value.create_empty_table.assert_called_once_with(
-            table_resource={
-                "tableReference": {
-                    "projectId": TEST_GCP_PROJECT_ID,
-                    "datasetId": TEST_DATASET,
-                    "tableId": TEST_TABLE_ID,
-                },
-                "labels": None,
-                "schema": {"fields": []},
-                "externalDataConfiguration": {
-                    "source_uris": [
-                        f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_PARQUET_DATA
-                    ],
-                    "source_format": TEST_SOURCE_PARQUET_FORMAT,
-                    "maxBadRecords": 0,
-                    "autodetect": True,
-                    "compression": "NONE",
-                },
-                "location": None,
-                "encryptionConfiguration": None,
-            }
-        )
+        mock_hook.return_value.create_empty_table.assert_called_once_with(table_resource=table_resource)
 
 
 class TestBigQueryDeleteDatasetOperator:
@@ -478,12 +474,17 @@ class TestBigQueryPatchDatasetOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute(self, mock_hook):
         dataset_resource = {"friendlyName": "Test DS"}
-        operator = BigQueryPatchDatasetOperator(
-            dataset_resource=dataset_resource,
-            task_id=TASK_ID,
-            dataset_id=TEST_DATASET,
-            project_id=TEST_GCP_PROJECT_ID,
-        )
+        deprecation_message = (
+            r"Call to deprecated class BigQueryPatchDatasetOperator\. "
+            r"\(This operator is deprecated\. Please use BigQueryUpdateDatasetOperator\.\)"
+        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=deprecation_message):
+            operator = BigQueryPatchDatasetOperator(
+                dataset_resource=dataset_resource,
+                task_id=TASK_ID,
+                dataset_id=TEST_DATASET,
+                project_id=TEST_GCP_PROJECT_ID,
+            )
 
         operator.execute(None)
         mock_hook.return_value.patch_dataset.assert_called_once_with(
@@ -513,6 +514,11 @@ class TestBigQueryUpdateDatasetOperator:
 
 @pytest.mark.db_test
 class TestBigQueryOperator:
+    deprecation_message = (
+        r"Call to deprecated class BigQueryExecuteQueryOperator\. "
+        r"\(This operator is deprecated\. Please use `BigQueryInsertJobOperator`\.\)"
+    )
+
     def teardown_method(self):
         clear_db_xcom()
         clear_db_runs()
@@ -523,33 +529,34 @@ class TestBigQueryOperator:
     def test_execute(self, mock_hook):
         encryption_configuration = {"key": "kk"}
 
-        operator = BigQueryExecuteQueryOperator(
-            task_id=TASK_ID,
-            sql="Select * from test_table",
-            destination_dataset_table=None,
-            write_disposition="WRITE_EMPTY",
-            allow_large_results=False,
-            flatten_results=None,
-            gcp_conn_id="google_cloud_default",
-            udf_config=None,
-            use_legacy_sql=True,
-            maximum_billing_tier=None,
-            maximum_bytes_billed=None,
-            create_disposition="CREATE_IF_NEEDED",
-            schema_update_options=(),
-            query_params=None,
-            labels=None,
-            priority="INTERACTIVE",
-            time_partitioning=None,
-            api_resource_configs=None,
-            cluster_fields=None,
-            encryption_configuration=encryption_configuration,
-            impersonation_chain=["[email protected]"],
-            impersonation_scopes=[
-                "https://www.googleapis.com/auth/cloud-platform",
-                "https://www.googleapis.com/auth/drive",
-            ],
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            operator = BigQueryExecuteQueryOperator(
+                task_id=TASK_ID,
+                sql="Select * from test_table",
+                destination_dataset_table=None,
+                write_disposition="WRITE_EMPTY",
+                allow_large_results=False,
+                flatten_results=None,
+                gcp_conn_id="google_cloud_default",
+                udf_config=None,
+                use_legacy_sql=True,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition="CREATE_IF_NEEDED",
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority="INTERACTIVE",
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+                encryption_configuration=encryption_configuration,
+                impersonation_chain=["[email protected]"],
+                impersonation_scopes=[
+                    "https://www.googleapis.com/auth/cloud-platform",
+                    "https://www.googleapis.com/auth/drive",
+                ],
+            )
 
         operator.execute(MagicMock())
         mock_hook.assert_called_with(
@@ -584,31 +591,32 @@ class TestBigQueryOperator:
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_list(self, mock_hook):
-        operator = BigQueryExecuteQueryOperator(
-            task_id=TASK_ID,
-            sql=[
-                "Select * from test_table",
-                "Select * from other_test_table",
-            ],
-            destination_dataset_table=None,
-            write_disposition="WRITE_EMPTY",
-            allow_large_results=False,
-            flatten_results=None,
-            gcp_conn_id="google_cloud_default",
-            udf_config=None,
-            use_legacy_sql=True,
-            maximum_billing_tier=None,
-            maximum_bytes_billed=None,
-            create_disposition="CREATE_IF_NEEDED",
-            schema_update_options=(),
-            query_params=None,
-            labels=None,
-            priority="INTERACTIVE",
-            time_partitioning=None,
-            api_resource_configs=None,
-            cluster_fields=None,
-            encryption_configuration=None,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            operator = BigQueryExecuteQueryOperator(
+                task_id=TASK_ID,
+                sql=[
+                    "Select * from test_table",
+                    "Select * from other_test_table",
+                ],
+                destination_dataset_table=None,
+                write_disposition="WRITE_EMPTY",
+                allow_large_results=False,
+                flatten_results=None,
+                gcp_conn_id="google_cloud_default",
+                udf_config=None,
+                use_legacy_sql=True,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition="CREATE_IF_NEEDED",
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority="INTERACTIVE",
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+                encryption_configuration=None,
+            )
 
         operator.execute(MagicMock())
         mock_hook.return_value.run_query.assert_has_calls(
@@ -656,40 +664,42 @@ class TestBigQueryOperator:
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_bad_type(self, mock_hook):
-        operator = BigQueryExecuteQueryOperator(
-            task_id=TASK_ID,
-            sql=1,
-            destination_dataset_table=None,
-            write_disposition="WRITE_EMPTY",
-            allow_large_results=False,
-            flatten_results=None,
-            gcp_conn_id="google_cloud_default",
-            udf_config=None,
-            use_legacy_sql=True,
-            maximum_billing_tier=None,
-            maximum_bytes_billed=None,
-            create_disposition="CREATE_IF_NEEDED",
-            schema_update_options=(),
-            query_params=None,
-            labels=None,
-            priority="INTERACTIVE",
-            time_partitioning=None,
-            api_resource_configs=None,
-            cluster_fields=None,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            operator = BigQueryExecuteQueryOperator(
+                task_id=TASK_ID,
+                sql=1,
+                destination_dataset_table=None,
+                write_disposition="WRITE_EMPTY",
+                allow_large_results=False,
+                flatten_results=None,
+                gcp_conn_id="google_cloud_default",
+                udf_config=None,
+                use_legacy_sql=True,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition="CREATE_IF_NEEDED",
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority="INTERACTIVE",
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+            )
 
         with pytest.raises(AirflowException):
             operator.execute(MagicMock())
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_operator_defaults(self, mock_hook, create_task_instance_of_operator):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            task_id=TASK_ID,
-            sql="Select * from test_table",
-            schema_update_options=None,
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                task_id=TASK_ID,
+                sql="Select * from test_table",
+                schema_update_options=None,
+            )
         operator = ti.task
 
         operator.execute(MagicMock())
@@ -722,13 +732,14 @@ class TestBigQueryOperator:
         dag_maker,
         create_task_instance_of_operator,
     ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            execution_date=DEFAULT_DATE,
-            task_id=TASK_ID,
-            sql="SELECT * FROM test_table",
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                execution_date=DEFAULT_DATE,
+                task_id=TASK_ID,
+                sql="SELECT * FROM test_table",
+            )
         serialized_dag = dag_maker.get_serialized_data()
         deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
         assert hasattr(deserialized_dag.tasks[0], "sql")
@@ -757,13 +768,14 @@ class TestBigQueryOperator:
         dag_maker,
         create_task_instance_of_operator,
     ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            execution_date=DEFAULT_DATE,
-            task_id=TASK_ID,
-            sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                execution_date=DEFAULT_DATE,
+                task_id=TASK_ID,
+                sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
+            )
         serialized_dag = dag_maker.get_serialized_data()
         deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag["dag"])
         assert hasattr(deserialized_dag.tasks[0], "sql")
@@ -801,12 +813,13 @@ class TestBigQueryOperator:
     def test_bigquery_operator_extra_link_when_missing_job_id(
         self, mock_hook, create_task_instance_of_operator
     ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            task_id=TASK_ID,
-            sql="SELECT * FROM test_table",
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                task_id=TASK_ID,
+                sql="SELECT * FROM test_table",
+            )
         bigquery_task = ti.task
 
         assert "" == bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
@@ -817,13 +830,14 @@ class TestBigQueryOperator:
         mock_hook,
         create_task_instance_of_operator,
     ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            execution_date=DEFAULT_DATE,
-            task_id=TASK_ID,
-            sql="SELECT * FROM test_table",
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                execution_date=DEFAULT_DATE,
+                task_id=TASK_ID,
+                sql="SELECT * FROM test_table",
+            )
         bigquery_task = ti.task
 
         ti.xcom_push(key="job_id_path", value=TEST_FULL_JOB_ID)
@@ -837,13 +851,14 @@ class TestBigQueryOperator:
     def test_bigquery_operator_extra_link_when_multiple_query(
         self, mock_hook, create_task_instance_of_operator
     ):
-        ti = create_task_instance_of_operator(
-            BigQueryExecuteQueryOperator,
-            dag_id=TEST_DAG_ID,
-            execution_date=DEFAULT_DATE,
-            task_id=TASK_ID,
-            sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
-        )
+        with pytest.warns(AirflowProviderDeprecationWarning, match=self.deprecation_message):
+            ti = create_task_instance_of_operator(
+                BigQueryExecuteQueryOperator,
+                dag_id=TEST_DAG_ID,
+                execution_date=DEFAULT_DATE,
+                task_id=TASK_ID,
+                sql=["SELECT * FROM test_table", "SELECT * FROM test_table2"],
+            )
         bigquery_task = ti.task
 
         ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])

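For reference, a minimal sketch of the non-deprecated usage that these warnings point to: BigQueryInsertJobOperator in place of BigQueryExecuteQueryOperator, and a whole table_resource dict in place of the removed per-field arguments of BigQueryCreateExternalTableOperator. The task ids, bucket, object names, project/dataset/table names, and SQL below are illustrative placeholders, not values from this commit.

    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryCreateExternalTableOperator,
        BigQueryInsertJobOperator,
    )

    # In place of BigQueryExecuteQueryOperator: the query moves into the
    # `configuration` dict handed to the BigQuery jobs API.
    run_query = BigQueryInsertJobOperator(
        task_id="run_query",  # placeholder task id
        configuration={
            "query": {
                "query": "SELECT * FROM test_table",  # placeholder SQL
                "useLegacySql": False,
            }
        },
    )

    # In place of the removed schema_fields/source_format/autodetect
    # arguments: the table definition is passed whole as table_resource,
    # mirroring the dicts the updated tests assert against.
    create_external_table = BigQueryCreateExternalTableOperator(
        task_id="create_external_table",  # placeholder task id
        bucket="example-bucket",  # placeholder bucket
        source_objects=["data.csv"],  # placeholder object
        table_resource={
            "tableReference": {
                "projectId": "example-project",
                "datasetId": "example_dataset",
                "tableId": "example_table",
            },
            "schema": {"fields": []},
            "externalDataConfiguration": {
                "source_uris": ["gs://example-bucket/data.csv"],
                "source_format": "CSV",
                "autodetect": True,
            },
        },
    )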