This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2a79fb74fd Fix `BIGQUERY_JOB_DETAILS_LINK_FMT` in `BigQueryConsoleLink` (#31953)
2a79fb74fd is described below
commit 2a79fb74fd7203fe82b9384af42a59b3a41f84e9
Author: Beata Kossakowska <[email protected]>
AuthorDate: Tue Jun 20 15:57:08 2023 +0200
Fix `BIGQUERY_JOB_DETAILS_LINK_FMT` in `BigQueryConsoleLink` (#31953)
Co-authored-by: Beata Kossakowska <[email protected]>
---
.../providers/google/cloud/operators/bigquery.py | 19 ++++++---
airflow/providers/google/cloud/utils/bigquery.py | 17 ++++++++
.../endpoints/test_extra_link_endpoint.py | 4 +-
.../google/cloud/operators/test_bigquery.py | 46 +++++++++++++---------
4 files changed, 59 insertions(+), 27 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 2fd6aacf5c..be7a8673ff 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -51,6 +51,7 @@ from airflow.providers.google.cloud.triggers.bigquery import (
BigQueryIntervalCheckTrigger,
BigQueryValueCheckTrigger,
)
+from airflow.providers.google.cloud.utils.bigquery import convert_job_id
if TYPE_CHECKING:
from google.cloud.bigquery import UnknownJob
@@ -90,8 +91,8 @@ class BigQueryConsoleLink(BaseOperatorLink):
*,
ti_key: TaskInstanceKey,
):
- job_id = XCom.get_value(key="job_id", ti_key=ti_key)
- return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
+ job_id_path = XCom.get_value(key="job_id_path", ti_key=ti_key)
+ return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id_path) if job_id_path else ""
@attr.s(auto_attribs=True)
@@ -110,7 +111,7 @@ class BigQueryConsoleIndexableLink(BaseOperatorLink):
*,
ti_key: TaskInstanceKey,
):
- job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
+ job_ids = XCom.get_value(key="job_id_path", ti_key=ti_key)
if not job_ids:
return None
if len(job_ids) < self.index:
@@ -1184,7 +1185,11 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
]
else:
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
- context["task_instance"].xcom_push(key="job_id", value=job_id)
+ project_id = self.hook.project_id
+ if project_id:
+ job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
+ context["task_instance"].xcom_push(key="job_id_path",
value=job_id_path)
+ return job_id
def on_kill(self) -> None:
super().on_kill()
@@ -2727,9 +2732,11 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
persist_kwargs["dataset_id"] =
table["datasetId"]
persist_kwargs["project_id"] =
table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)
-
self.job_id = job.job_id
- context["ti"].xcom_push(key="job_id", value=self.job_id)
+ project_id = self.project_id or self.hook.project_id
+ if project_id:
+ job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
+ context["ti"].xcom_push(key="job_id_path", value=job_id_path)
# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
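For context, here is a minimal sketch (not part of this commit) of how the pushed "job_id_path" value ends up in the console link. It assumes BIGQUERY_JOB_DETAILS_LINK_FMT is "https://console.cloud.google.com/bigquery?j={job_id}", as the updated tests further down suggest:

    from __future__ import annotations

    # Assumed value of the format string, matching the URLs asserted in the tests below.
    BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"

    def build_console_url(job_id_path: str | None) -> str:
        # Mirrors BigQueryConsoleLink.get_link: return an empty string when no job was pushed.
        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id_path) if job_id_path else ""

    # Illustrative path only: project_id:location:job_id, as produced by convert_job_id below.
    print(build_console_url("my-project:US:my-job-id"))
    # -> https://console.cloud.google.com/bigquery?j=my-project:US:my-job-id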
diff --git a/airflow/providers/google/cloud/utils/bigquery.py b/airflow/providers/google/cloud/utils/bigquery.py
index 03753c2423..e96e10a770 100644
--- a/airflow/providers/google/cloud/utils/bigquery.py
+++ b/airflow/providers/google/cloud/utils/bigquery.py
@@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations
+from typing import Any
+
def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
"""
@@ -34,3 +36,18 @@ def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
return string_field == "true"
else:
return string_field
+
+
+def convert_job_id(job_id: str | list[str], project_id: str, location: str | None) -> Any:
+ """
+ Helper method that converts a job ID to the path: project_id:location:job_id
+ :param project_id: Required. The ID of the Google Cloud project where the workspace is located.
+ :param location: Optional. The ID of the Google Cloud region where the workspace is located.
+ :param job_id: Required. The ID of the job.
+ :return: str or list[str] of project_id:location:job_id.
+ """
+ location = location if location else "US"
+ if isinstance(job_id, list):
+ return [f"{project_id}:{location}:{i}" for i in job_id]
+ else:
+ return f"{project_id}:{location}:{job_id}"
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index 53b5ba8c24..4e6ce29932 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -142,7 +142,7 @@ class TestGetExtraLinks:
@mock_plugin_manager(plugins=[])
def test_should_respond_200(self):
XCom.set(
- key="job_id",
+ key="job_id_path",
value="TEST_JOB_ID",
task_id="TEST_SINGLE_QUERY",
dag_id=self.dag.dag_id,
@@ -171,7 +171,7 @@ class TestGetExtraLinks:
@mock_plugin_manager(plugins=[])
def test_should_respond_200_multiple_links(self):
XCom.set(
- key="job_id",
+ key="job_id_path",
value=["TEST_JOB_ID_1", "TEST_JOB_ID_2"],
task_id="TEST_MULTIPLE_QUERY",
dag_id=self.dag.dag_id,
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 25b341a4c3..34c1a803cf 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -83,6 +83,10 @@ MATERIALIZED_VIEW_DEFINITION = {
}
TEST_TABLE = "test-table"
GCP_CONN_ID = "google_cloud_default"
+TEST_JOB_ID_1 = "test-job-id"
+TEST_JOB_ID_2 = "test-123"
+TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
+TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
class TestBigQueryCreateEmptyTableOperator:
@@ -673,10 +677,10 @@ class TestBigQueryOperator:
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)
- ti.xcom_push("job_id", 12345)
+ ti.xcom_push("job_id_path", TEST_FULL_JOB_ID)
url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
- assert url == "https://console.cloud.google.com/bigquery?j=12345"
+ assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
@pytest.mark.need_serialized_dag
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
@@ -711,17 +715,18 @@ class TestBigQueryOperator:
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)
- job_id = ["123", "45"]
- ti.xcom_push(key="job_id", value=job_id)
+ ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])
assert {"BigQuery Console #1", "BigQuery Console #2"} ==
simple_task.operator_extra_link_dict.keys()
- assert "https://console.cloud.google.com/bigquery?j=123" ==
simple_task.get_extra_links(
- ti, "BigQuery Console #1"
+ assert (
+ f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+ == simple_task.get_extra_links(ti, "BigQuery Console #1")
)
- assert "https://console.cloud.google.com/bigquery?j=45" ==
simple_task.get_extra_links(
- ti, "BigQuery Console #2"
+ assert (
+ f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
+ == simple_task.get_extra_links(ti, "BigQuery Console #2")
)
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -740,7 +745,9 @@ class TestBigQueryOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_extra_link_when_single_query(
- self, mock_hook, create_task_instance_of_operator
+ self,
+ mock_hook,
+ create_task_instance_of_operator,
):
ti = create_task_instance_of_operator(
BigQueryExecuteQueryOperator,
@@ -751,11 +758,11 @@ class TestBigQueryOperator:
)
bigquery_task = ti.task
- job_id = "12345"
- ti.xcom_push(key="job_id", value=job_id)
+ ti.xcom_push(key="job_id_path", value=TEST_FULL_JOB_ID)
- assert f"https://console.cloud.google.com/bigquery?j={job_id}" ==
bigquery_task.get_extra_links(
- ti, BigQueryConsoleLink.name
+ assert (
+ f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+ == bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
)
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -771,17 +778,18 @@ class TestBigQueryOperator:
)
bigquery_task = ti.task
- job_id = ["123", "45"]
- ti.xcom_push(key="job_id", value=job_id)
+ ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])
assert {"BigQuery Console #1", "BigQuery Console #2"} ==
bigquery_task.operator_extra_link_dict.keys()
- assert "https://console.cloud.google.com/bigquery?j=123" ==
bigquery_task.get_extra_links(
- ti, "BigQuery Console #1"
+ assert (
+ f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+ == bigquery_task.get_extra_links(ti, "BigQuery Console #1")
)
- assert "https://console.cloud.google.com/bigquery?j=45" ==
bigquery_task.get_extra_links(
- ti, "BigQuery Console #2"
+ assert (
+ f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
+ == bigquery_task.get_extra_links(ti, "BigQuery Console #2")
)