turbaszek commented on a change in pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#discussion_r479960333
##########
File path: airflow/providers/google/cloud/operators/cloud_build.py
##########
@@ -15,92 +15,905 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+
"""Operators that integrate with Google Cloud Build service."""
+
import json
import re
+import warnings
from copy import deepcopy
-from typing import Any, Dict, Optional, Sequence, Union
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
from urllib.parse import unquote, urlparse
import yaml
+from google.api_core.retry import Retry
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger,
RepoSource
+from google.protobuf.json_format import MessageToDict, ParseDict
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
+from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook #
noqa
from airflow.utils.decorators import apply_defaults
-REGEX_REPO_PATH =
re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
+REGEX_REPO_PATH = re.compile(
+ r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)"
+)
+
+
+class CloudBuildCancelBuildOperator(BaseOperator):
+ """
+ Cancels a build in progress.
+
+ :param id_: The ID of the build.
+ :type id_: str
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+ :param gcp_conn_id: Optional, the connection ID used to connect to Google
Cloud Platform.
+ :type gcp_conn_id: Optional[str]
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+
+ :rtype: dict
+ """
+
+ template_fields = ("project_id", "id_", "gcp_conn_id",
"impersonation_chain")
+
+ @apply_defaults
+ def __init__(
+ self,
+ id_: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs
+ ) -> None:
+ super().__init__(**kwargs)
+ self.id_ = id_
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
+ result = hook.cancel_build(
+ id_=self.id_,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return MessageToDict(result)
+
+
+class CloudBuildCreateBuildOperator(BaseOperator):
+ """
+ Starts a build with the specified configuration.
+
+ :param build: The build resource to create. If a dict is provided, it must
be of the same form
+ as the protobuf message
`google.cloud.devtools.cloudbuild_v1.types.Build`. This can be a
+ dictionary or path to a file type like YAML or JSON.
+ :type build: Union[dict,
`google.cloud.devtools.cloudbuild_v1.types.Build`, str]
+ :param body: (Deprecated) The build resource to create.
+ This parameter has been deprecated. You should pass the build
parameter instead.
+ :type body: optional[dict]
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param wait: Optional, wait for operation to finish.
+ :type wait: Optional[bool]
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+ :param gcp_conn_id: Optional, the connection ID used to connect to Google
Cloud Platform.
+ :type gcp_conn_id: Optional[str]
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+
+ :rtype: dict
+ """
+
+ template_fields = ("project_id", "build", "body", "gcp_conn_id",
"impersonation_chain")
+ template_ext = ['.yml', '.yaml', '.json']
+
+ @apply_defaults
+ def __init__(
+ self,
+ build: Union[Dict, Build, str],
+ body: Optional[Dict] = None,
+ project_id: Optional[str] = None,
+ wait: bool = True,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs
+ ) -> None:
+ super().__init__(**kwargs)
+ self.build = build
+ # Not template fields to keep original value
+ self.build_raw = build
+ self.body = body
+ self.project_id = project_id
+ self.wait = wait
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def prepare_template(self) -> None:
+ # if no file is specified, skip
+ if not isinstance(self.build_raw, str):
+ return
+ with open(self.build_raw, 'r') as file:
+ if any(self.build_raw.endswith(ext) for ext in ['.yaml', '.yml']):
+ self.body = yaml.load(file.read(), Loader=yaml.FullLoader)
+ if self.build_raw.endswith('.json'):
+ self.body = json.loads(file.read())
+
+ def execute(self, context):
+ hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
+ if self.body:
+ warnings.warn(
+ "The body parameter has been deprecated. You should pass body
using "
+ "the build parameter.", DeprecationWarning, stacklevel=4)
+ if not self.build:
+ self.build = self.body
+ build = BuildProcessor(build=self.build).process_body()
+
+ result = hook.create_build(
+ build=build,
+ project_id=self.project_id,
+ wait=self.wait,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return MessageToDict(result)
+
+
+class CloudBuildCreateBuildTriggerOperator(BaseOperator):
+ """
+ Creates a new BuildTrigger.
+
+ :param trigger: The BuildTrigger to create. If a dict is provided, it must
be of the same form
+ as the protobuf message
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+ :type trigger: Union[dict,
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+ :param gcp_conn_id: Optional, the connection ID used to connect to Google
Cloud Platform.
+ :type gcp_conn_id: Optional[str]
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+
+ :rtype: dict
+ """
+
+ template_fields = ("project_id", "trigger", "gcp_conn_id",
"impersonation_chain")
+
+ @apply_defaults
+ def __init__(
+ self,
+ trigger: Union[dict, BuildTrigger],
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs
+ ) -> None:
+ super().__init__(**kwargs)
+ self.trigger = trigger
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
+ result = hook.create_build_trigger(
+ trigger=self.trigger,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return MessageToDict(result)
+
+
+class CloudBuildDeleteBuildTriggerOperator(BaseOperator):
+ """
+ Deletes a BuildTrigger by its project ID and trigger ID.
+
+ :param trigger_id: The ID of the BuildTrigger to delete.
+ :type trigger_id: str
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+ :param gcp_conn_id: Optional, the connection ID used to connect to Google
Cloud Platform.
+ :type gcp_conn_id: Optional[str]
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = ("project_id", "trigger_id", "gcp_conn_id",
"impersonation_chain")
+
+ @apply_defaults
+ def __init__(
+ self,
+ trigger_id: str,
+ project_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs
+ ) -> None:
+ super().__init__(**kwargs)
+ self.trigger_id = trigger_id
+ self.project_id = project_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context):
+ hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
+ hook.delete_build_trigger(
+ trigger_id=self.trigger_id,
+ project_id=self.project_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+
+class CloudBuildGetBuildOperator(BaseOperator):
+ """
+ Returns information about a previously requested build.
+
+ :param id_: The ID of the build.
+ :type id_: str
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+ :param gcp_conn_id: Optional, the connection ID used to connect to Google
Cloud Platform.
+ :type gcp_conn_id: Optional[str]
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+
+ :rtype: dict
+ """
+
+ template_fields = ("project_id", "id_", "gcp_conn_id",
"impersonation_chain")
+
+ @apply_defaults
+ def __init__(
+ self,
+ id_: str,
Review comment:
```suggestion
*,
id_: str,
```
To make sure that users pass only key-words arguments
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]