This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a03870d1c improvement: introduce project_id in BigQueryIntervalCheckOperator (#34573)
6a03870d1c is described below

commit 6a03870d1c1c5871dc9bcb8ea48039ec47676484
Author: Zhenye (Nathan) Na <[email protected]>
AuthorDate: Sun Sep 24 12:25:55 2023 -0700

    improvement: introduce project_id in BigQueryIntervalCheckOperator (#34573)
    
    * improvement: introduce project_id in BigQueryIntervalCheckOperator
    
    * Update bigquery.py
    
    ---------
    
    Co-authored-by: Hussein Awala <[email protected]>
---
 .../providers/google/cloud/operators/bigquery.py   |  5 +-
 .../google/cloud/operators/test_bigquery.py        | 74 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index f59dff18f5..a0e0d884a8 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -505,6 +505,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
     :param deferrable: Run operator in the deferrable mode
     :param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
         Defaults to 4 seconds.
+    :param project_id: a string represents the BigQuery projectId
     """
 
     template_fields: Sequence[str] = (
@@ -532,6 +533,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         labels: dict | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         poll_interval: float = 4.0,
+        project_id: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(
@@ -547,6 +549,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         self.location = location
         self.impersonation_chain = impersonation_chain
         self.labels = labels
+        self.project_id = project_id
         self.deferrable = deferrable
         self.poll_interval = poll_interval
 
@@ -560,7 +563,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         configuration = {"query": {"query": sql, "useLegacySql": self.use_legacy_sql}}
         return hook.insert_job(
             configuration=configuration,
-            project_id=hook.project_id,
+            project_id=self.project_id or hook.project_id,
             location=self.location,
             job_id=job_id,
             nowait=True,
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 55c3dd2dd7..2a2b6d1738 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1768,6 +1768,80 @@ class TestBigQueryIntervalCheckOperator:
             exc.value.trigger, BigQueryIntervalCheckTrigger
         ), "Trigger is not a BigQueryIntervalCheckTrigger"
 
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_interval_check_operator_with_project_id(
+        self, mock_hook, create_task_instance_of_operator
+    ):
+        """
+        Test BigQueryIntervalCheckOperator with a specified project_id.
+        Ensure that the bq_project_id is passed correctly when submitting the job.
+        """
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+
+        project_id = "test-project-id"
+        ti = create_task_instance_of_operator(
+            BigQueryIntervalCheckOperator,
+            dag_id="dag_id",
+            task_id="bq_interval_check_operator_with_project_id",
+            table="test_table",
+            metrics_thresholds={"COUNT(*)": 1.5},
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+            project_id=project_id,
+        )
+
+        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+
+        with pytest.raises(TaskDeferred):
+            ti.task.execute(MagicMock())
+
+        mock_hook.return_value.insert_job.assert_called_with(
+            configuration=mock.ANY,
+            project_id=project_id,
+            location=TEST_DATASET_LOCATION,
+            job_id=mock.ANY,
+            nowait=True,
+        )
+
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_bigquery_interval_check_operator_without_project_id(
+        self, mock_hook, create_task_instance_of_operator
+    ):
+        """
+        Test BigQueryIntervalCheckOperator without a specified project_id.
+        Ensure that the project_id falls back to the hook.project_id as previously implemented.
+        """
+        job_id = "123456"
+        hash_ = "hash"
+        real_job_id = f"{job_id}_{hash_}"
+
+        project_id = "test-project-id"
+        ti = create_task_instance_of_operator(
+            BigQueryIntervalCheckOperator,
+            dag_id="dag_id",
+            task_id="bq_interval_check_operator_without_project_id",
+            table="test_table",
+            metrics_thresholds={"COUNT(*)": 1.5},
+            location=TEST_DATASET_LOCATION,
+            deferrable=True,
+        )
+
+        mock_hook.return_value.project_id = project_id
+        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+
+        with pytest.raises(TaskDeferred):
+            ti.task.execute(MagicMock())
+
+        mock_hook.return_value.insert_job.assert_called_with(
+            configuration=mock.ANY,
+            project_id=mock_hook.return_value.project_id,
+            location=TEST_DATASET_LOCATION,
+            job_id=mock.ANY,
+            nowait=True,
+        )
+
 
 class TestBigQueryCheckOperator:
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator.execute")

Reply via email to