This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e6960f1ad6 Revert "Fix `BIGQUERY_JOB_DETAILS_LINK_FMT` in 
`BigQueryConsoleLink` (#31457)" (#31935)
e6960f1ad6 is described below

commit e6960f1ad63f40ff4ccde6c86b17e051b302c104
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Jun 15 16:00:06 2023 +0200

    Revert "Fix `BIGQUERY_JOB_DETAILS_LINK_FMT` in `BigQueryConsoleLink` 
(#31457)" (#31935)
    
    This reverts commit c7072c0490cb80b448622a27eb1056576d6b92a4.
---
 .../providers/google/cloud/operators/bigquery.py   | 38 +++---------
 .../google/cloud/operators/test_bigquery.py        | 69 +++++++---------------
 2 files changed, 30 insertions(+), 77 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py 
b/airflow/providers/google/cloud/operators/bigquery.py
index 6fb94f21d7..2fd6aacf5c 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -58,7 +58,7 @@ if TYPE_CHECKING:
     from airflow.models.taskinstancekey import TaskInstanceKey
     from airflow.utils.context import Context
 
-BIGQUERY_JOB_DETAILS_LINK_FMT = 
"https://console.cloud.google.com/bigquery?j={project_id}:{location}:{job_id}"
+BIGQUERY_JOB_DETAILS_LINK_FMT = 
"https://console.cloud.google.com/bigquery?j={job_id}"
 
 
 class BigQueryUIColors(enum.Enum):
@@ -90,17 +90,8 @@ class BigQueryConsoleLink(BaseOperatorLink):
         *,
         ti_key: TaskInstanceKey,
     ):
-        job_id_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
-
-        return (
-            BIGQUERY_JOB_DETAILS_LINK_FMT.format(
-                job_id=job_id_params["job_id"],
-                project_id=job_id_params["project_id"],
-                location=job_id_params["location"],
-            )
-            if job_id_params
-            else ""
-        )
+        job_id = XCom.get_value(key="job_id", ti_key=ti_key)
+        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id 
else ""
 
 
 @attr.s(auto_attribs=True)
@@ -119,16 +110,13 @@ class BigQueryConsoleIndexableLink(BaseOperatorLink):
         *,
         ti_key: TaskInstanceKey,
     ):
-        job_ids_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
-        job_ids = job_ids_params["job_id"]
+        job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
         if not job_ids:
             return None
         if len(job_ids) < self.index:
             return None
         job_id = job_ids[self.index]
-        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(
-            job_id=job_id, project_id=job_ids_params["project_id"], 
location=job_ids_params["location"]
-        )
+        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
 
 
 class _BigQueryDbHookMixin:
@@ -1196,13 +1184,7 @@ class 
BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
             ]
         else:
             raise AirflowException(f"argument 'sql' of type {type(str)} is 
neither a string nor an iterable")
-        job_id_params = {
-            "job_id": job_id,
-            "project_id": self.hook.project_id,
-            "location": self.location if self.location else "US",
-        }
-        context["task_instance"].xcom_push(key="job_id_params", 
value=job_id_params)
-        return job_id
+        context["task_instance"].xcom_push(key="job_id", value=job_id)
 
     def on_kill(self) -> None:
         super().on_kill()
@@ -2745,13 +2727,9 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
                                 persist_kwargs["dataset_id"] = 
table["datasetId"]
                                 persist_kwargs["project_id"] = 
table["projectId"]
                             BigQueryTableLink.persist(**persist_kwargs)
+
         self.job_id = job.job_id
-        job_id_params = {
-            "job_id": job_id,
-            "project_id": self.project_id or self.hook.project_id,
-            "location": self.location if self.location else "US",
-        }
-        context["ti"].xcom_push(key="job_id_params", value=job_id_params)
+        context["ti"].xcom_push(key="job_id", value=self.job_id)
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py 
b/tests/providers/google/cloud/operators/test_bigquery.py
index 367ef99cbb..25b341a4c3 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -83,10 +83,6 @@ MATERIALIZED_VIEW_DEFINITION = {
 }
 TEST_TABLE = "test-table"
 GCP_CONN_ID = "google_cloud_default"
-TEST_JOB_ID_1 = "test-job-id"
-TEST_JOB_ID_2 = "test-123"
-TEST_FULL_JOB_ID = 
f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
-TEST_FULL_JOB_ID_2 = 
f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
 
 
 class TestBigQueryCreateEmptyTableOperator:
@@ -676,15 +672,11 @@ class TestBigQueryOperator:
 
         # Check DeSerialized version of operator link
         assert isinstance(list(simple_task.operator_extra_links)[0], 
BigQueryConsoleLink)
-        test_job_id_params = {
-            "job_id": TEST_JOB_ID_1,
-            "project_id": TEST_GCP_PROJECT_ID,
-            "location": TEST_DATASET_LOCATION,
-        }
-        ti.xcom_push("job_id_params", test_job_id_params)
+
+        ti.xcom_push("job_id", 12345)
 
         url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
-        assert url == 
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+        assert url == "https://console.cloud.google.com/bigquery?j=12345"
 
     @pytest.mark.need_serialized_dag
     def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
@@ -719,23 +711,17 @@ class TestBigQueryOperator:
         # Check DeSerialized version of operator link
         assert isinstance(list(simple_task.operator_extra_links)[0], 
BigQueryConsoleIndexableLink)
 
-        test_job_id_params = {
-            "job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
-            "project_id": TEST_GCP_PROJECT_ID,
-            "location": TEST_DATASET_LOCATION,
-        }
-        ti.xcom_push(key="job_id_params", value=test_job_id_params)
+        job_id = ["123", "45"]
+        ti.xcom_push(key="job_id", value=job_id)
 
         assert {"BigQuery Console #1", "BigQuery Console #2"} == 
simple_task.operator_extra_link_dict.keys()
 
-        assert (
-            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
-            == simple_task.get_extra_links(ti, "BigQuery Console #1")
+        assert "https://console.cloud.google.com/bigquery?j=123" == 
simple_task.get_extra_links(
+            ti, "BigQuery Console #1"
         )
 
-        assert (
-            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
-            == simple_task.get_extra_links(ti, "BigQuery Console #2")
+        assert "https://console.cloud.google.com/bigquery?j=45" == 
simple_task.get_extra_links(
+            ti, "BigQuery Console #2"
         )
 
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -754,9 +740,7 @@ class TestBigQueryOperator:
 
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_operator_extra_link_when_single_query(
-        self,
-        mock_hook,
-        create_task_instance_of_operator,
+        self, mock_hook, create_task_instance_of_operator
     ):
         ti = create_task_instance_of_operator(
             BigQueryExecuteQueryOperator,
@@ -767,15 +751,11 @@ class TestBigQueryOperator:
         )
         bigquery_task = ti.task
 
-        test_job_id_params = {
-            "job_id": TEST_JOB_ID_1,
-            "project_id": TEST_GCP_PROJECT_ID,
-            "location": TEST_DATASET_LOCATION,
-        }
-        ti.xcom_push(key="job_id_params", value=test_job_id_params)
-        assert (
            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
-            == bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
+        job_id = "12345"
+        ti.xcom_push(key="job_id", value=job_id)
+
+        assert f"https://console.cloud.google.com/bigquery?j={job_id}" == 
bigquery_task.get_extra_links(
+            ti, BigQueryConsoleLink.name
         )
 
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -791,22 +771,17 @@ class TestBigQueryOperator:
         )
         bigquery_task = ti.task
 
-        test_job_id_params = {
-            "job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
-            "project_id": TEST_GCP_PROJECT_ID,
-            "location": TEST_DATASET_LOCATION,
-        }
-        ti.xcom_push(key="job_id_params", value=test_job_id_params)
+        job_id = ["123", "45"]
+        ti.xcom_push(key="job_id", value=job_id)
+
         assert {"BigQuery Console #1", "BigQuery Console #2"} == 
bigquery_task.operator_extra_link_dict.keys()
 
-        assert (
-            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
-            == bigquery_task.get_extra_links(ti, "BigQuery Console #1")
+        assert "https://console.cloud.google.com/bigquery?j=123" == 
bigquery_task.get_extra_links(
+            ti, "BigQuery Console #1"
         )
 
-        assert (
-            f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
-            == bigquery_task.get_extra_links(ti, "BigQuery Console #2")
+        assert "https://console.cloud.google.com/bigquery?j=45" == 
bigquery_task.get_extra_links(
+            ti, "BigQuery Console #2"
         )
 
 

Reply via email to