This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e80dc6afa Add support for query parameters to BigQueryCheckOperator (#40556) (#40558)
7e80dc6afa is described below

commit 7e80dc6afa665bed166f4b78a439ba8ce225dc28
Author: Alden S Page <[email protected]>
AuthorDate: Wed Jul 3 04:13:40 2024 -0400

    Add support for query parameters to BigQueryCheckOperator (#40556) (#40558)
    
    * Add support for query parameters to BigQueryCheckOperator (#40556)
    
    Remove unnecessary space
    
    * Add a unit test for BigQueryCheckOperator query params; fix missing 'self' reference
    
    * Fix lint (#40558)
    
    ---------
    
    Co-authored-by: Alden S. Page <[email protected]>
---
 .../providers/google/cloud/operators/bigquery.py   | 11 ++++++++
 .../google/cloud/operators/test_bigquery.py        | 31 +++++++++++++++++++++-
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index d3f79c9bbd..43131c549a 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -222,6 +222,12 @@ class BigQueryCheckOperator(
     :param deferrable: Run operator in the deferrable mode.
     :param poll_interval: (Deferrable mode only) polling period in seconds to
         check for the status of job.
+    :param query_params: a list of dictionaries containing query parameter types and
+        values, passed to BigQuery. The structure of each dictionary should match
+        'queryParameters' in the Google BigQuery Jobs API:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs.
+        For example, [{ 'name': 'corpus', 'parameterType': { 'type': 'STRING' },
+        'parameterValue': { 'value': 'romeoandjuliet' } }]. (templated)
     """
 
     template_fields: Sequence[str] = (
@@ -229,6 +235,7 @@ class BigQueryCheckOperator(
         "gcp_conn_id",
         "impersonation_chain",
         "labels",
+        "query_params",
     )
     template_ext: Sequence[str] = (".sql",)
     ui_color = BigQueryUIColors.CHECK.value
@@ -246,6 +253,7 @@ class BigQueryCheckOperator(
         encryption_configuration: dict | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 4.0,
+        query_params: list | None = None,
         **kwargs,
     ) -> None:
         super().__init__(sql=sql, **kwargs)
@@ -257,6 +265,7 @@ class BigQueryCheckOperator(
         self.encryption_configuration = encryption_configuration
         self.deferrable = deferrable
         self.poll_interval = poll_interval
+        self.query_params = query_params
 
     def _submit_job(
         self,
@@ -265,6 +274,8 @@ class BigQueryCheckOperator(
     ) -> BigQueryJob:
         """Submit a new job and get the job id for polling the status using 
Trigger."""
         configuration = {"query": {"query": self.sql, "useLegacySql": 
self.use_legacy_sql}}
+        if self.query_params:
+            configuration["query"]["queryParameters"] = self.query_params
 
         self.include_encryption_configuration(configuration, "query")
 
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index febcfe4871..d49e75c950 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -24,7 +24,7 @@ from unittest.mock import ANY, MagicMock
 
 import pandas as pd
 import pytest
-from google.cloud.bigquery import DEFAULT_RETRY
+from google.cloud.bigquery import DEFAULT_RETRY, ScalarQueryParameter
 from google.cloud.exceptions import Conflict
 from openlineage.client.facet import ErrorMessageRunFacet, ExternalQueryRunFacet, SqlJobFacet
 from openlineage.client.run import Dataset
@@ -2293,6 +2293,35 @@ class TestBigQueryCheckOperator:
         mock_defer.assert_not_called()
         mock_validate_records.assert_called_once_with((1, 2, 3))
 
+    @pytest.mark.db_test
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator._validate_records")
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_check_operator_query_parameters_passing(
+        self, mock_hook, mock_validate_records, create_task_instance_of_operator
+    ):
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+        query_params = [ScalarQueryParameter("test_param", "INT64", 1)]
+
+        mocked_job = MagicMock(job_id=real_job_id, error_result=False)
+        mocked_job.result.return_value = iter([(1, 2, 3)])  # mock rows generator
+        mock_hook.return_value.insert_job.return_value = mocked_job
+        mock_hook.return_value.insert_job.return_value.running.return_value = False
+
+        ti = create_task_instance_of_operator(
+            BigQueryCheckOperator,
+            dag_id="dag_id",
+            task_id="bq_check_operator_query_params_job",
+            sql="SELECT * FROM any WHERE test_param = @test_param",
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+            query_params=query_params,
+        )
+
+        ti.task.execute(MagicMock())
+        mock_validate_records.assert_called_once_with((1, 2, 3))
+
     @pytest.mark.db_test
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_check_operator_async_finish_with_error_before_deferred(

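For readers who want to try the new parameter, here is a minimal usage sketch (illustrative only, not part of the commit; the DAG id, project, dataset, and table names are hypothetical). The query_params value mirrors the 'queryParameters' structure from the BigQuery Jobs API referenced in the docstring above; standard SQL (use_legacy_sql=False) is required for named @parameters:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator

    with DAG(
        dag_id="example_bq_check_query_params",  # hypothetical DAG id
        start_date=datetime(2024, 7, 1),
        schedule=None,
    ) as dag:
        # Fail the check if the first row of the result contains a falsy value
        # (here, a zero count for the given corpus).
        check_corpus_rows = BigQueryCheckOperator(
            task_id="check_corpus_rows",
            sql="SELECT COUNT(*) FROM `my-project.my_dataset.shakespeare` WHERE corpus = @corpus",
            use_legacy_sql=False,
            query_params=[
                {
                    "name": "corpus",
                    "parameterType": {"type": "STRING"},
                    "parameterValue": {"value": "romeoandjuliet"},
                }
            ],
        )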