[
https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187524#comment-17187524
]
ASF GitHub Bot commented on AIRFLOW-6981:
-----------------------------------------
turbaszek commented on a change in pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#discussion_r479956513
##########
File path: airflow/providers/google/cloud/hooks/cloud_build.py
##########
@@ -69,82 +67,568 @@ def __init__(
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
+ self._client: Optional[CloudBuildClient] = None
+
+ def get_conn(self) -> CloudBuildClient:
+ """
+ Retrieves the connection to Google Cloud Build.
- self.api_version = api_version
+ :return: Google Cloud Build client object.
+ :rtype: `google.cloud.devtools.cloudbuild_v1.CloudBuildClient`
+ """
+ if not self._client:
+ self._client = CloudBuildClient(
+ credentials=self._get_credentials(),
+ client_info=self.client_info,
+ )
+ return self._client
- def get_conn(self):
+ @GoogleBaseHook.fallback_to_default_project_id
+ def cancel_build(
+ self,
+ id_: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Build:
"""
- Retrieves the connection to Cloud Build.
+ Cancels a build in progress.
+
+ :param id_: The ID of the build.
+ :type id_: str
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
- :return: Google Cloud Build services object.
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
"""
- if not self._conn:
- http_authorized = self._authorize()
- self._conn = build("cloudbuild", self.api_version, http=http_authorized, cache_discovery=False)
- return self._conn
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ return client.cancel_build(
+ project_id=project_id,
+ id_=id_,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
@GoogleBaseHook.fallback_to_default_project_id
- def create_build(self, body: Dict, project_id: str) -> Dict:
+ def create_build(
+ self,
+ build: Union[Dict, Build],
+ project_id: str,
+ wait: bool = True,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Build:
"""
Starts a build with the specified configuration.
- :param body: The request body.
- See: https://cloud.google.com/cloud-build/docs/api/reference/rest/v1/projects.builds
- :type body: dict
+ :param build: The build resource to create. If a dict is provided, it
must be of the same form
+ as the protobuf message
`google.cloud.devtools.cloudbuild_v1.types.Build`
+ :type build: Union[dict,
`google.cloud.devtools.cloudbuild_v1.types.Build`]
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param wait: Optional, wait for operation to finish.
+ :type wait: Optional[bool]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+ """
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ operation = client.create_build(
+ project_id=project_id,
+ build=build,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ operation_dict = MessageToDict(operation)
+ try:
+ id_ = operation_dict["metadata"]["build"]["id"]
+ except Exception:
+ raise AirflowException(
+ "Could not retrieve Build ID from Operation."
+ )
+
+ if wait:
+ self._wait_for_operation_to_complete(
+ func=self.get_build,
+ id_=id_,
+ project_id=project_id, # type: ignore
+ )
+ return self.get_build(id_=id_, project_id=project_id)
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_build_trigger(
+ self,
+ trigger: Union[dict, BuildTrigger],
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> BuildTrigger:
+ """
+ Creates a new BuildTrigger.
+
+ :param trigger: The BuildTrigger to create. If a dict is provided, it
must be of the same form
+ as the protobuf message
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+ :type trigger: Union[dict,
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+ """
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ return client.create_build_trigger(
+ project_id=project_id,
+ trigger=trigger,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def delete_build_trigger(
+ self,
+ trigger_id: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> None:
+ """
+ Deletes a BuildTrigger by its project ID and trigger ID.
+
+ :param trigger_id: The ID of the BuildTrigger to delete.
+ :type trigger_id: str
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+ """
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ client.delete_build_trigger(
+ project_id=project_id,
+ trigger_id=trigger_id,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_build(
+ self,
+ id_: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Build:
+ """
+ Returns information about a previously requested build.
+
+ :param id_: The ID of the build.
+ :type id_: str
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+ """
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ return client.get_build(
+ project_id=project_id,
+ id_=id_,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_build_trigger(
+ self,
+ trigger_id: str,
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> BuildTrigger:
+ """
+ Returns information about a BuildTrigger.
+
+ :param trigger_id: The ID of the BuildTrigger to get.
+ :type trigger_id: str
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+ """
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ return client.get_build_trigger(
+ project_id=project_id,
+ trigger_id=trigger_id,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def list_build_triggers(
+ self,
+ project_id: str,
+ page_size: Optional[int] = None,
+ page_token: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> ListBuildTriggersResponse:
+ """
+ Lists existing BuildTriggers.
+
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param page_size: Optional, number of results to return in the list.
+ :type page_size: Optional[int]
+ :param page_token: Optional, token to provide to skip to a particular
spot in the list.
+ :type page_token: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.ListBuildTriggersResponse`
+ """
+
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ return client.list_build_triggers(
+ project_id=project_id,
+ page_size=page_size,
+ page_token=page_token,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def list_builds(
+ self,
+ project_id: str,
+ page_size: Optional[int] = None,
+ filter_: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> List[Build]:
+ """
+ Lists previously requested builds.
+
:param project_id: Optional, Google Cloud Project project_id where the
function belongs.
If set to None or missing, the default project_id from the GCP
connection is used.
:type project_id: str
- :return: Dict
+ :param page_size: Optional, number of results to return in the list.
+ :type page_size: Optional[int]
+ :param filter_: Optional, the raw filter text to constrain the results.
+ :type filter_: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+ :rtype: List[`google.cloud.devtools.cloudbuild_v1.types.Build`]
"""
- service = self.get_conn()
+ if not project_id:
+ raise ValueError("The project_id should be set")
- # Create build
- response = (
- service.projects() # pylint: disable=no-member
- .builds()
- .create(projectId=project_id, body=body)
- .execute(num_retries=self.num_retries)
+ client = self.get_conn()
+ builds = client.list_builds(
+ project_id=project_id,
+ page_size=page_size,
+ filter_=filter_,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
)
+ return list(builds)
- # Wait
- operation_name = response["name"]
- self._wait_for_operation_to_complete(operation_name=operation_name)
+ @GoogleBaseHook.fallback_to_default_project_id
+ def retry_build(
+ self,
+ id_: str,
+ project_id: str,
+ wait: bool = True,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Build:
+ """
+ Creates a new build based on the specified build. This method creates
a new build
+ using the original build request, which may or may not result in an
identical build.
+
+ :param id_: Build ID of the original build.
+ :type id_: str
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: str
+ :param wait: Optional, wait for operation to finish.
+ :type wait: Optional[bool]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
- # Get result
- build_id = response["metadata"]["build"]["id"]
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+ """
+ if not project_id:
+ raise ValueError("The project_id should be set")
- result = (
- service.projects() # pylint: disable=no-member
- .builds()
- .get(projectId=project_id, id=build_id)
- .execute(num_retries=self.num_retries)
+ client = self.get_conn()
+ operation = client.retry_build(
+ project_id=project_id,
+ id_=id_,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
)
- return result
+ operation_dict = MessageToDict(operation)
+ try:
+ id_ = operation_dict["metadata"]["build"]["id"]
+ except Exception:
+ raise AirflowException(
+ "Could not retrieve Build ID from Operation."
+ )
+
+ if wait:
+ self._wait_for_operation_to_complete(
+ func=self.get_build,
+ id_=id_, # type: ignore
+ project_id=project_id, # type: ignore
+ )
+ return self.get_build(id_=id_, project_id=project_id)
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def run_build_trigger(
+ self,
+ trigger_id: str,
+ source: Union[dict, RepoSource],
+ project_id: str,
+ wait: bool = True,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> Build:
+ """
+ Runs a BuildTrigger at a particular source revision.
+
+ :param trigger_id: The ID of the trigger.
+ :type trigger_id: str
+ :param source: Source to build against this trigger. If a dict is
provided, it must be of the
+ same form as the protobuf message
`google.cloud.devtools.cloudbuild_v1.types.RepoSource`
+ :type source: Union[dict,
`google.cloud.devtools.cloudbuild_v1.types.RepoSource`]
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: str
+ :param wait: Optional, wait for operation to finish.
+ :type wait: Optional[bool]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
- def _wait_for_operation_to_complete(self, operation_name: str) -> None:
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.Build`
+ """
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ operation = client.run_build_trigger(
+ project_id=project_id,
+ trigger_id=trigger_id,
+ source=source,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ operation_dict = MessageToDict(operation)
+ try:
+ id_ = operation_dict["metadata"]["build"]["id"]
+ except Exception:
+ raise AirflowException(
+ "Could not retrieve Build ID from Operation."
+ )
+
+ if wait:
+ self._wait_for_operation_to_complete(
+ func=self.get_build,
+ id_=id_,
+ project_id=project_id, # type: ignore
+ )
+ return self.get_build(id_=id_, project_id=project_id)
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def update_build_trigger(
+ self,
+ trigger_id: str,
+ trigger: Union[dict, BuildTrigger],
+ project_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ) -> BuildTrigger:
+ """
+ Updates a BuildTrigger by its project ID and trigger ID.
+
+ :param trigger_id: The ID of the trigger.
+ :type trigger_id: str
+ :param trigger: The BuildTrigger to create. If a dict is provided, it
must be of the same form
+ as the protobuf message
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+ :type trigger: Union[dict,
`google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`]
+ :param project_id: Optional, Google Cloud Project project_id where the
function belongs.
+ If set to None or missing, the default project_id from the GCP
connection is used.
+ :type project_id: Optional[str]
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :type retry: Optional[Retry]
+ :param timeout: Optional, the amount of time, in seconds, to wait for
the request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :type timeout: Optional[float]
+ :param metadata: Optional, additional metadata that is provided to the
method.
+ :type metadata: Optional[Sequence[Tuple[str, str]]]
+
+ :rtype: `google.cloud.devtools.cloudbuild_v1.types.BuildTrigger`
+ """
+ if not project_id:
+ raise ValueError("The project_id should be set")
+
+ client = self.get_conn()
+ return client.update_build_trigger(
+ project_id=project_id,
+ trigger_id=trigger_id,
+ trigger=trigger,
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ def _wait_for_operation_to_complete(
+ self, func: Callable, **kwargs: Optional[dict]
+ ) -> dict:
"""
Waits for the named operation to complete - checks status of the
asynchronous call.
- :param operation_name: The name of the operation.
- :type operation_name: str
+ :param func: The function that needs to be called.
+ :type func: Callable
+ :param kwargs: dict of function keyword arguments
+ :type kwargs: dict
:return: The response returned by the operation.
:rtype: dict
:exception: AirflowException in case error is returned.
"""
- service = self.get_conn()
while True:
- operation_response = (
- # pylint: disable=no-member
- service.operations().get(name=operation_name).execute(num_retries=self.num_retries)
- )
- if operation_response.get("done"):
- response = operation_response.get("response")
- error = operation_response.get("error")
- # Note, according to documentation always either response or error is
- # set when "done" == True
- if error:
- raise AirflowException(str(error))
- return response
+ operation = func(**kwargs)
+ operation_dict = MessageToDict(operation)
+ status = operation_dict["status"]
+ if status:
+ if status == "SUCCESS":
+ return operation_dict
+ elif status in [
+ "FAILURE",
+ "INTERNAL_ERROR",
+ "TIMEOUT",
+ "CANCELLED",
+ "EXPIRED",
+ ]:
+ raise AirflowException(str(operation_dict))
Review comment:
Won't `operation.result()` do the same trick as the `while` loop?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Move Google Cloud Build from Discovery API to Python Library
> ------------------------------------------------------------
>
> Key: AIRFLOW-6981
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6981
> Project: Apache Airflow
> Issue Type: Improvement
> Components: gcp
> Affects Versions: 2.0.0
> Reporter: Ryan Yuan
> Assignee: Ryan Yuan
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)