This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e6960f1ad6 Revert "Fix `BIGQUERY_JOB_DETAILS_LINK_FMT` in
`BigQueryConsoleLink` (#31457)" (#31935)
e6960f1ad6 is described below
commit e6960f1ad63f40ff4ccde6c86b17e051b302c104
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Jun 15 16:00:06 2023 +0200
Revert "Fix `BIGQUERY_JOB_DETAILS_LINK_FMT` in `BigQueryConsoleLink`
(#31457)" (#31935)
This reverts commit c7072c0490cb80b448622a27eb1056576d6b92a4.
---
.../providers/google/cloud/operators/bigquery.py | 38 +++---------
.../google/cloud/operators/test_bigquery.py | 69 +++++++---------------
2 files changed, 30 insertions(+), 77 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py
b/airflow/providers/google/cloud/operators/bigquery.py
index 6fb94f21d7..2fd6aacf5c 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -58,7 +58,7 @@ if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
-BIGQUERY_JOB_DETAILS_LINK_FMT =
"https://console.cloud.google.com/bigquery?j={project_id}:{location}:{job_id}"
+BIGQUERY_JOB_DETAILS_LINK_FMT =
"https://console.cloud.google.com/bigquery?j={job_id}"
class BigQueryUIColors(enum.Enum):
@@ -90,17 +90,8 @@ class BigQueryConsoleLink(BaseOperatorLink):
*,
ti_key: TaskInstanceKey,
):
- job_id_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
-
- return (
- BIGQUERY_JOB_DETAILS_LINK_FMT.format(
- job_id=job_id_params["job_id"],
- project_id=job_id_params["project_id"],
- location=job_id_params["location"],
- )
- if job_id_params
- else ""
- )
+ job_id = XCom.get_value(key="job_id", ti_key=ti_key)
+ return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id
else ""
@attr.s(auto_attribs=True)
@@ -119,16 +110,13 @@ class BigQueryConsoleIndexableLink(BaseOperatorLink):
*,
ti_key: TaskInstanceKey,
):
- job_ids_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
- job_ids = job_ids_params["job_id"]
+ job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
if not job_ids:
return None
if len(job_ids) < self.index:
return None
job_id = job_ids[self.index]
- return BIGQUERY_JOB_DETAILS_LINK_FMT.format(
- job_id=job_id, project_id=job_ids_params["project_id"],
location=job_ids_params["location"]
- )
+ return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
class _BigQueryDbHookMixin:
@@ -1196,13 +1184,7 @@ class
BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
]
else:
raise AirflowException(f"argument 'sql' of type {type(str)} is
neither a string nor an iterable")
- job_id_params = {
- "job_id": job_id,
- "project_id": self.hook.project_id,
- "location": self.location if self.location else "US",
- }
- context["task_instance"].xcom_push(key="job_id_params",
value=job_id_params)
- return job_id
+ context["task_instance"].xcom_push(key="job_id", value=job_id)
def on_kill(self) -> None:
super().on_kill()
@@ -2745,13 +2727,9 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator):
persist_kwargs["dataset_id"] =
table["datasetId"]
persist_kwargs["project_id"] =
table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)
+
self.job_id = job.job_id
- job_id_params = {
- "job_id": job_id,
- "project_id": self.project_id or self.hook.project_id,
- "location": self.location if self.location else "US",
- }
- context["ti"].xcom_push(key="job_id_params", value=job_id_params)
+ context["ti"].xcom_push(key="job_id", value=self.job_id)
# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py
b/tests/providers/google/cloud/operators/test_bigquery.py
index 367ef99cbb..25b341a4c3 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -83,10 +83,6 @@ MATERIALIZED_VIEW_DEFINITION = {
}
TEST_TABLE = "test-table"
GCP_CONN_ID = "google_cloud_default"
-TEST_JOB_ID_1 = "test-job-id"
-TEST_JOB_ID_2 = "test-123"
-TEST_FULL_JOB_ID =
f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
-TEST_FULL_JOB_ID_2 =
f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
class TestBigQueryCreateEmptyTableOperator:
@@ -676,15 +672,11 @@ class TestBigQueryOperator:
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0],
BigQueryConsoleLink)
- test_job_id_params = {
- "job_id": TEST_JOB_ID_1,
- "project_id": TEST_GCP_PROJECT_ID,
- "location": TEST_DATASET_LOCATION,
- }
- ti.xcom_push("job_id_params", test_job_id_params)
+
+ ti.xcom_push("job_id", 12345)
url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
- assert url ==
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
+ assert url == "https://console.cloud.google.com/bigquery?j=12345"
@pytest.mark.need_serialized_dag
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
@@ -719,23 +711,17 @@ class TestBigQueryOperator:
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0],
BigQueryConsoleIndexableLink)
- test_job_id_params = {
- "job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
- "project_id": TEST_GCP_PROJECT_ID,
- "location": TEST_DATASET_LOCATION,
- }
- ti.xcom_push(key="job_id_params", value=test_job_id_params)
+ job_id = ["123", "45"]
+ ti.xcom_push(key="job_id", value=job_id)
assert {"BigQuery Console #1", "BigQuery Console #2"} ==
simple_task.operator_extra_link_dict.keys()
- assert (
- f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
- == simple_task.get_extra_links(ti, "BigQuery Console #1")
+ assert "https://console.cloud.google.com/bigquery?j=123" ==
simple_task.get_extra_links(
+ ti, "BigQuery Console #1"
)
- assert (
- f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
- == simple_task.get_extra_links(ti, "BigQuery Console #2")
+ assert "https://console.cloud.google.com/bigquery?j=45" ==
simple_task.get_extra_links(
+ ti, "BigQuery Console #2"
)
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -754,9 +740,7 @@ class TestBigQueryOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_extra_link_when_single_query(
- self,
- mock_hook,
- create_task_instance_of_operator,
+ self, mock_hook, create_task_instance_of_operator
):
ti = create_task_instance_of_operator(
BigQueryExecuteQueryOperator,
@@ -767,15 +751,11 @@ class TestBigQueryOperator:
)
bigquery_task = ti.task
- test_job_id_params = {
- "job_id": TEST_JOB_ID_1,
- "project_id": TEST_GCP_PROJECT_ID,
- "location": TEST_DATASET_LOCATION,
- }
- ti.xcom_push(key="job_id_params", value=test_job_id_params)
- assert (
- f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
- == bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
+ job_id = "12345"
+ ti.xcom_push(key="job_id", value=job_id)
+
+ assert f"https://console.cloud.google.com/bigquery?j={job_id}" ==
bigquery_task.get_extra_links(
+ ti, BigQueryConsoleLink.name
)
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -791,22 +771,17 @@ class TestBigQueryOperator:
)
bigquery_task = ti.task
- test_job_id_params = {
- "job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
- "project_id": TEST_GCP_PROJECT_ID,
- "location": TEST_DATASET_LOCATION,
- }
- ti.xcom_push(key="job_id_params", value=test_job_id_params)
+ job_id = ["123", "45"]
+ ti.xcom_push(key="job_id", value=job_id)
+
assert {"BigQuery Console #1", "BigQuery Console #2"} ==
bigquery_task.operator_extra_link_dict.keys()
- assert (
- f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
- == bigquery_task.get_extra_links(ti, "BigQuery Console #1")
+ assert "https://console.cloud.google.com/bigquery?j=123" ==
bigquery_task.get_extra_links(
+ ti, "BigQuery Console #1"
)
- assert (
- f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
- == bigquery_task.get_extra_links(ti, "BigQuery Console #2")
+ assert "https://console.cloud.google.com/bigquery?j=45" ==
bigquery_task.get_extra_links(
+ ti, "BigQuery Console #2"
)