This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a79fb74fd Fix `BIGQUERY_JOB_DETAILS_LINK_FMT` in `BigQueryConsoleLink` (#31953)
2a79fb74fd is described below

commit 2a79fb74fd7203fe82b9384af42a59b3a41f84e9
Author: Beata Kossakowska <[email protected]>
AuthorDate: Tue Jun 20 15:57:08 2023 +0200

    Fix `BIGQUERY_JOB_DETAILS_LINK_FMT` in `BigQueryConsoleLink` (#31953)
    
    Co-authored-by: Beata Kossakowska <[email protected]>
---
 .../providers/google/cloud/operators/bigquery.py   | 19 ++++++---
 airflow/providers/google/cloud/utils/bigquery.py   | 17 ++++++++
 .../endpoints/test_extra_link_endpoint.py          |  4 +-
 .../google/cloud/operators/test_bigquery.py        | 46 +++++++++++++---------
 4 files changed, 59 insertions(+), 27 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 2fd6aacf5c..be7a8673ff 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -51,6 +51,7 @@ from airflow.providers.google.cloud.triggers.bigquery import (
     BigQueryIntervalCheckTrigger,
     BigQueryValueCheckTrigger,
 )
+from airflow.providers.google.cloud.utils.bigquery import convert_job_id
 
 if TYPE_CHECKING:
     from google.cloud.bigquery import UnknownJob
@@ -90,8 +91,8 @@ class BigQueryConsoleLink(BaseOperatorLink):
         *,
         ti_key: TaskInstanceKey,
     ):
-        job_id = XCom.get_value(key="job_id", ti_key=ti_key)
-        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
+        job_id_path = XCom.get_value(key="job_id_path", ti_key=ti_key)
+        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id_path) if job_id_path else ""
 
 
 @attr.s(auto_attribs=True)
@@ -110,7 +111,7 @@ class BigQueryConsoleIndexableLink(BaseOperatorLink):
         *,
         ti_key: TaskInstanceKey,
     ):
-        job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
+        job_ids = XCom.get_value(key="job_id_path", ti_key=ti_key)
         if not job_ids:
             return None
         if len(job_ids) < self.index:
@@ -1184,7 +1185,11 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
             ]
         else:
             raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
-        context["task_instance"].xcom_push(key="job_id", value=job_id)
+        project_id = self.hook.project_id
+        if project_id:
+            job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
+            context["task_instance"].xcom_push(key="job_id_path", value=job_id_path)
+        return job_id
 
     def on_kill(self) -> None:
         super().on_kill()
@@ -2727,9 +2732,11 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
                                 persist_kwargs["dataset_id"] = table["datasetId"]
                                 persist_kwargs["project_id"] = table["projectId"]
                             BigQueryTableLink.persist(**persist_kwargs)
-
         self.job_id = job.job_id
-        context["ti"].xcom_push(key="job_id", value=self.job_id)
+        project_id = self.project_id or self.hook.project_id
+        if project_id:
+            job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
+            context["ti"].xcom_push(key="job_id_path", value=job_id_path)
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
diff --git a/airflow/providers/google/cloud/utils/bigquery.py b/airflow/providers/google/cloud/utils/bigquery.py
index 03753c2423..e96e10a770 100644
--- a/airflow/providers/google/cloud/utils/bigquery.py
+++ b/airflow/providers/google/cloud/utils/bigquery.py
@@ -16,6 +16,8 @@
 # under the License.
 from __future__ import annotations
 
+from typing import Any
+
 
 def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
     """
@@ -34,3 +36,18 @@ def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
         return string_field == "true"
     else:
         return string_field
+
+
+def convert_job_id(job_id: str | list[str], project_id: str, location: str | None) -> Any:
+    """
+    Helper method that converts to path: project_id:location:job_id
+    :param project_id: Required. The ID of the Google Cloud project where workspace located.
+    :param location: Optional. The ID of the Google Cloud region where workspace located.
+    :param job_id: Required. The ID of the job.
+    :return: str or list[str] of project_id:location:job_id.
+    """
+    location = location if location else "US"
+    if isinstance(job_id, list):
+        return [f"{project_id}:{location}:{i}" for i in job_id]
+    else:
+        return f"{project_id}:{location}:{job_id}"
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index 53b5ba8c24..4e6ce29932 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -142,7 +142,7 @@ class TestGetExtraLinks:
     @mock_plugin_manager(plugins=[])
     def test_should_respond_200(self):
         XCom.set(
-            key="job_id",
+            key="job_id_path",
             value="TEST_JOB_ID",
             task_id="TEST_SINGLE_QUERY",
             dag_id=self.dag.dag_id,
@@ -171,7 +171,7 @@ class TestGetExtraLinks:
     @mock_plugin_manager(plugins=[])
     def test_should_respond_200_multiple_links(self):
         XCom.set(
-            key="job_id",
+            key="job_id_path",
             value=["TEST_JOB_ID_1", "TEST_JOB_ID_2"],
             task_id="TEST_MULTIPLE_QUERY",
             dag_id=self.dag.dag_id,
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 25b341a4c3..34c1a803cf 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -83,6 +83,10 @@ MATERIALIZED_VIEW_DEFINITION = {
 }
 TEST_TABLE = "test-table"
 GCP_CONN_ID = "google_cloud_default"
+TEST_JOB_ID_1 = "test-job-id"
+TEST_JOB_ID_2 = "test-123"
+TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
+TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
 
 
 class TestBigQueryCreateEmptyTableOperator:
@@ -673,10 +677,10 @@ class TestBigQueryOperator:
         # Check DeSerialized version of operator link
         assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)
 
-        ti.xcom_push("job_id", 12345)
+        ti.xcom_push("job_id_path", TEST_FULL_JOB_ID)
 
         url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
-        assert url == "https://console.cloud.google.com/bigquery?j=12345"
+        assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
 
     @pytest.mark.need_serialized_dag
     def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
@@ -711,17 +715,18 @@ class TestBigQueryOperator:
         # Check DeSerialized version of operator link
         assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)
 
-        job_id = ["123", "45"]
-        ti.xcom_push(key="job_id", value=job_id)
+        ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])
 
         assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()
 
-        assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
-            ti, "BigQuery Console #1"
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+            == simple_task.get_extra_links(ti, "BigQuery Console #1")
         )
 
-        assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
-            ti, "BigQuery Console #2"
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
+            == simple_task.get_extra_links(ti, "BigQuery Console #2")
         )
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -740,7 +745,9 @@ class TestBigQueryOperator:
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_operator_extra_link_when_single_query(
-        self, mock_hook, create_task_instance_of_operator
+        self,
+        mock_hook,
+        create_task_instance_of_operator,
     ):
         ti = create_task_instance_of_operator(
             BigQueryExecuteQueryOperator,
@@ -751,11 +758,11 @@ class TestBigQueryOperator:
         )
         bigquery_task = ti.task
 
-        job_id = "12345"
-        ti.xcom_push(key="job_id", value=job_id)
+        ti.xcom_push(key="job_id_path", value=TEST_FULL_JOB_ID)
 
-        assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
-            ti, BigQueryConsoleLink.name
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+            == bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
         )
 
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -771,17 +778,18 @@ class TestBigQueryOperator:
         )
         bigquery_task = ti.task
 
-        job_id = ["123", "45"]
-        ti.xcom_push(key="job_id", value=job_id)
+        ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])
 
         assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()
 
-        assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
-            ti, "BigQuery Console #1"
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+            == bigquery_task.get_extra_links(ti, "BigQuery Console #1")
         )
 
-        assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
-            ti, "BigQuery Console #2"
+        assert (
+            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
+            == bigquery_task.get_extra_links(ti, "BigQuery Console #2")
         )
 
 

Reply via email to