This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c5a495c2c Add encryption_configuration parameter to BigQuery operators (#40063)
5c5a495c2c is described below

commit 5c5a495c2cad9faa703eb8ccde47b368fb3eea9a
Author: VladaZakharova <[email protected]>
AuthorDate: Tue Jun 11 11:06:27 2024 +0200

    Add encryption_configuration parameter to BigQuery operators (#40063)
---
 .../providers/google/cloud/operators/bigquery.py   |  86 ++++++++---
 .../google/cloud/operators/test_bigquery.py        | 164 +++++++++++++++++++++
 2 files changed, 233 insertions(+), 17 deletions(-)
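
For reviewers who want to try the change, here is a minimal usage sketch. The DAG wiring, ids, and KMS key path are illustrative placeholders and not part of this commit; only the operator and the encryption_configuration parameter come from the diff below:

    import pendulum
    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import BigQueryGetDataOperator

    # Hypothetical key path; substitute a real Cloud KMS CryptoKey resource name.
    ENCRYPTION_CONFIGURATION = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }

    with DAG(dag_id="bq_kms_example", start_date=pendulum.datetime(2024, 6, 1), schedule=None):
        get_data = BigQueryGetDataOperator(
            task_id="get_data",
            dataset_id="my_dataset",
            table_id="my_table",
            max_results=10,
            # New in this commit: forwarded by the mixin into the query job's
            # destinationEncryptionConfiguration.
            encryption_configuration=ENCRYPTION_CONFIGURATION,
        )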

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 846f884e77..fbce7ca768 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -150,7 +150,12 @@ class _BigQueryOperatorsEncryptionConfigurationMixin:
     # annotation of the `self`. Then you can inherit this class in the target operator.
     # e.g: BigQueryCheckOperator, BigQueryTableCheckOperator
     def include_encryption_configuration(  # type:ignore[misc]
-        self: BigQueryCheckOperator | BigQueryTableCheckOperator,
+        self: BigQueryCheckOperator
+        | BigQueryTableCheckOperator
+        | BigQueryValueCheckOperator
+        | BigQueryColumnCheckOperator
+        | BigQueryGetDataOperator
+        | BigQueryIntervalCheckOperator,
         configuration: dict,
         config_key: str,
     ) -> None:
@@ -206,7 +211,7 @@ class BigQueryCheckOperator(
         Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account. (templated)
     :param labels: a dictionary containing labels for the table, passed to BigQuery.
-    :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
 
         .. code-block:: python
 
@@ -327,7 +332,9 @@ class BigQueryCheckOperator(
         self.log.info("Success.")
 
 
-class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
+class BigQueryValueCheckOperator(
+    _BigQueryDbHookMixin, SQLValueCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
+):
     """Perform a simple value check using sql code.
 
     .. seealso::
@@ -337,6 +344,13 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
     :param sql: SQL to execute.
     :param use_legacy_sql: Whether to use legacy SQL (true)
         or standard SQL (false).
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
+
+        .. code-block:: python
+
+            encryption_configuration = {
+                "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+            }
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
     :param location: The geographic location of the job. See details at:
         https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -371,6 +385,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
         sql: str,
         pass_value: Any,
         tolerance: Any = None,
+        encryption_configuration: dict | None = None,
         gcp_conn_id: str = "google_cloud_default",
         use_legacy_sql: bool = True,
         location: str | None = None,
@@ -384,6 +399,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
         self.location = location
         self.gcp_conn_id = gcp_conn_id
         self.use_legacy_sql = use_legacy_sql
+        self.encryption_configuration = encryption_configuration
         self.impersonation_chain = impersonation_chain
         self.labels = labels
         self.deferrable = deferrable
@@ -402,6 +418,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
             },
         }
 
+        self.include_encryption_configuration(configuration, "query")
+
         return hook.insert_job(
             configuration=configuration,
             project_id=hook.project_id,
@@ -461,7 +479,9 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
         )
 
 
-class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperator):
+class BigQueryIntervalCheckOperator(
+    _BigQueryDbHookMixin, SQLIntervalCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
+):
     """
     Check that the values of metrics given as SQL expressions are within a tolerance of the older ones.
 
@@ -482,6 +502,13 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         between the current day, and the prior days_back.
     :param use_legacy_sql: Whether to use legacy SQL (true)
         or standard SQL (false).
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
+
+        .. code-block:: python
+
+            encryption_configuration = {
+                "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+            }
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
     :param location: The geographic location of the job. See details at:
         https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -521,6 +548,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         gcp_conn_id: str = "google_cloud_default",
         use_legacy_sql: bool = True,
         location: str | None = None,
+        encryption_configuration: dict | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         labels: dict | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
@@ -539,6 +567,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         self.gcp_conn_id = gcp_conn_id
         self.use_legacy_sql = use_legacy_sql
         self.location = location
+        self.encryption_configuration = encryption_configuration
         self.impersonation_chain = impersonation_chain
         self.labels = labels
         self.project_id = project_id
@@ -553,6 +582,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
     ) -> BigQueryJob:
         """Submit a new job and get the job id for polling the status using 
Triggerer."""
         configuration = {"query": {"query": sql, "useLegacySql": 
self.use_legacy_sql}}
+        self.include_encryption_configuration(configuration, "query")
         return hook.insert_job(
             configuration=configuration,
             project_id=self.project_id or hook.project_id,
@@ -609,7 +639,9 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         )
 
 
-class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
+class BigQueryColumnCheckOperator(
+    _BigQueryDbHookMixin, SQLColumnCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
+):
     """
     Subclasses the SQLColumnCheckOperator in order to provide a job id for OpenLineage to parse.
 
@@ -624,6 +656,13 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
     :param partition_clause: a string SQL statement added to a WHERE clause
         to partition data
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
+
+        .. code-block:: python
+
+            encryption_configuration = {
+                "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+            }
     :param use_legacy_sql: Whether to use legacy SQL (true)
         or standard SQL (false).
     :param location: The geographic location of the job. See details at:
@@ -651,6 +690,7 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
         partition_clause: str | None = None,
         database: str | None = None,
         accept_none: bool = True,
+        encryption_configuration: dict | None = None,
         gcp_conn_id: str = "google_cloud_default",
         use_legacy_sql: bool = True,
         location: str | None = None,
@@ -672,6 +712,7 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
         self.database = database
         self.accept_none = accept_none
         self.gcp_conn_id = gcp_conn_id
+        self.encryption_configuration = encryption_configuration
         self.use_legacy_sql = use_legacy_sql
         self.location = location
         self.impersonation_chain = impersonation_chain
@@ -684,7 +725,7 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
     ) -> BigQueryJob:
         """Submit a new job and get the job id for polling the status using 
Trigger."""
         configuration = {"query": {"query": self.sql, "useLegacySql": 
self.use_legacy_sql}}
-
+        self.include_encryption_configuration(configuration, "query")
         return hook.insert_job(
             configuration=configuration,
             project_id=hook.project_id,
@@ -766,7 +807,7 @@ class BigQueryTableCheckOperator(
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
     :param labels: a dictionary containing labels for the table, passed to BigQuery
-    :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
 
         .. code-block:: python
 
@@ -852,7 +893,7 @@ class BigQueryTableCheckOperator(
         self.log.info("All tests have passed")
 
 
-class BigQueryGetDataOperator(GoogleCloudBaseOperator):
+class BigQueryGetDataOperator(GoogleCloudBaseOperator, _BigQueryOperatorsEncryptionConfigurationMixin):
     """
     Fetch data and return it, either from a BigQuery table, or results of a query job.
 
@@ -921,6 +962,13 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
         from the table. (templated)
     :param selected_fields: List of fields to return (comma-separated). If
         unspecified, all fields are returned.
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
+
+        .. code-block:: python
+
+            encryption_configuration = {
+                "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+            }
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
     :param location: The location used for the operation.
     :param impersonation_chain: Optional service account to impersonate using short-term
@@ -965,6 +1013,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
         selected_fields: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
         location: str | None = None,
+        encryption_configuration: dict | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 4.0,
@@ -984,6 +1033,7 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
         self.gcp_conn_id = gcp_conn_id
         self.location = location
         self.impersonation_chain = impersonation_chain
+        self.encryption_configuration = encryption_configuration
         self.project_id = project_id
         self.deferrable = deferrable
         self.poll_interval = poll_interval
@@ -997,6 +1047,8 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
     ) -> BigQueryJob:
         get_query = self.generate_query(hook=hook)
         configuration = {"query": {"query": get_query, "useLegacySql": self.use_legacy_sql}}
+        self.include_encryption_configuration(configuration, "query")
+
         """Submit a new job and get the job id for polling the status using 
Triggerer."""
         return hook.insert_job(
             configuration=configuration,
@@ -1199,7 +1251,7 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
     :param location: The geographic location of the job. Required except for
         US and EU. See details at
         https://cloud.google.com/bigquery/docs/locations#specifying_your_location
-    :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
 
         .. code-block:: python
 
@@ -1393,9 +1445,9 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
 
         .. seealso::
             https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning
-    :param gcp_conn_id: [Optional] The connection ID used to connect to Google Cloud and
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud and
         interact with the Bigquery service.
-    :param google_cloud_storage_conn_id: [Optional] The connection ID used to connect to Google Cloud.
+    :param google_cloud_storage_conn_id: (Optional) The connection ID used to connect to Google Cloud.
         and interact with the Google Cloud Storage service.
     :param labels: a dictionary containing labels for the table, passed to BigQuery
 
@@ -1433,13 +1485,13 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
             google_cloud_storage_conn_id="airflow-conn-id",
         )
 
-    :param view: [Optional] A dictionary containing definition for the view.
+    :param view: (Optional) A dictionary containing definition for the view.
         If set, it will create a view instead of a table:
 
         .. seealso::
             https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
-    :param materialized_view: [Optional] The materialized view definition.
-    :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+    :param materialized_view: (Optional) The materialized view definition.
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
 
         .. code-block:: python
 
@@ -1447,7 +1499,7 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
                 "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
             }
     :param location: The location used for the operation.
-    :param cluster_fields: [Optional] The fields used for clustering.
+    :param cluster_fields: (Optional) The fields used for clustering.
             BigQuery supports clustering for both partitioned and
             non-partitioned tables.
 
@@ -1645,7 +1697,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
     :param autodetect: Try to detect schema and format options automatically.
         The schema_fields and schema_object options will be honored when specified explicitly.
         https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources
-    :param compression: [Optional] The compression type of the data source.
+    :param compression: (Optional) The compression type of the data source.
         Possible values include GZIP and NONE.
         The default value is NONE.
         This setting is ignored for Google Cloud Bigtable,
@@ -1667,7 +1719,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
         and interact with the Google Cloud Storage service.
     :param src_fmt_configs: configure optional fields specific to the source format
     :param labels: a dictionary containing labels for the table, passed to BigQuery
-    :param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
+    :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
 
         .. code-block:: python
 
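The body of include_encryption_configuration is outside the hunks shown above, but the test expectations below (a destinationEncryptionConfiguration entry appearing under the "query" configuration key) imply behavior roughly like the following sketch; treat it as an illustration, not the committed implementation:

    def include_encryption_configuration(self, configuration: dict, config_key: str) -> None:
        # Sketch: if the operator was given encryption settings, copy them into
        # the job configuration under the requested key (e.g., "query").
        if self.encryption_configuration:
            configuration[config_key]["destinationEncryptionConfiguration"] = self.encryption_configuration
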
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 16c9cbdb82..3fa3446761 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1062,6 +1062,49 @@ class TestBigQueryGetDataOperator:
             operator.execute_complete(context=None, event={"status": "success", "records": [20]})
         mock_log_info.assert_called_with("Total extracted rows: %s", 1)
 
+    @pytest.mark.parametrize("as_dict", [True, False])
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+    def test_encryption_configuration(self, mock_job, mock_hook, as_dict):
+        encryption_configuration = {
+            "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+        }
+
+        mock_hook.return_value.insert_job.return_value = mock_job
+        mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+        max_results = 1
+        selected_fields = "DATE"
+        operator = BigQueryGetDataOperator(
+            job_project_id=TEST_GCP_PROJECT_ID,
+            gcp_conn_id=GCP_CONN_ID,
+            task_id=TASK_ID,
+            job_id="",
+            max_results=max_results,
+            dataset_id=TEST_DATASET,
+            table_id=TEST_TABLE_ID,
+            selected_fields=selected_fields,
+            location=TEST_DATASET_LOCATION,
+            as_dict=as_dict,
+            encryption_configuration=encryption_configuration,
+            deferrable=True,
+        )
+        with pytest.raises(TaskDeferred):
+            operator.execute(MagicMock())
+        mock_hook.return_value.insert_job.assert_called_with(
+            configuration={
+                "query": {
+                    "query": f"""select DATE from `{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}` limit 1""",
+                    "useLegacySql": True,
+                    "destinationEncryptionConfiguration": encryption_configuration,
+                }
+            },
+            project_id=TEST_GCP_PROJECT_ID,
+            location=TEST_DATASET_LOCATION,
+            job_id="",
+            nowait=True,
+        )
+
 
 class TestBigQueryTableDeleteOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -2137,6 +2180,40 @@ class TestBigQueryIntervalCheckOperator:
             nowait=True,
         )
 
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+    def test_encryption_configuration_deferrable_mode(self, mock_job, mock_hook):
+        encryption_configuration = {
+            "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+        }
+
+        mock_hook.return_value.insert_job.return_value = mock_job
+        mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+        operator = BigQueryIntervalCheckOperator(
+            task_id="TASK_ID",
+            encryption_configuration=encryption_configuration,
+            location=TEST_DATASET_LOCATION,
+            metrics_thresholds={"COUNT(*)": 1.5},
+            table=TEST_TABLE_ID,
+            deferrable=True,
+        )
+        with pytest.raises(TaskDeferred):
+            operator.execute(MagicMock())
+        mock_hook.return_value.insert_job.assert_called_with(
+            configuration={
+                "query": {
+                    "query": """SELECT COUNT(*) FROM test-table-id WHERE ds='{{ macros.ds_add(ds, -7) }}'""",
+                    "useLegacySql": True,
+                    "destinationEncryptionConfiguration": encryption_configuration,
+                }
+            },
+            project_id=TEST_GCP_PROJECT_ID,
+            location=TEST_DATASET_LOCATION,
+            job_id="",
+            nowait=True,
+        )
+
 
 class TestBigQueryCheckOperator:
     @pytest.mark.db_test
@@ -2425,6 +2502,46 @@ class TestBigQueryValueCheckOperator:
                 context=None, event={"status": "error", "message": "test failure message"}
             )
 
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+    def test_encryption_configuration_deferrable_mode(self, mock_job, mock_hook):
+        encryption_configuration = {
+            "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+        }
+
+        mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+            {
+                "check_name": ["row_count_check"],
+                "check_result": [1],
+            }
+        )
+        mock_hook.return_value.insert_job.return_value = mock_job
+        mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+        operator = BigQueryValueCheckOperator(
+            task_id="TASK_ID",
+            encryption_configuration=encryption_configuration,
+            location=TEST_DATASET_LOCATION,
+            pass_value=2,
+            sql=f"SELECT COUNT(*) FROM {TEST_DATASET}.{TEST_TABLE_ID}",
+            deferrable=True,
+        )
+        with pytest.raises(TaskDeferred):
+            operator.execute(MagicMock())
+        mock_hook.return_value.insert_job.assert_called_with(
+            configuration={
+                "query": {
+                    "query": f"""SELECT COUNT(*) FROM {TEST_DATASET}.{TEST_TABLE_ID}""",
+                    "useLegacySql": True,
+                    "destinationEncryptionConfiguration": encryption_configuration,
+                }
+            },
+            project_id=TEST_GCP_PROJECT_ID,
+            location=TEST_DATASET_LOCATION,
+            job_id="",
+            nowait=True,
+        )
+
 
 @pytest.mark.db_test
 class TestBigQueryColumnCheckOperator:
@@ -2495,6 +2612,53 @@ class TestBigQueryColumnCheckOperator:
         with pytest.raises(AirflowException):
             ti.task.execute(MagicMock())
 
+    @pytest.mark.parametrize(
+        "check_type, check_value, check_result",
+        [
+            ("equal_to", 0, 0),
+            ("greater_than", 0, 1),
+            ("less_than", 0, -1),
+        ],
+    )
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+    def test_encryption_configuration(self, mock_job, mock_hook, check_type, check_value, check_result):
+        encryption_configuration = {
+            "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
+        }
+
+        mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+            {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
+        )
+        mock_hook.return_value.insert_job.return_value = mock_job
+        mock_hook.return_value.project_id = TEST_GCP_PROJECT_ID
+
+        operator = BigQueryColumnCheckOperator(
+            task_id="TASK_ID",
+            encryption_configuration=encryption_configuration,
+            table=f"{TEST_DATASET}.{TEST_TABLE_ID}",
+            column_mapping={"col1": {"min": {check_type: check_value}}},
+            location=TEST_DATASET_LOCATION,
+        )
+
+        operator.execute(MagicMock())
+        mock_hook.return_value.insert_job.assert_called_with(
+            configuration={
+                "query": {
+                    "query": f"""SELECT col_name, check_type, check_result FROM (
+        SELECT 'col1' AS col_name, 'min' AS check_type, col1_min AS check_result
+        FROM (SELECT MIN(col1) AS col1_min FROM {TEST_DATASET}.{TEST_TABLE_ID} ) AS sq
+    ) AS check_columns""",
+                    "useLegacySql": True,
+                    "destinationEncryptionConfiguration": encryption_configuration,
+                }
+            },
+            project_id=TEST_GCP_PROJECT_ID,
+            location=TEST_DATASET_LOCATION,
+            job_id="",
+            nowait=False,
+        )
+
 
 class TestBigQueryTableCheckOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
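
One detail worth noting across the new tests: they all reuse the same fully qualified Cloud KMS key path. If you adapt them, the key must follow the CryptoKey resource-name format; a quick illustrative check (not part of this commit) could look like:

    import re

    # Pattern for a Cloud KMS CryptoKey resource name, as used in the tests above.
    KMS_KEY_RE = re.compile(r"^projects/[^/]+/locations/[^/]+/keyRings/[^/]+/cryptoKeys/[^/]+$")

    assert KMS_KEY_RE.match("projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY")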
