turbaszek commented on a change in pull request #18184:
URL: https://github.com/apache/airflow/pull/18184#discussion_r706776117



##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,121 +15,933 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""Operators that integrate with Google Cloud Build service."""
+
+"""Operators that integrates with Google Cloud Build service."""
+
 import json
 import re
+import warnings
 from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
 from urllib.parse import unquote, urlparse
 
-try:
-    import airflow.utils.yaml as yaml
-except ImportError:
-    import yaml
+import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, 
RepoSource
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
 
-REGEX_REPO_PATH = 
re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = 
re.compile(r"^/(?P<project_id>[^/]+)/(?P<repo_name>[^/]+)[\+/]*(?P<branch_name>[^:]+)?")
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+    """
+    Cancels a build in progress.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        result = hook.cancel_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+    """
+    Starts a build with the specified configuration.
+
+    :param build: The build resource to create. If a dict is provided, it must 
be of the same form
+        as the protobuf message 
`google.cloud.devtools.cloudbuild_v1.types.Build`
+    :type build: Union[dict, `google.cloud.devtools.cloudbuild_v1.types.Build`]
+    :param body: (Deprecated) The build resource to create.
+        This parameter has been deprecated. You should pass the build 
parameter instead.
+    :type body: optional[dict]
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: Optional[str]
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "build", "body", "gcp_conn_id", 
"impersonation_chain")
+
+    def __init__(
+        self,
+        *,
+        build: Union[Dict, Build, str],
+        body: Optional[Dict] = None,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
+        self.body = body
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+        if self.body:
+            warnings.warn(
+                "The body parameter has been deprecated. You should pass body 
using " "the build parameter.",
+                DeprecationWarning,
+                stacklevel=4,
+            )
+            if not self.build:
+                self.build = self.build_raw = self.body
+
+    def prepare_template(self) -> None:
+        # if no file is specified, skip
+        if not isinstance(self.build_raw, str):
+            return
+        with open(self.build_raw) as file:
+            if any(self.build_raw.endswith(ext) for ext in ['.yaml', '.yml']):
+                self.build = yaml.load(file.read(), Loader=yaml.FullLoader)
+            if self.build_raw.endswith('.json'):
+                self.build = json.loads(file.read())
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+
+        build = BuildProcessor(build=self.build).process_body()
+
+        result = hook.create_build(
+            build=build,
+            project_id=self.project_id,
+            wait=self.wait,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildCreateBuildTriggerOperator(BaseOperator):
+    """
+    Creates a new BuildTrigger.
+
+    :param trigger: The BuildTrigger to create. If a dict is provided, it must 
be of the same form
+        as the protobuf message 
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+    :type trigger: Union[dict, 
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "trigger", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger: Union[dict, BuildTrigger],
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger = trigger
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        result = hook.create_build_trigger(
+            trigger=self.trigger,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return BuildTrigger.to_dict(result)
+
+
+class CloudBuildDeleteBuildTriggerOperator(BaseOperator):
+    """
+    Deletes a BuildTrigger by its project ID and trigger ID.
+
+    :param trigger_id: The ID of the BuildTrigger to delete.
+    :type trigger_id: str
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ("project_id", "trigger_id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger_id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger_id = trigger_id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        hook.delete_build_trigger(
+            trigger_id=self.trigger_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+
+class CloudBuildGetBuildOperator(BaseOperator):
+    """
+    Returns information about a previously requested build.
+
+    :param id: The ID of the build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        result = hook.get_build(
+            id=self.id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildGetBuildTriggerOperator(BaseOperator):
+    """
+    Returns information about a BuildTrigger.
+
+    :param trigger_id: The ID of the BuildTrigger to get.
+    :type trigger_id: str
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "trigger_id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger_id: str,
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger_id = trigger_id
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        result = hook.get_build_trigger(
+            trigger_id=self.trigger_id,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return BuildTrigger.to_dict(result)
+
+
+class CloudBuildListBuildTriggersOperator(BaseOperator):
+    """
+    Lists existing BuildTriggers.
+
+    :param location: The location of the project.
+    :type location: string
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: Optional[str]
+    :param page_size: Optional, number of results to return in the list.
+    :type page_size: Optional[int]
+    :param page_token: Optional, token to provide to skip to a particular spot 
in the list.
+    :type page_token: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("location", "project_id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.location = location
+        self.project_id = project_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        results = hook.list_build_triggers(
+            project_id=self.project_id,
+            location=self.location,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [BuildTrigger.to_dict(result) for result in results]
+
+
+class CloudBuildListBuildsOperator(BaseOperator):
+    """
+    Lists previously requested builds.
+
+    :param location: The location of the project.
+    :type location: string
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: str
+    :param page_size: Optional, number of results to return in the list.
+    :type page_size: Optional[int]
+    :param filter: Optional, the raw filter text to constrain the results.
+    :type filter: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: List[dict]
+    """
+
+    template_fields = ("location", "project_id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        project_id: Optional[str] = None,
+        page_size: Optional[int] = None,
+        filter: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.location = location
+        self.project_id = project_id
+        self.page_size = page_size
+        self.filter = filter
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        results = hook.list_builds(
+            project_id=self.project_id,
+            location=self.location,
+            page_size=self.page_size,
+            filter=self.filter,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Build.to_dict(result) for result in results]
+
+
+class CloudBuildRetryBuildOperator(BaseOperator):
+    """
+    Creates a new build based on the specified build. This method creates a 
new build
+    using the original build request, which may or may not result in an 
identical build.
+
+    :param id: Build ID of the original build.
+    :type id: str
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: str
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "id", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        id: str,
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.id = id
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        result = hook.retry_build(
+            id=self.id,
+            project_id=self.project_id,
+            wait=self.wait,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildRunBuildTriggerOperator(BaseOperator):
+    """
+    Runs a BuildTrigger at a particular source revision.
+
+    :param trigger_id: The ID of the trigger.
+    :type trigger_id: str
+    :param source: Source to build against this trigger. If a dict is 
provided, it must be of the same form
+        as the protobuf message 
`google.cloud.devtools.cloudbuild_v1.types.RepoSource`
+    :type source: Union[dict, 
`google.cloud.devtools.cloudbuild_v1.types.RepoSource`]
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: str
+    :param wait: Optional, wait for operation to finish.
+    :type wait: Optional[bool]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "trigger_id", "source", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger_id: str,
+        source: Union[dict, RepoSource],
+        project_id: Optional[str] = None,
+        wait: bool = True,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger_id = trigger_id
+        self.source = source
+        self.project_id = project_id
+        self.wait = wait
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        result = hook.run_build_trigger(
+            trigger_id=self.trigger_id,
+            source=self.source,
+            project_id=self.project_id,
+            wait=self.wait,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Build.to_dict(result)
+
+
+class CloudBuildUpdateBuildTriggerOperator(BaseOperator):
+    """
+    Updates a BuildTrigger by its project ID and trigger ID.
+
+    :param trigger_id: The ID of the trigger.
+    :type trigger_id: str
+    :param trigger: The BuildTrigger to create. If a dict is provided, it must 
be of the same form
+        as the protobuf message 
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+    :type trigger: Union[dict, 
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+    :param project_id: Optional, Google Cloud Project project_id where the 
function belongs.
+        If set to None or missing, the default project_id from the GCP 
connection is used.
+    :type project_id: Optional[str]
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :type retry: Optional[Retry]
+    :param timeout: Optional, the amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :type timeout: Optional[float]
+    :param metadata: Optional, additional metadata that is provided to the 
method.
+    :type metadata: Optional[Sequence[Tuple[str, str]]]
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google 
Cloud Platform.
+    :type gcp_conn_id: Optional[str]
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+
+    :rtype: dict
+    """
+
+    template_fields = ("project_id", "trigger_id", "trigger", "gcp_conn_id")
+
+    def __init__(
+        self,
+        *,
+        trigger_id: str,
+        trigger: Union[dict, BuildTrigger],
+        project_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.trigger_id = trigger_id
+        self.trigger = trigger
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, 
impersonation_chain=self.impersonation_chain)
+        result = hook.update_build_trigger(
+            trigger_id=self.trigger_id,
+            trigger=self.trigger,
+            project_id=self.project_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return BuildTrigger.to_dict(result)
 
 
 class BuildProcessor:
     """
     Processes build configurations to add additional functionality to support 
the use of operators.
-
     The following improvements are made:
-
     * It is required to provide the source and only one type can be given,
     * It is possible to provide the source as the URL address instead dict.
 
-    :param body: The request body.
-        See: 
https://cloud.google.com/cloud-build/docs/api/reference/rest/v1/projects.builds
-    :type body: dict
+    :param build: The request body of the build.
+        See: 
https://cloud.google.com/cloud-build/docs/api/reference/rest/Shared.Types/Build
+    :type build: Union[Dict, Build]
     """
 
-    def __init__(self, body: dict) -> None:
-        self.body = deepcopy(body)
+    def __init__(self, build: Union[Dict, Build]) -> None:
+        if isinstance(build, Build):
+            self.build = Build(build)
+        self.build = deepcopy(build)
 
     def _verify_source(self) -> None:
-        is_storage = "storageSource" in self.body["source"]
-        is_repo = "repoSource" in self.body["source"]
-
-        sources_count = sum([is_storage, is_repo])
-
-        if sources_count != 1:
+        if not (("storage_source" in self.build["source"]) ^ ("repo_source" in 
self.build["source"])):
             raise AirflowException(
                 "The source could not be determined. Please choose one data 
source from: "
-                "storageSource and repoSource."
+                "storage_source and repo_source."
             )
 
     def _reformat_source(self) -> None:
         self._reformat_repo_source()
         self._reformat_storage_source()
 
     def _reformat_repo_source(self) -> None:
-        if "repoSource" not in self.body["source"]:
+        if "repo_source" not in self.build["source"]:
             return
 
-        source = self.body["source"]["repoSource"]
+        source = self.build["source"]["repo_source"]

Review comment:
       ```suggestion
           repo_source = self.build["source"]["repo_source"]
   ```
   WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to