This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new def1f89e70 Fix location on cloud build operators (#29937)
def1f89e70 is described below
commit def1f89e702d401f67a94f34a01f6a4806ea92e6
Author: tnk-ysk <[email protected]>
AuthorDate: Fri Mar 10 21:09:43 2023 +0900
Fix location on cloud build operators (#29937)
---
.../providers/google/cloud/hooks/cloud_build.py | 89 ++++++++++++++--------
.../providers/google/cloud/links/cloud_build.py | 18 ++++-
.../google/cloud/operators/cloud_build.py | 86 ++++++++++++++++-----
.../providers/google/cloud/triggers/cloud_build.py | 5 ++
.../google/cloud/hooks/test_cloud_build.py | 10 ++-
.../google/cloud/operators/test_cloud_build.py | 41 +++++++---
.../google/cloud/triggers/test_cloud_build.py | 13 +++-
7 files changed, 192 insertions(+), 70 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/cloud_build.py
b/airflow/providers/google/cloud/hooks/cloud_build.py
index c8005f259a..6876faaef9 100644
--- a/airflow/providers/google/cloud/hooks/cloud_build.py
+++ b/airflow/providers/google/cloud/hooks/cloud_build.py
@@ -21,6 +21,7 @@ from __future__ import annotations
import warnings
from typing import Sequence
+from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import AlreadyExists
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.operation import Operation
@@ -67,7 +68,7 @@ class CloudBuildHook(GoogleBaseHook):
super().__init__(
gcp_conn_id=gcp_conn_id, delegate_to=delegate_to,
impersonation_chain=impersonation_chain
)
- self._client: CloudBuildClient | None = None
+ self._client: dict[str, CloudBuildClient] = {}
def _get_build_id_from_operation(self, operation: Operation) -> str:
"""
@@ -91,15 +92,24 @@ class CloudBuildHook(GoogleBaseHook):
error = operation.exception(timeout=timeout)
raise AirflowException(error)
- def get_conn(self) -> CloudBuildClient:
+ def get_conn(self, location: str = "global") -> CloudBuildClient:
"""
Retrieves the connection to Google Cloud Build.
+ :param location: The location of the project.
+
:return: Google Cloud Build client object.
"""
- if not self._client:
- self._client =
CloudBuildClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)
- return self._client
+ if location not in self._client:
+ client_options = None
+ if location != "global":
+ client_options =
ClientOptions(api_endpoint=f"{location}-cloudbuild.googleapis.com:443")
+ self._client[location] = CloudBuildClient(
+ credentials=self.get_credentials(),
+ client_info=CLIENT_INFO,
+ client_options=client_options,
+ )
+ return self._client[location]
@GoogleBaseHook.fallback_to_default_project_id
def cancel_build(
@@ -109,6 +119,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> Build:
"""
Cancels a build in progress.
@@ -121,9 +132,9 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
-
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
self.log.info("Start cancelling build: %s.", id_)
@@ -145,6 +156,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> tuple[Operation, str]:
"""
Starts a build with the specified configuration without waiting for it
to finish.
@@ -158,13 +170,16 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
+
+ parent = f"projects/{project_id}/locations/{location}"
self.log.info("Start creating build...")
operation = client.create_build(
- request={"project_id": project_id, "build": build},
+ request={"parent": parent, "project_id": project_id, "build":
build},
retry=retry,
timeout=timeout,
metadata=metadata,
@@ -227,6 +242,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> BuildTrigger:
"""
Creates a new BuildTrigger.
@@ -240,9 +256,9 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
-
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
self.log.info("Start creating build trigger...")
@@ -268,6 +284,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> None:
"""
Deletes a BuildTrigger by its project ID and trigger ID.
@@ -280,8 +297,9 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
self.log.info("Start deleting build trigger: %s.", trigger_id)
@@ -302,6 +320,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> Build:
"""
Returns information about a previously requested build.
@@ -314,9 +333,9 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
-
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
self.log.info("Start retrieving build: %s.", id_)
@@ -339,6 +358,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> BuildTrigger:
"""
Returns information about a BuildTrigger.
@@ -351,9 +371,9 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
-
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
self.log.info("Start retrieving build trigger: %s.", trigger_id)
@@ -371,7 +391,7 @@ class CloudBuildHook(GoogleBaseHook):
@GoogleBaseHook.fallback_to_default_project_id
def list_build_triggers(
self,
- location: str,
+ location: str = "global",
project_id: str = PROVIDE_PROJECT_ID,
page_size: int | None = None,
page_token: str | None = None,
@@ -394,7 +414,7 @@ class CloudBuildHook(GoogleBaseHook):
:param metadata: Optional, additional metadata that is provided to the
method.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
parent = f"projects/{project_id}/locations/{location}"
@@ -419,7 +439,7 @@ class CloudBuildHook(GoogleBaseHook):
@GoogleBaseHook.fallback_to_default_project_id
def list_builds(
self,
- location: str,
+ location: str = "global",
project_id: str = PROVIDE_PROJECT_ID,
page_size: int | None = None,
page_token: int | None = None,
@@ -444,7 +464,7 @@ class CloudBuildHook(GoogleBaseHook):
:param metadata: Optional, additional metadata that is provided to the
method.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
parent = f"projects/{project_id}/locations/{location}"
@@ -476,6 +496,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> Build:
"""
Creates a new build based on the specified build. This method creates
a new build
@@ -490,9 +511,9 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
-
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
self.log.info("Start retrying build: %s.", id_)
@@ -506,13 +527,13 @@ class CloudBuildHook(GoogleBaseHook):
id_ = self._get_build_id_from_operation(operation)
if not wait:
- return self.get_build(id_=id_, project_id=project_id)
+ return self.get_build(id_=id_, project_id=project_id,
location=location)
operation.result()
self.log.info("Build has been retried: %s.", id_)
- return self.get_build(id_=id_, project_id=project_id)
+ return self.get_build(id_=id_, project_id=project_id,
location=location)
@GoogleBaseHook.fallback_to_default_project_id
def run_build_trigger(
@@ -524,6 +545,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> Build:
"""
Runs a BuildTrigger at a particular source revision.
@@ -539,9 +561,9 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
-
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
self.log.info("Start running build trigger: %s.", trigger_id)
operation = client.run_build_trigger(
@@ -554,12 +576,12 @@ class CloudBuildHook(GoogleBaseHook):
id_ = self._get_build_id_from_operation(operation)
if not wait:
- return self.get_build(id_=id_, project_id=project_id)
+ return self.get_build(id_=id_, project_id=project_id,
location=location)
operation.result()
self.log.info("Build trigger has been run: %s.", trigger_id)
- return self.get_build(id_=id_, project_id=project_id)
+ return self.get_build(id_=id_, project_id=project_id,
location=location)
@GoogleBaseHook.fallback_to_default_project_id
def update_build_trigger(
@@ -570,6 +592,7 @@ class CloudBuildHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> BuildTrigger:
"""
Updates a BuildTrigger by its project ID and trigger ID.
@@ -584,9 +607,9 @@ class CloudBuildHook(GoogleBaseHook):
:param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
Note that if `retry` is specified, the timeout applies to each
individual attempt.
:param metadata: Optional, additional metadata that is provided to the
method.
-
+ :param location: The location of the project.
"""
- client = self.get_conn()
+ client = self.get_conn(location=location)
self.log.info("Start updating build trigger: %s.", trigger_id)
@@ -613,12 +636,16 @@ class CloudBuildAsyncHook(GoogleBaseHook):
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
+ location: str = "global",
) -> Build:
"""Retrieves a Cloud Build with a specified id."""
if not id_:
raise AirflowException("Google Cloud Build id is required.")
- client = CloudBuildAsyncClient()
+ client_options = None
+ if location != "global":
+ client_options =
ClientOptions(api_endpoint=f"{location}-cloudbuild.googleapis.com:443")
+ client = CloudBuildAsyncClient(client_options=client_options)
request = GetBuildRequest(
project_id=project_id,
diff --git a/airflow/providers/google/cloud/links/cloud_build.py
b/airflow/providers/google/cloud/links/cloud_build.py
index f42216acba..ceae038289 100644
--- a/airflow/providers/google/cloud/links/cloud_build.py
+++ b/airflow/providers/google/cloud/links/cloud_build.py
@@ -25,13 +25,15 @@ if TYPE_CHECKING:
BUILD_BASE_LINK = "/cloud-build"
-BUILD_LINK = BUILD_BASE_LINK + "/builds/{build_id}?project={project_id}"
+BUILD_LINK = BUILD_BASE_LINK +
"/builds;region={region}/{build_id}?project={project_id}"
-BUILD_LIST_LINK = BUILD_BASE_LINK + "/builds?project={project_id}"
+BUILD_LIST_LINK = BUILD_BASE_LINK +
"/builds;region={region}?project={project_id}"
-BUILD_TRIGGERS_LIST_LINK = BUILD_BASE_LINK + "/triggers?project={project_id}"
+BUILD_TRIGGERS_LIST_LINK = BUILD_BASE_LINK +
"/triggers;region={region}?project={project_id}"
-BUILD_TRIGGER_DETAILS_LINK = BUILD_BASE_LINK +
"/triggers/edit/{trigger_id}?project={project_id}"
+BUILD_TRIGGER_DETAILS_LINK = (
+ BUILD_BASE_LINK +
"/triggers;region={region}/edit/{trigger_id}?project={project_id}"
+)
class CloudBuildLink(BaseGoogleLink):
@@ -47,12 +49,14 @@ class CloudBuildLink(BaseGoogleLink):
task_instance,
build_id: str,
project_id: str,
+ region: str,
):
task_instance.xcom_push(
context=context,
key=CloudBuildLink.key,
value={
"project_id": project_id,
+ "region": region,
"build_id": build_id,
},
)
@@ -70,12 +74,14 @@ class CloudBuildListLink(BaseGoogleLink):
context: Context,
task_instance,
project_id: str,
+ region: str,
):
task_instance.xcom_push(
context=context,
key=CloudBuildListLink.key,
value={
"project_id": project_id,
+ "region": region,
},
)
@@ -92,12 +98,14 @@ class CloudBuildTriggersListLink(BaseGoogleLink):
context: Context,
task_instance,
project_id: str,
+ region: str,
):
task_instance.xcom_push(
context=context,
key=CloudBuildTriggersListLink.key,
value={
"project_id": project_id,
+ "region": region,
},
)
@@ -114,6 +122,7 @@ class CloudBuildTriggerDetailsLink(BaseGoogleLink):
context: Context,
task_instance,
project_id: str,
+ region: str,
trigger_id: str,
):
task_instance.xcom_push(
@@ -121,6 +130,7 @@ class CloudBuildTriggerDetailsLink(BaseGoogleLink):
key=CloudBuildTriggerDetailsLink.key,
value={
"project_id": project_id,
+ "region": region,
"trigger_id": trigger_id,
},
)
diff --git a/airflow/providers/google/cloud/operators/cloud_build.py
b/airflow/providers/google/cloud/operators/cloud_build.py
index f070081919..17a06a7ace 100644
--- a/airflow/providers/google/cloud/operators/cloud_build.py
+++ b/airflow/providers/google/cloud/operators/cloud_build.py
@@ -75,10 +75,10 @@ class
CloudBuildCancelBuildOperator(GoogleCloudBaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
-
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "id_", "gcp_conn_id")
+ template_fields: Sequence[str] = ("project_id", "id_", "gcp_conn_id",
"location")
operator_extra_links = (CloudBuildLink(),)
def __init__(
@@ -91,6 +91,7 @@ class CloudBuildCancelBuildOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -101,6 +102,7 @@ class
CloudBuildCancelBuildOperator(GoogleCloudBaseOperator):
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
@@ -110,6 +112,7 @@ class
CloudBuildCancelBuildOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
self.xcom_push(context, key="id", value=result.id)
@@ -119,6 +122,7 @@ class
CloudBuildCancelBuildOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
build_id=result.id,
)
return Build.to_dict(result)
@@ -158,9 +162,10 @@ class
CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as
metadata.
:param deferrable: Run operator in the deferrable mode
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "build", "gcp_conn_id",
"impersonation_chain")
+ template_fields: Sequence[str] = ("project_id", "build", "gcp_conn_id",
"impersonation_chain", "location")
operator_extra_links = (CloudBuildLink(),)
def __init__(
@@ -177,6 +182,7 @@ class
CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
delegate_to: str | None = None,
poll_interval: float = 4.0,
deferrable: bool = False,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -197,6 +203,7 @@ class
CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
self.delegate_to = delegate_to
self.poll_interval = poll_interval
self.deferrable = deferrable
+ self.location = location
def prepare_template(self) -> None:
# if no file is specified, skip
@@ -222,10 +229,13 @@ class
CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
self.xcom_push(context, key="id", value=self.id_)
if not self.wait:
- return Build.to_dict(hook.get_build(id_=self.id_,
project_id=self.project_id))
+ return Build.to_dict(
+ hook.get_build(id_=self.id_, project_id=self.project_id,
location=self.location)
+ )
if self.deferrable:
self.defer(
@@ -236,6 +246,7 @@ class
CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
impersonation_chain=self.impersonation_chain,
delegate_to=self.delegate_to,
poll_interval=self.poll_interval,
+ location=self.location,
),
method_name=GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME,
)
@@ -249,6 +260,7 @@ class
CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
build_id=cloud_build_instance_result.id,
)
return Build.to_dict(cloud_build_instance_result)
@@ -267,6 +279,7 @@ class
CloudBuildCreateBuildOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
build_id=event["id_"],
)
return event["instance"]
@@ -300,10 +313,10 @@ class
CloudBuildCreateBuildTriggerOperator(GoogleCloudBaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
-
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "trigger", "gcp_conn_id")
+ template_fields: Sequence[str] = ("project_id", "trigger", "gcp_conn_id",
"location")
operator_extra_links = (
CloudBuildTriggersListLink(),
CloudBuildTriggerDetailsLink(),
@@ -319,6 +332,7 @@ class
CloudBuildCreateBuildTriggerOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -329,6 +343,7 @@ class
CloudBuildCreateBuildTriggerOperator(GoogleCloudBaseOperator):
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
@@ -338,6 +353,7 @@ class
CloudBuildCreateBuildTriggerOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
self.xcom_push(context, key="id", value=result.id)
project_id = self.project_id or hook.project_id
@@ -346,12 +362,14 @@ class
CloudBuildCreateBuildTriggerOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
trigger_id=result.id,
)
CloudBuildTriggersListLink.persist(
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
)
return BuildTrigger.to_dict(result)
@@ -381,9 +399,10 @@ class
CloudBuildDeleteBuildTriggerOperator(GoogleCloudBaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "trigger_id",
"gcp_conn_id")
+ template_fields: Sequence[str] = ("project_id", "trigger_id",
"gcp_conn_id", "location")
operator_extra_links = (CloudBuildTriggersListLink(),)
def __init__(
@@ -396,6 +415,7 @@ class
CloudBuildDeleteBuildTriggerOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -406,6 +426,7 @@ class
CloudBuildDeleteBuildTriggerOperator(GoogleCloudBaseOperator):
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
@@ -415,6 +436,7 @@ class
CloudBuildDeleteBuildTriggerOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
project_id = self.project_id or hook.project_id
if project_id:
@@ -422,6 +444,7 @@ class
CloudBuildDeleteBuildTriggerOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
)
@@ -450,10 +473,10 @@ class CloudBuildGetBuildOperator(GoogleCloudBaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
-
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "id_", "gcp_conn_id")
+ template_fields: Sequence[str] = ("project_id", "id_", "gcp_conn_id",
"location")
operator_extra_links = (CloudBuildLink(),)
def __init__(
@@ -466,6 +489,7 @@ class CloudBuildGetBuildOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -476,6 +500,7 @@ class CloudBuildGetBuildOperator(GoogleCloudBaseOperator):
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
@@ -485,6 +510,7 @@ class CloudBuildGetBuildOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
project_id = self.project_id or hook.project_id
if project_id:
@@ -492,6 +518,7 @@ class CloudBuildGetBuildOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
build_id=result.id,
)
return Build.to_dict(result)
@@ -522,10 +549,10 @@ class
CloudBuildGetBuildTriggerOperator(GoogleCloudBaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
-
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "trigger_id",
"gcp_conn_id")
+ template_fields: Sequence[str] = ("project_id", "trigger_id",
"gcp_conn_id", "location")
operator_extra_links = (CloudBuildTriggerDetailsLink(),)
def __init__(
@@ -538,6 +565,7 @@ class
CloudBuildGetBuildTriggerOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -548,6 +576,7 @@ class
CloudBuildGetBuildTriggerOperator(GoogleCloudBaseOperator):
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
@@ -557,6 +586,7 @@ class
CloudBuildGetBuildTriggerOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
project_id = self.project_id or hook.project_id
if project_id:
@@ -564,6 +594,7 @@ class
CloudBuildGetBuildTriggerOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
trigger_id=result.id,
)
return BuildTrigger.to_dict(result)
@@ -605,7 +636,7 @@ class
CloudBuildListBuildTriggersOperator(GoogleCloudBaseOperator):
def __init__(
self,
*,
- location: str,
+ location: str = "global",
project_id: str | None = None,
page_size: int | None = None,
page_token: str | None = None,
@@ -644,6 +675,7 @@ class
CloudBuildListBuildTriggersOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
)
return [BuildTrigger.to_dict(result) for result in results]
@@ -684,7 +716,7 @@ class CloudBuildListBuildsOperator(GoogleCloudBaseOperator):
def __init__(
self,
*,
- location: str,
+ location: str = "global",
project_id: str | None = None,
page_size: int | None = None,
filter_: str | None = None,
@@ -719,7 +751,9 @@ class CloudBuildListBuildsOperator(GoogleCloudBaseOperator):
)
project_id = self.project_id or hook.project_id
if project_id:
- CloudBuildListLink.persist(context=context, task_instance=self,
project_id=project_id)
+ CloudBuildListLink.persist(
+ context=context, task_instance=self, project_id=project_id,
region=self.location
+ )
return [Build.to_dict(result) for result in results]
@@ -750,10 +784,10 @@ class
CloudBuildRetryBuildOperator(GoogleCloudBaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
-
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "id_", "gcp_conn_id")
+ template_fields: Sequence[str] = ("project_id", "id_", "gcp_conn_id",
"location")
operator_extra_links = (CloudBuildLink(),)
def __init__(
@@ -767,6 +801,7 @@ class CloudBuildRetryBuildOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -778,6 +813,7 @@ class CloudBuildRetryBuildOperator(GoogleCloudBaseOperator):
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
@@ -788,6 +824,7 @@ class CloudBuildRetryBuildOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
self.xcom_push(context, key="id", value=result.id)
@@ -797,6 +834,7 @@ class CloudBuildRetryBuildOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
build_id=result.id,
)
return Build.to_dict(result)
@@ -830,10 +868,10 @@ class
CloudBuildRunBuildTriggerOperator(GoogleCloudBaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
-
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "trigger_id", "source",
"gcp_conn_id")
+ template_fields: Sequence[str] = ("project_id", "trigger_id", "source",
"gcp_conn_id", "location")
operator_extra_links = (CloudBuildLink(),)
def __init__(
@@ -848,6 +886,7 @@ class
CloudBuildRunBuildTriggerOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -860,6 +899,7 @@ class
CloudBuildRunBuildTriggerOperator(GoogleCloudBaseOperator):
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
@@ -871,6 +911,7 @@ class
CloudBuildRunBuildTriggerOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
self.xcom_push(context, key="id", value=result.id)
project_id = self.project_id or hook.project_id
@@ -879,6 +920,7 @@ class
CloudBuildRunBuildTriggerOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
build_id=result.id,
)
return Build.to_dict(result)
@@ -911,10 +953,10 @@ class
CloudBuildUpdateBuildTriggerOperator(GoogleCloudBaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
-
+ :param location: The location of the project.
"""
- template_fields: Sequence[str] = ("project_id", "trigger_id", "trigger",
"gcp_conn_id")
+ template_fields: Sequence[str] = ("project_id", "trigger_id", "trigger",
"gcp_conn_id", "location")
operator_extra_links = (CloudBuildTriggerDetailsLink(),)
def __init__(
@@ -928,6 +970,7 @@ class
CloudBuildUpdateBuildTriggerOperator(GoogleCloudBaseOperator):
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str = "global",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -939,6 +982,7 @@ class
CloudBuildUpdateBuildTriggerOperator(GoogleCloudBaseOperator):
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
+ self.location = location
def execute(self, context: Context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
@@ -949,6 +993,7 @@ class
CloudBuildUpdateBuildTriggerOperator(GoogleCloudBaseOperator):
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
+ location=self.location,
)
self.xcom_push(context, key="id", value=result.id)
project_id = self.project_id or hook.project_id
@@ -957,6 +1002,7 @@ class
CloudBuildUpdateBuildTriggerOperator(GoogleCloudBaseOperator):
context=context,
task_instance=self,
project_id=project_id,
+ region=self.location,
trigger_id=result.id,
)
return BuildTrigger.to_dict(result)
diff --git a/airflow/providers/google/cloud/triggers/cloud_build.py
b/airflow/providers/google/cloud/triggers/cloud_build.py
index b9b4095f89..aad757e2b3 100644
--- a/airflow/providers/google/cloud/triggers/cloud_build.py
+++ b/airflow/providers/google/cloud/triggers/cloud_build.py
@@ -45,6 +45,7 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
if any. For this to work, the service account making the request must
have
domain-wide delegation enabled.
:param poll_interval: polling period in seconds to check for the status
+ :param location: The location of the project.
"""
def __init__(
@@ -55,6 +56,7 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
impersonation_chain: str | Sequence[str] | None = None,
delegate_to: str | None = None,
poll_interval: float = 4.0,
+ location: str = "global",
):
super().__init__()
self.id_ = id_
@@ -67,6 +69,7 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
)
self.delegate_to = delegate_to
self.poll_interval = poll_interval
+ self.location = location
def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes CloudBuildCreateBuildTrigger arguments and classpath."""
@@ -79,6 +82,7 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
"impersonation_chain": self.impersonation_chain,
"delegate_to": self.delegate_to,
"poll_interval": self.poll_interval,
+ "location": self.location,
},
)
@@ -91,6 +95,7 @@ class CloudBuildCreateBuildTrigger(BaseTrigger):
cloud_build_instance = await hook.get_cloud_build(
id_=self.id_,
project_id=self.project_id,
+ location=self.location,
)
if cloud_build_instance._pb.status in (Build.Status.SUCCESS,):
yield TriggerEvent(
diff --git a/tests/providers/google/cloud/hooks/test_cloud_build.py
b/tests/providers/google/cloud/hooks/test_cloud_build.py
index 1dd86528e7..a24a120562 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_build.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_build.py
@@ -71,9 +71,13 @@ class TestCloudBuildHook:
@mock.patch("airflow.providers.google.cloud.hooks.cloud_build.CloudBuildClient")
def test_cloud_build_service_client_creation(self, mock_client,
mock_get_creds):
result = self.hook.get_conn()
-
mock_client.assert_called_once_with(credentials=mock_get_creds.return_value,
client_info=CLIENT_INFO)
+ mock_client.assert_called_once_with(
+ credentials=mock_get_creds.return_value,
+ client_info=CLIENT_INFO,
+ client_options=None,
+ )
assert mock_client.return_value == result
- assert self.hook._client == result
+ assert self.hook._client["global"] == result
@mock.patch("airflow.providers.google.cloud.hooks.cloud_build.CloudBuildHook.get_conn")
def test_cancel_build(self, get_conn):
@@ -325,7 +329,7 @@ class TestAsyncHook:
)
@pytest.mark.asyncio
- @async_mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self:
None)
+ @async_mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self,
client_options: None)
@async_mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncClient.get_build"))
async def
test_async_cloud_build_service_client_creation_should_execute_successfully(
self, mocked_get_build, hook
diff --git a/tests/providers/google/cloud/operators/test_cloud_build.py
b/tests/providers/google/cloud/operators/test_cloud_build.py
index 72850fb3b1..bd02f3e02e 100644
--- a/tests/providers/google/cloud/operators/test_cloud_build.py
+++ b/tests/providers/google/cloud/operators/test_cloud_build.py
@@ -112,7 +112,7 @@ class TestCloudBuildOperator:
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None)
mock_hook.return_value.cancel_build.assert_called_once_with(
- id_=TRIGGER_ID, project_id=None, retry=DEFAULT, timeout=None,
metadata=()
+ id_=TRIGGER_ID, project_id=None, retry=DEFAULT, timeout=None,
metadata=(), location="global"
)
@mock.patch(CLOUD_BUILD_HOOK_PATH)
@@ -126,7 +126,7 @@ class TestCloudBuildOperator:
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, delegate_to=None)
build = Build(BUILD)
mock_hook.return_value.create_build_without_waiting_for_result.assert_called_once_with(
- build=build, project_id=None, retry=DEFAULT, timeout=None,
metadata=()
+ build=build, project_id=None, retry=DEFAULT, timeout=None,
metadata=(), location="global"
)
mock_hook.return_value.wait_for_operation.assert_called_once_with(timeout=None,
operation=BUILD)
@@ -174,7 +174,12 @@ class TestCloudBuildOperator:
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None)
mock_hook.return_value.create_build_trigger.assert_called_once_with(
- trigger=BUILD_TRIGGER, project_id=None, retry=DEFAULT,
timeout=None, metadata=()
+ trigger=BUILD_TRIGGER,
+ project_id=None,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ location="global",
)
@mock.patch(CLOUD_BUILD_HOOK_PATH)
@@ -186,7 +191,12 @@ class TestCloudBuildOperator:
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None)
mock_hook.return_value.delete_build_trigger.assert_called_once_with(
- trigger_id=TRIGGER_ID, project_id=None, retry=DEFAULT,
timeout=None, metadata=()
+ trigger_id=TRIGGER_ID,
+ project_id=None,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ location="global",
)
@mock.patch(CLOUD_BUILD_HOOK_PATH)
@@ -198,7 +208,7 @@ class TestCloudBuildOperator:
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None)
mock_hook.return_value.get_build.assert_called_once_with(
- id_=BUILD_ID, project_id=None, retry=DEFAULT, timeout=None,
metadata=()
+ id_=BUILD_ID, project_id=None, retry=DEFAULT, timeout=None,
metadata=(), location="global"
)
@mock.patch(CLOUD_BUILD_HOOK_PATH)
@@ -210,7 +220,12 @@ class TestCloudBuildOperator:
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None)
mock_hook.return_value.get_build_trigger.assert_called_once_with(
- trigger_id=TRIGGER_ID, project_id=None, retry=DEFAULT,
timeout=None, metadata=()
+ trigger_id=TRIGGER_ID,
+ project_id=None,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ location="global",
)
@mock.patch(CLOUD_BUILD_HOOK_PATH)
@@ -258,7 +273,13 @@ class TestCloudBuildOperator:
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None)
mock_hook.return_value.retry_build.assert_called_once_with(
- id_=BUILD_ID, project_id=None, wait=True, retry=DEFAULT,
timeout=None, metadata=()
+ id_=BUILD_ID,
+ project_id=None,
+ wait=True,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ location="global",
)
@mock.patch(CLOUD_BUILD_HOOK_PATH)
@@ -277,6 +298,7 @@ class TestCloudBuildOperator:
retry=DEFAULT,
timeout=None,
metadata=(),
+ location="global",
)
@mock.patch(CLOUD_BUILD_HOOK_PATH)
@@ -296,6 +318,7 @@ class TestCloudBuildOperator:
retry=DEFAULT,
timeout=None,
metadata=(),
+ location="global",
)
@@ -414,9 +437,9 @@ def
test_async_create_build_without_wait_should_execute_successfully(mock_hook):
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID,
impersonation_chain=None, delegate_to=None)
build = Build(BUILD)
mock_hook.return_value.create_build_without_waiting_for_result.assert_called_once_with(
- build=build, project_id=None, retry=DEFAULT, timeout=None, metadata=()
+ build=build, project_id=None, retry=DEFAULT, timeout=None,
metadata=(), location="global"
)
- mock_hook.return_value.get_build.assert_called_once_with(id_=BUILD_ID,
project_id=None)
+ mock_hook.return_value.get_build.assert_called_once_with(id_=BUILD_ID,
project_id=None, location="global")
@mock.patch(CLOUD_BUILD_HOOK_PATH)
diff --git a/tests/providers/google/cloud/triggers/test_cloud_build.py
b/tests/providers/google/cloud/triggers/test_cloud_build.py
index 7f005caf27..d16e938d65 100644
--- a/tests/providers/google/cloud/triggers/test_cloud_build.py
+++ b/tests/providers/google/cloud/triggers/test_cloud_build.py
@@ -45,6 +45,7 @@ TEST_BUILD_WORKING = {
TEST_CONN_ID = "google_cloud_default"
TEST_POLL_INTERVAL = 4.0
+TEST_LOCATION = "global"
TEST_BUILD_INSTANCE = dict(
id="test-build-id-9832662",
status=3,
@@ -98,6 +99,7 @@ def
test_async_create_build_trigger_serialization_should_execute_successfully():
impersonation_chain=None,
delegate_to=None,
poll_interval=TEST_POLL_INTERVAL,
+ location=TEST_LOCATION,
)
classpath, kwargs = trigger.serialize()
assert classpath ==
"airflow.providers.google.cloud.triggers.cloud_build.CloudBuildCreateBuildTrigger"
@@ -108,11 +110,12 @@ def
test_async_create_build_trigger_serialization_should_execute_successfully():
"impersonation_chain": None,
"delegate_to": None,
"poll_interval": TEST_POLL_INTERVAL,
+ "location": TEST_LOCATION,
}
@pytest.mark.asyncio
-@async_mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self: None)
+@async_mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self,
client_options: None)
@async_mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncClient.get_build"))
async def
test_async_create_build_trigger_triggers_on_success_should_execute_successfully(
mock_get_build, hook
@@ -131,6 +134,7 @@ async def
test_async_create_build_trigger_triggers_on_success_should_execute_suc
impersonation_chain=None,
delegate_to=None,
poll_interval=TEST_POLL_INTERVAL,
+ location=TEST_LOCATION,
)
generator = trigger.run()
@@ -149,7 +153,7 @@ async def
test_async_create_build_trigger_triggers_on_success_should_execute_suc
@pytest.mark.asyncio
-@async_mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self: None)
+@async_mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self,
client_options: None)
@async_mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncClient.get_build"))
async def
test_async_create_build_trigger_triggers_on_running_should_execute_successfully(
mock_get_build, hook, caplog
@@ -169,6 +173,7 @@ async def
test_async_create_build_trigger_triggers_on_running_should_execute_suc
impersonation_chain=None,
delegate_to=None,
poll_interval=TEST_POLL_INTERVAL,
+ location=TEST_LOCATION,
)
task = asyncio.create_task(trigger.run().__anext__())
await asyncio.sleep(0.5)
@@ -184,7 +189,7 @@ async def
test_async_create_build_trigger_triggers_on_running_should_execute_suc
@pytest.mark.asyncio
-@async_mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self: None)
+@async_mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self,
client_options: None)
@async_mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncClient.get_build"))
async def
test_async_create_build_trigger_triggers_on_error_should_execute_successfully(
mock_get_build, hook, caplog
@@ -204,6 +209,7 @@ async def
test_async_create_build_trigger_triggers_on_error_should_execute_succe
impersonation_chain=None,
delegate_to=None,
poll_interval=TEST_POLL_INTERVAL,
+ location=TEST_LOCATION,
)
generator = trigger.run()
@@ -226,6 +232,7 @@ async def
test_async_create_build_trigger_triggers_on_excp_should_execute_succes
impersonation_chain=None,
delegate_to=None,
poll_interval=TEST_POLL_INTERVAL,
+ location=TEST_LOCATION,
)
generator = trigger.run()