This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dc510d00f42 Add operators for AspectType resource (#46240)
dc510d00f42 is described below

commit dc510d00f428cf1a77238666cb5202b71ffb96cb
Author: VladaZakharova <[email protected]>
AuthorDate: Sat Feb 1 13:12:49 2025 +0100

    Add operators for AspectType resource (#46240)
    
    Co-authored-by: Ulada Zakharava <[email protected]>
---
 .../operators/cloud/dataplex.rst                   |  84 ++++
 docs/spelling_wordlist.txt                         |   2 +
 .../providers/google/cloud/hooks/dataplex.py       | 196 ++++++++++
 .../providers/google/cloud/links/dataplex.py       |  49 +++
 .../providers/google/cloud/operators/dataplex.py   | 424 ++++++++++++++++++++-
 .../src/airflow/providers/google/provider.yaml     |   2 +
 .../tests/google/cloud/hooks/test_dataplex.py      | 105 +++++
 .../tests/google/cloud/links/test_dataplex.py      |  57 ++-
 .../tests/google/cloud/operators/test_dataplex.py  | 149 +++++++-
 .../cloud/dataplex/example_dataplex_catalog.py     |  74 ++++
 10 files changed, 1137 insertions(+), 5 deletions(-)

diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst 
b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
index a8bd9acde56..fc45b5b95bc 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -595,3 +595,87 @@ use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
     :dedent: 4
     :start-after: [START howto_operator_dataplex_catalog_update_entry_type]
     :end-before: [END howto_operator_dataplex_catalog_update_entry_type]
+
+.. _howto/operator:DataplexCatalogCreateAspectTypeOperator:
+
+Create an AspectType
+--------------------
+
+To create an Aspect Type in specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateAspectTypeOperator`
+For more information about the available fields to pass when creating an 
Aspect Type, visit `Aspect Type resource configuration. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.aspectTypes#AspectType>`__
+
+A simple Aspect Group configuration can look as followed:
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_dataplex_aspect_type_configuration]
+    :end-before: [END howto_dataplex_aspect_type_configuration]
+
+With this configuration you can create an Aspect Type resource:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateAspectTypeOperator`
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_create_aspect_type]
+    :end-before: [END howto_operator_dataplex_catalog_create_aspect_type]
+
+.. _howto/operator:DataplexCatalogDeleteAspectTypeOperator:
+
+Delete an AspectType
+--------------------
+
+To delete an Aspect Type in specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteAspectTypeOperator`
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_delete_aspect_type]
+    :end-before: [END howto_operator_dataplex_catalog_delete_aspect_type]
+
+.. _howto/operator:DataplexCatalogListAspectTypesOperator:
+
+List AspectTypes
+----------------
+
+To list all Aspect Types in specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListAspectTypesOperator`.
+This operator also supports filtering and ordering the result of the operation.
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_list_aspect_types]
+    :end-before: [END howto_operator_dataplex_catalog_list_aspect_types]
+
+.. _howto/operator:DataplexCatalogGetAspectTypeOperator:
+
+Get an AspectType
+-----------------
+
+To retrieve an Aspect Group in specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetAspectTypeOperator`
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_get_aspect_type]
+    :end-before: [END howto_operator_dataplex_catalog_get_aspect_type]
+
+.. _howto/operator:DataplexCatalogUpdateAspectTypeOperator:
+
+Update an AspectType
+--------------------
+
+To update an Aspect Type in specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateAspectTypeOperator`
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_update_aspect_type]
+    :end-before: [END howto_operator_dataplex_catalog_update_aspect_type]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 2b35ed5bc2d..eeb1978738d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -91,6 +91,8 @@ asciiart
 asctime
 asend
 asia
+AspectType
+AspectTypes
 assertEqualIgnoreMultipleSpaces
 AssetEvent
 AssetEvents
diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataplex.py 
b/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
index df6a4414284..28f85969006 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
@@ -32,6 +32,7 @@ from google.cloud.dataplex_v1 import (
 )
 from google.cloud.dataplex_v1.services.catalog_service import 
CatalogServiceClient
 from google.cloud.dataplex_v1.types import (
+    AspectType,
     Asset,
     DataScan,
     DataScanJob,
@@ -56,6 +57,7 @@ if TYPE_CHECKING:
     from google.api_core.retry import Retry
     from google.api_core.retry_async import AsyncRetry
     from google.cloud.dataplex_v1.services.catalog_service.pagers import (
+        ListAspectTypesPager,
         ListEntryGroupsPager,
         ListEntryTypesPager,
     )
@@ -138,6 +140,78 @@ class DataplexHook(GoogleBaseHook):
             error = operation.exception(timeout=timeout)
             raise AirflowException(error)
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_aspect_type(
+        self,
+        location: str,
+        aspect_type_id: str,
+        aspect_type_configuration: AspectType | dict,
+        project_id: str = PROVIDE_PROJECT_ID,
+        validate_only: bool = False,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Create an EntryType resource.
+
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param aspect_type_id: Required. AspectType identifier.
+        :param aspect_type_configuration: Required. AspectType configuration 
body.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param validate_only: Optional. If set, performs request validation, 
but does not actually execute
+            the create request.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.create_aspect_type(
+            request={
+                "parent": client.common_location_path(project_id, location),
+                "aspect_type_id": aspect_type_id,
+                "aspect_type": aspect_type_configuration,
+                "validate_only": validate_only,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_aspect_type(
+        self,
+        location: str,
+        aspect_type_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> AspectType:
+        """
+        Get an AspectType resource.
+
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param aspect_type_id: Required. AspectType identifier.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.get_aspect_type(
+            request={
+                "name": client.aspect_type_path(project_id, location, 
aspect_type_id),
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
     @GoogleBaseHook.fallback_to_default_project_id
     def create_entry_type(
         self,
@@ -210,6 +284,128 @@ class DataplexHook(GoogleBaseHook):
             metadata=metadata,
         )
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_aspect_type(
+        self,
+        location: str,
+        aspect_type_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Delete an AspectType resource.
+
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param aspect_type_id: Required. AspectType identifier.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.delete_aspect_type(
+            request={
+                "name": client.aspect_type_path(project_id, location, 
aspect_type_id),
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_aspect_types(
+        self,
+        location: str,
+        filter_by: str | None = None,
+        order_by: str | None = None,
+        page_size: int | None = None,
+        page_token: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> ListAspectTypesPager:
+        """
+        List AspectTypes resources from specific location.
+
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param filter_by: Optional. Filter to apply on the list results.
+        :param order_by: Optional. Fields to order the results by.
+        :param page_size: Optional. Maximum number of EntryGroups to return on 
one page.
+        :param page_token: Optional. Token to retrieve the next page of 
results.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.list_aspect_types(
+            request={
+                "parent": client.common_location_path(project_id, location),
+                "filter": filter_by,
+                "order_by": order_by,
+                "page_size": page_size,
+                "page_token": page_token,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_aspect_type(
+        self,
+        location: str,
+        aspect_type_id: str,
+        aspect_type_configuration: dict | AspectType,
+        project_id: str = PROVIDE_PROJECT_ID,
+        update_mask: list[str] | FieldMask | None = None,
+        validate_only: bool | None = False,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Update an AspectType resource.
+
+        :param aspect_type_id: Required. ID of the AspectType to update.
+        :param aspect_type_configuration: Required. The updated configuration 
body of the AspectType.
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param update_mask: Optional. Names of fields whose values to 
overwrite on an entry group.
+            If this parameter is absent or empty, all modifiable fields are 
overwritten. If such
+            fields are non-required and omitted in the request body, their 
values are emptied.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param validate_only: Optional. The service validates the request 
without performing any mutations.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        _aspect_type = (
+            deepcopy(aspect_type_configuration)
+            if isinstance(aspect_type_configuration, dict)
+            else AspectType.to_dict(aspect_type_configuration)
+        )
+        _aspect_type["name"] = client.aspect_type_path(project_id, location, 
aspect_type_id)
+        return client.update_aspect_type(
+            request={
+                "aspect_type": _aspect_type,
+                "update_mask": FieldMask(paths=update_mask) if 
type(update_mask) is list else update_mask,
+                "validate_only": validate_only,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
     @GoogleBaseHook.fallback_to_default_project_id
     def delete_entry_type(
         self,
diff --git a/providers/src/airflow/providers/google/cloud/links/dataplex.py 
b/providers/src/airflow/providers/google/cloud/links/dataplex.py
index db9bba40276..8f30527c89b 100644
--- a/providers/src/airflow/providers/google/cloud/links/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/links/dataplex.py
@@ -39,6 +39,10 @@ DATAPLEX_CATALOG_ENTRY_TYPE_LINK = (
     
"/dataplex/projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}?project={project_id}"
 )
 DATAPLEX_CATALOG_ENTRY_TYPES_LINK = 
"/dataplex/catalog/entry-types?project={project_id}"
+DATAPLEX_CATALOG_ASPECT_TYPE_LINK = (
+    
"/dataplex/projects/{project_id}/locations/{location}/aspectTypes/{aspect_type_id}?project={project_id}"
+)
+DATAPLEX_CATALOG_ASPECT_TYPES_LINK = 
"/dataplex/catalog/aspect-types?project={project_id}"
 
 
 class DataplexTaskLink(BaseGoogleLink):
@@ -199,3 +203,48 @@ class DataplexCatalogEntryTypesLink(BaseGoogleLink):
                 "project_id": task_instance.project_id,
             },
         )
+
+
+class DataplexCatalogAspectTypeLink(BaseGoogleLink):
+    """Helper class for constructing Dataplex Catalog AspectType link."""
+
+    name = "Dataplex Catalog AspectType"
+    key = "dataplex_catalog_aspect_type_key"
+    format_str = DATAPLEX_CATALOG_ASPECT_TYPE_LINK
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=DataplexCatalogAspectTypeLink.key,
+            value={
+                "aspect_type_id": task_instance.aspect_type_id,
+                "location": task_instance.location,
+                "project_id": task_instance.project_id,
+            },
+        )
+
+
+class DataplexCatalogAspectTypesLink(BaseGoogleLink):
+    """Helper class for constructing Dataplex Catalog AspectTypes link."""
+
+    name = "Dataplex Catalog AspectTypes"
+    key = "dataplex_catalog_aspect_types_key"
+    format_str = DATAPLEX_CATALOG_ASPECT_TYPES_LINK
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=DataplexCatalogAspectTypesLink.key,
+            value={
+                "location": task_instance.location,
+                "project_id": task_instance.project_id,
+            },
+        )
diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py 
b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
index ab351b36a28..f442ba7172c 100644
--- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
@@ -40,12 +40,14 @@ from google.api_core.exceptions import AlreadyExists, 
GoogleAPICallError, NotFou
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry, exponential_sleep_generator
 from google.cloud.dataplex_v1.types import (
+    AspectType,
     Asset,
     DataScan,
     DataScanJob,
     EntryGroup,
     EntryType,
     Lake,
+    ListAspectTypesResponse,
     ListEntryGroupsResponse,
     ListEntryTypesResponse,
     Task,
@@ -56,6 +58,8 @@ from googleapiclient.errors import HttpError
 from airflow.configuration import conf
 from airflow.providers.google.cloud.hooks.dataplex import 
AirflowDataQualityScanException, DataplexHook
 from airflow.providers.google.cloud.links.dataplex import (
+    DataplexCatalogAspectTypeLink,
+    DataplexCatalogAspectTypesLink,
     DataplexCatalogEntryGroupLink,
     DataplexCatalogEntryGroupsLink,
     DataplexCatalogEntryTypeLink,
@@ -2828,7 +2832,7 @@ class 
DataplexCatalogListEntryTypesOperator(DataplexCatalogBaseOperator):
 
     :param filter_by: Optional. Filter to apply on the list results.
     :param order_by: Optional. Fields to order the results by.
-    :param page_size: Optional. Maximum number of EntryGroups to return on the 
page.
+    :param page_size: Optional. Maximum number of EntryTypes to return on the 
page.
     :param page_token: Optional. Token to retrieve the next page of results.
     :param project_id: Required. The ID of the Google Cloud project where the 
service is used.
     :param location: Required. The ID of the Google Cloud region where the 
service is used.
@@ -2887,7 +2891,7 @@ class 
DataplexCatalogListEntryTypesOperator(DataplexCatalogBaseOperator):
                 timeout=self.timeout,
                 metadata=self.metadata,
             )
-            self.log.info("EntryGroup on page: %s", entry_type_on_page)
+            self.log.info("EntryType on page: %s", entry_type_on_page)
             self.xcom_push(
                 context=context,
                 key="entry_type_page",
@@ -2896,7 +2900,7 @@ class 
DataplexCatalogListEntryTypesOperator(DataplexCatalogBaseOperator):
         except Exception as ex:
             raise AirflowException(ex)
 
-        # Constructing list to return EntryGroups in readable format
+        # Constructing list to return EntryTypes in readable format
         entry_types_list = [
             MessageToDict(entry_type._pb, preserving_proto_field_name=True)
             for entry_type in next(iter(entry_type_on_page.pages)).entry_types
@@ -2997,3 +3001,417 @@ class 
DataplexCatalogUpdateEntryTypeOperator(DataplexCatalogBaseOperator):
         if not self.validate_request:
             self.log.info("EntryType %s was successfully updated.", 
self.entry_type_id)
         return result
+
+
+class DataplexCatalogCreateAspectTypeOperator(DataplexCatalogBaseOperator):
+    """
+    Create an AspectType resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:DataplexCatalogCreateAspectTypeOperator`
+
+    :param aspect_type_id: Required. AspectType identifier.
+    :param aspect_type_configuration: Required. AspectType configuration.
+        For more details please see API documentation:
+        
https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.aspectTypes#AspectType
+    :param validate_request: Optional. If set, performs request validation, 
but does not actually
+        execute the request.
+    :param project_id: Required. The ID of the Google Cloud project where the 
service is used.
+    :param location: Required. The ID of the Google Cloud region where the 
service is used.
+    :param gcp_conn_id: Optional. The connection ID to use to connect to 
Google Cloud.
+    :param retry: Optional. A retry object used to retry requests. If `None` 
is specified, requests will not
+        be retried.
+    :param timeout: Optional. The amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Optional. Additional metadata that is provided to the 
method.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = tuple(
+        {"aspect_type_id", "aspect_type_configuration"} | 
set(DataplexCatalogBaseOperator.template_fields)
+    )
+    operator_extra_links = (DataplexCatalogAspectTypeLink(),)
+
+    def __init__(
+        self,
+        aspect_type_id: str,
+        aspect_type_configuration: AspectType | dict,
+        validate_request: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.aspect_type_id = aspect_type_id
+        self.aspect_type_configuration = aspect_type_configuration
+        self.validate_request = validate_request
+
+    def execute(self, context: Context):
+        DataplexCatalogAspectTypeLink.persist(
+            context=context,
+            task_instance=self,
+        )
+
+        if self.validate_request:
+            self.log.info("Validating a Create Dataplex Catalog AspectType 
request.")
+        else:
+            self.log.info("Creating a Dataplex Catalog AspectType.")
+
+        try:
+            operation = self.hook.create_aspect_type(
+                aspect_type_id=self.aspect_type_id,
+                aspect_type_configuration=self.aspect_type_configuration,
+                location=self.location,
+                project_id=self.project_id,
+                validate_only=self.validate_request,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            aspect_type = self.hook.wait_for_operation(timeout=self.timeout, 
operation=operation)
+        except AlreadyExists:
+            aspect_type = self.hook.get_aspect_type(
+                aspect_type_id=self.aspect_type_id,
+                location=self.location,
+                project_id=self.project_id,
+            )
+            self.log.info(
+                "Dataplex Catalog AspectType %s already exists.",
+                self.aspect_type_id,
+            )
+            result = AspectType.to_dict(aspect_type)
+            return result
+        except Exception as ex:
+            raise AirflowException(ex)
+        else:
+            result = AspectType.to_dict(aspect_type) if not 
self.validate_request else None
+
+        if not self.validate_request:
+            self.log.info("Dataplex Catalog AspectType %s was successfully 
created.", self.aspect_type_id)
+        return result
+
+
+class DataplexCatalogGetAspectTypeOperator(DataplexCatalogBaseOperator):
+    """
+    Get an AspectType resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:DataplexCatalogGetAspectTypeOperator`
+
+    :param aspect_type_id: Required. AspectType identifier.
+    :param project_id: Required. The ID of the Google Cloud project where the 
service is used.
+    :param location: Required. The ID of the Google Cloud region where the 
service is used.
+    :param gcp_conn_id: Optional. The connection ID to use to connect to 
Google Cloud.
+    :param retry: Optional. A retry object used to retry requests. If `None` 
is specified, requests will not
+        be retried.
+    :param timeout: Optional. The amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Optional. Additional metadata that is provided to the 
method.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = tuple(
+        {"aspect_type_id"} | set(DataplexCatalogBaseOperator.template_fields)
+    )
+    operator_extra_links = (DataplexCatalogAspectTypeLink(),)
+
+    def __init__(
+        self,
+        aspect_type_id: str,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.aspect_type_id = aspect_type_id
+
+    def execute(self, context: Context):
+        DataplexCatalogAspectTypeLink.persist(
+            context=context,
+            task_instance=self,
+        )
+        self.log.info(
+            "Retrieving Dataplex Catalog AspectType %s.",
+            self.aspect_type_id,
+        )
+        try:
+            aspect_type = self.hook.get_aspect_type(
+                aspect_type_id=self.aspect_type_id,
+                location=self.location,
+                project_id=self.project_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+        except NotFound:
+            self.log.info(
+                "Dataplex Catalog AspectType %s not found.",
+                self.aspect_type_id,
+            )
+            raise AirflowException(NotFound)
+        except Exception as ex:
+            raise AirflowException(ex)
+
+        return AspectType.to_dict(aspect_type)
+
+
+class DataplexCatalogListAspectTypesOperator(DataplexCatalogBaseOperator):
+    """
+    List AspectType resources.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:DataplexCatalogListAspectTypesOperator`
+
+    :param filter_by: Optional. Filter to apply on the list results.
+    :param order_by: Optional. Fields to order the results by.
+    :param page_size: Optional. Maximum number of AspectTypes to return on the 
page.
+    :param page_token: Optional. Token to retrieve the next page of results.
+    :param project_id: Required. The ID of the Google Cloud project where the 
service is used.
+    :param location: Required. The ID of the Google Cloud region where the 
service is used.
+    :param gcp_conn_id: Optional. The connection ID to use to connect to 
Google Cloud.
+    :param retry: Optional. A retry object used to retry requests. If `None` 
is specified, requests will not
+        be retried.
+    :param timeout: Optional. The amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Optional. Additional metadata that is provided to the 
method.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = 
tuple(DataplexCatalogBaseOperator.template_fields)
+    operator_extra_links = (DataplexCatalogAspectTypesLink(),)
+
+    def __init__(
+        self,
+        page_size: int | None = None,
+        page_token: str | None = None,
+        filter_by: str | None = None,
+        order_by: str | None = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.page_size = page_size
+        self.page_token = page_token
+        self.filter_by = filter_by
+        self.order_by = order_by
+
+    def execute(self, context: Context):
+        DataplexCatalogAspectTypesLink.persist(
+            context=context,
+            task_instance=self,
+        )
+        self.log.info(
+            "Listing Dataplex Catalog AspectType from location %s.",
+            self.location,
+        )
+        try:
+            aspect_type_on_page = self.hook.list_aspect_types(
+                location=self.location,
+                project_id=self.project_id,
+                page_size=self.page_size,
+                page_token=self.page_token,
+                filter_by=self.filter_by,
+                order_by=self.order_by,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            self.log.info("AspectType on page: %s", aspect_type_on_page)
+            self.xcom_push(
+                context=context,
+                key="aspect_type_page",
+                
value=ListAspectTypesResponse.to_dict(aspect_type_on_page._response),
+            )
+        except Exception as ex:
+            raise AirflowException(ex)
+
+        # Constructing list to return AspectTypes in readable format
+        aspect_types_list = [
+            MessageToDict(aspect_type._pb, preserving_proto_field_name=True)
+            for aspect_type in 
next(iter(aspect_type_on_page.pages)).aspect_types
+        ]
+        return aspect_types_list
+
+
+class DataplexCatalogUpdateAspectTypeOperator(DataplexCatalogBaseOperator):
+    """
+    Update an AspectType resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:DataplexCatalogUpdateAspectTypeOperator`
+
+    :param project_id: Required. The ID of the Google Cloud project that the 
task belongs to.
+    :param location: Required. The ID of the Google Cloud region that the task 
belongs to.
+    :param update_mask: Optional. Names of fields whose values to overwrite on 
an entry group.
+        If this parameter is absent or empty, all modifiable fields are 
overwritten. If such
+        fields are non-required and omitted in the request body, their values 
are emptied.
+    :param aspect_type_id: Required. ID of the AspectType to update.
+    :param aspect_type_configuration: Required. The updated configuration body 
of the AspectType.
+        For more details please see API documentation:
+        
https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.aspectTypes#AspectType
+    :param validate_only: Optional. The service validates the request without 
performing any mutations.
+    :param retry: Optional. A retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :param timeout: Optional. The amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Optional. Additional metadata that is provided to the 
method.
+    :param gcp_conn_id: Optional. The connection ID to use when fetching 
connection info.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = tuple(
+        {"aspect_type_id", "aspect_type_configuration", "update_mask"}
+        | set(DataplexCatalogBaseOperator.template_fields)
+    )
+    operator_extra_links = (DataplexCatalogAspectTypeLink(),)
+
+    def __init__(
+        self,
+        aspect_type_id: str,
+        aspect_type_configuration: dict | AspectType,
+        update_mask: list[str] | FieldMask | None = None,
+        validate_request: bool | None = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.aspect_type_id = aspect_type_id
+        self.aspect_type_configuration = aspect_type_configuration
+        self.update_mask = update_mask
+        self.validate_request = validate_request
+
+    def execute(self, context: Context):
+        DataplexCatalogAspectTypeLink.persist(
+            context=context,
+            task_instance=self,
+        )
+
+        if self.validate_request:
+            self.log.info("Validating an Update Dataplex Catalog AspectType 
request.")
+        else:
+            self.log.info(
+                "Updating Dataplex Catalog AspectType %s.",
+                self.aspect_type_id,
+            )
+        try:
+            operation = self.hook.update_aspect_type(
+                location=self.location,
+                project_id=self.project_id,
+                aspect_type_id=self.aspect_type_id,
+                aspect_type_configuration=self.aspect_type_configuration,
+                update_mask=self.update_mask,
+                validate_only=self.validate_request,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            aspect_type = self.hook.wait_for_operation(timeout=self.timeout, 
operation=operation)
+
+        except NotFound as ex:
+            self.log.info("Specified AspectType was not found.")
+            raise AirflowException(ex)
+        except Exception as exc:
+            raise AirflowException(exc)
+        else:
+            result = AspectType.to_dict(aspect_type) if not 
self.validate_request else None
+
+        if not self.validate_request:
+            self.log.info("AspectType %s was successfully updated.", 
self.aspect_type_id)
+        return result
+
+
+class DataplexCatalogDeleteAspectTypeOperator(DataplexCatalogBaseOperator):
+    """
+    Delete an AspectType resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:DataplexCatalogDeleteAspectTypeOperator`
+
+    :param aspect_type_id: Required. AspectType identifier.
+    :param project_id: Required. The ID of the Google Cloud project where the 
service is used.
+    :param location: Required. The ID of the Google Cloud region where the 
service is used.
+    :param gcp_conn_id: Optional. The connection ID to use to connect to 
Google Cloud.
+    :param retry: Optional. A retry object used to retry requests. If `None` 
is specified, requests will not
+        be retried.
+    :param timeout: Optional. The amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Optional. Additional metadata that is provided to the 
method.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = tuple(
+        {"aspect_type_id"} | set(DataplexCatalogBaseOperator.template_fields)
+    )
+
+    def __init__(
+        self,
+        aspect_type_id: str,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.aspect_type_id = aspect_type_id
+
+    def execute(self, context: Context):
+        self.log.info(
+            "Deleting Dataplex Catalog AspectType %s.",
+            self.aspect_type_id,
+        )
+        try:
+            operation = self.hook.delete_aspect_type(
+                aspect_type_id=self.aspect_type_id,
+                location=self.location,
+                project_id=self.project_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            self.hook.wait_for_operation(timeout=self.timeout, 
operation=operation)
+
+        except NotFound:
+            self.log.info(
+                "Dataplex Catalog AspectType %s not found.",
+                self.aspect_type_id,
+            )
+            raise AirflowException(NotFound)
+        except Exception as ex:
+            raise AirflowException(ex)
+        return None
diff --git a/providers/src/airflow/providers/google/provider.yaml 
b/providers/src/airflow/providers/google/provider.yaml
index fb8129f183c..2dbfdeb3ed1 100644
--- a/providers/src/airflow/providers/google/provider.yaml
+++ b/providers/src/airflow/providers/google/provider.yaml
@@ -1203,6 +1203,8 @@ extra-links:
   - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
   - airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
   - airflow.providers.google.cloud.links.dataplex.DataplexLakeLink
+  - airflow.providers.google.cloud.links.dataplex.DataplexCatalogAspectTypeLink
+  - 
airflow.providers.google.cloud.links.dataplex.DataplexCatalogAspectTypesLink
   - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryGroupLink
   - 
airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryGroupsLink
   - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryTypeLink
diff --git a/providers/tests/google/cloud/hooks/test_dataplex.py 
b/providers/tests/google/cloud/hooks/test_dataplex.py
index 39f447e591b..cd8b3812968 100644
--- a/providers/tests/google/cloud/hooks/test_dataplex.py
+++ b/providers/tests/google/cloud/hooks/test_dataplex.py
@@ -56,6 +56,9 @@ ENTRY_GROUP_UPDATED_BODY = {"description": "Some new descr"}
 ENTRY_TYPE_ID = "entry-type-id"
 ENTRY_TYPE_BODY = {"description": "Some descr"}
 ENTRY_TYPE_UPDATED_BODY = {"description": "Some new descr"}
+ASPECT_TYPE_ID = "aspect-type-id"
+ASPECT_TYPE_BODY = {"description": "Some descr"}
+ASPECT_TYPE_UPDATED_BODY = {"description": "Some new descr"}
 UPDATE_MASK = ["description"]
 
 COMMON_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}"
@@ -67,6 +70,7 @@ ASSET_PARENT = 
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/
 DATASCAN_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}"
 ENTRY_GROUP_PARENT = 
f"projects/{PROJECT_ID}/locations/{LOCATION}/entryGroup/{ENTRY_GROUP_ID}"
 ENTRY_TYPE_PARENT = 
f"projects/{PROJECT_ID}/locations/{LOCATION}/entryType/{ENTRY_TYPE_ID}"
+ASPECT_TYPE_PARENT = 
f"projects/{PROJECT_ID}/locations/{LOCATION}/aspectType/{ASPECT_TYPE_ID}"
 
 
 class TestDataplexHook:
@@ -530,3 +534,104 @@ class TestDataplexHook:
             timeout=None,
             metadata=(),
         )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_create_aspect_type(self, mock_client):
+        mock_common_location_path = 
mock_client.return_value.common_location_path
+        mock_common_location_path.return_value = COMMON_PARENT
+        self.hook.create_aspect_type(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            aspect_type_id=ASPECT_TYPE_ID,
+            aspect_type_configuration=ASPECT_TYPE_BODY,
+            validate_only=False,
+        )
+        mock_client.return_value.create_aspect_type.assert_called_once_with(
+            request=dict(
+                parent=COMMON_PARENT,
+                aspect_type_id=ASPECT_TYPE_ID,
+                aspect_type=ASPECT_TYPE_BODY,
+                validate_only=False,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_delete_aspect_type(self, mock_client):
+        mock_common_location_path = mock_client.return_value.aspect_type_path
+        mock_common_location_path.return_value = ASPECT_TYPE_PARENT
+        self.hook.delete_aspect_type(project_id=PROJECT_ID, location=LOCATION, 
aspect_type_id=ASPECT_TYPE_ID)
+
+        mock_client.return_value.delete_aspect_type.assert_called_once_with(
+            request=dict(
+                name=ASPECT_TYPE_PARENT,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_list_aspect_types(self, mock_client):
+        mock_common_location_path = 
mock_client.return_value.common_location_path
+        mock_common_location_path.return_value = COMMON_PARENT
+        self.hook.list_aspect_types(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            order_by="name",
+            page_size=2,
+            filter_by="'description' = 'Some descr'",
+        )
+        mock_client.return_value.list_aspect_types.assert_called_once_with(
+            request=dict(
+                parent=COMMON_PARENT,
+                page_size=2,
+                page_token=None,
+                filter="'description' = 'Some descr'",
+                order_by="name",
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_get_aspect_type(self, mock_client):
+        mock_common_location_path = mock_client.return_value.aspect_type_path
+        mock_common_location_path.return_value = ASPECT_TYPE_PARENT
+        self.hook.get_aspect_type(project_id=PROJECT_ID, location=LOCATION, 
aspect_type_id=ASPECT_TYPE_ID)
+
+        mock_client.return_value.get_aspect_type.assert_called_once_with(
+            request=dict(
+                name=ASPECT_TYPE_PARENT,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_update_aspect_type(self, mock_client):
+        mock_common_location_path = mock_client.return_value.entry_type_path
+        mock_common_location_path.return_value = ENTRY_TYPE_PARENT
+        self.hook.update_entry_type(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            entry_type_id=ENTRY_TYPE_ID,
+            entry_type_configuration=ENTRY_TYPE_UPDATED_BODY,
+            update_mask=UPDATE_MASK,
+            validate_only=False,
+        )
+
+        mock_client.return_value.update_entry_type.assert_called_once_with(
+            request=dict(
+                entry_type={**ENTRY_TYPE_UPDATED_BODY, "name": 
ENTRY_TYPE_PARENT},
+                update_mask=FieldMask(paths=UPDATE_MASK),
+                validate_only=False,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/providers/tests/google/cloud/links/test_dataplex.py 
b/providers/tests/google/cloud/links/test_dataplex.py
index 7b5fc9486aa..23859fbde8c 100644
--- a/providers/tests/google/cloud/links/test_dataplex.py
+++ b/providers/tests/google/cloud/links/test_dataplex.py
@@ -20,6 +20,8 @@ from __future__ import annotations
 import pytest
 
 from airflow.providers.google.cloud.links.dataplex import (
+    DataplexCatalogAspectTypeLink,
+    DataplexCatalogAspectTypesLink,
     DataplexCatalogEntryGroupLink,
     DataplexCatalogEntryGroupsLink,
     DataplexCatalogEntryTypeLink,
@@ -29,8 +31,10 @@ from airflow.providers.google.cloud.links.dataplex import (
     DataplexTasksLink,
 )
 from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCatalogCreateAspectTypeOperator,
     DataplexCatalogCreateEntryGroupOperator,
     DataplexCatalogCreateEntryTypeOperator,
+    DataplexCatalogGetAspectTypeOperator,
     DataplexCatalogGetEntryGroupOperator,
     DataplexCatalogGetEntryTypeOperator,
     DataplexCreateLakeOperator,
@@ -45,7 +49,10 @@ TEST_ENTRY_GROUP_ID_BODY = {"description": "some 
description"}
 TEST_ENTRY_GROUPS_ID = "test-entry-groups-id"
 TEST_ENTRY_TYPE_ID = "test-entry-type-id"
 TEST_ENTRY_TYPE_ID_BODY = {"description": "some description"}
-TEST_ENTRY_TYPES_ID = "test-entry-groups-id"
+TEST_ENTRY_TYPES_ID = "test-entry-types-id"
+TEST_ASPECT_TYPE_ID = "test-aspect-type-id"
+TEST_ASPECT_TYPE_ID_BODY = {"description": "some description"}
+TEST_ASPECT_TYPES_ID = "test-aspect-types-id"
 TEST_TASK_ID = "test-task-id"
 TEST_TASKS_ID = "test-tasks-id"
 TEST_LAKE_ID = "test-lake-id"
@@ -76,6 +83,13 @@ EXPECTED_DATAPLEX_TASK_LINK = (
 EXPECTED_DATAPLEX_TASKS_LINK = (
     DATAPLEX_BASE_LINK + 
f"process/tasks?project={TEST_PROJECT_ID}&qLake={TEST_LAKE_ID}.{TEST_LOCATION}"
 )
+EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPE_LINK = (
+    DATAPLEX_BASE_LINK
+    + 
f"projects/{TEST_PROJECT_ID}/locations/{TEST_LOCATION}/aspectTypes/{TEST_ASPECT_TYPE_ID}?project={TEST_PROJECT_ID}"
+)
+EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPES_LINK = (
+    DATAPLEX_BASE_LINK + f"catalog/aspect-types?project={TEST_PROJECT_ID}"
+)
 
 
 class TestDataplexTaskLink:
@@ -221,3 +235,44 @@ class TestDataplexCatalogEntryTypesLink:
         link.persist(context={"ti": ti}, task_instance=ti.task)
         actual_url = link.get_link(operator=ti.task, ti_key=ti.key)
         assert actual_url == expected_url
+
+
+class TestDataplexCatalogAspectTypeLink:
+    @pytest.mark.db_test
+    def test_get_link(self, create_task_instance_of_operator, session):
+        expected_url = EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPE_LINK
+        link = DataplexCatalogAspectTypeLink()
+        ti = create_task_instance_of_operator(
+            DataplexCatalogGetAspectTypeOperator,
+            dag_id="test_link_dag",
+            task_id="test_link_task",
+            location=TEST_LOCATION,
+            aspect_type_id=TEST_ASPECT_TYPE_ID,
+            project_id=TEST_PROJECT_ID,
+        )
+        session.add(ti)
+        session.commit()
+        link.persist(context={"ti": ti}, task_instance=ti.task)
+        actual_url = link.get_link(operator=ti.task, ti_key=ti.key)
+        assert actual_url == expected_url
+
+
+class TestDataplexCatalogAspectTypesLink:
+    @pytest.mark.db_test
+    def test_get_link(self, create_task_instance_of_operator, session):
+        expected_url = EXPECTED_DATAPLEX_CATALOG_ASPECT_TYPES_LINK
+        link = DataplexCatalogAspectTypesLink()
+        ti = create_task_instance_of_operator(
+            DataplexCatalogCreateAspectTypeOperator,
+            dag_id="test_link_dag",
+            task_id="test_link_task",
+            location=TEST_LOCATION,
+            aspect_type_id=TEST_ASPECT_TYPE_ID,
+            aspect_type_configuration=TEST_ASPECT_TYPE_ID_BODY,
+            project_id=TEST_PROJECT_ID,
+        )
+        session.add(ti)
+        session.commit()
+        link.persist(context={"ti": ti}, task_instance=ti.task)
+        actual_url = link.get_link(operator=ti.task, ti_key=ti.key)
+        assert actual_url == expected_url
diff --git a/providers/tests/google/cloud/operators/test_dataplex.py 
b/providers/tests/google/cloud/operators/test_dataplex.py
index ef5faa4637e..1f47f04fd2b 100644
--- a/providers/tests/google/cloud/operators/test_dataplex.py
+++ b/providers/tests/google/cloud/operators/test_dataplex.py
@@ -20,8 +20,14 @@ from unittest import mock
 
 import pytest
 from google.api_core.gapic_v1.method import DEFAULT
-from google.cloud.dataplex_v1.services.catalog_service.pagers import 
ListEntryGroupsPager, ListEntryTypesPager
+from google.cloud.dataplex_v1.services.catalog_service.pagers import (
+    ListAspectTypesPager,
+    ListEntryGroupsPager,
+    ListEntryTypesPager,
+)
 from google.cloud.dataplex_v1.types import (
+    ListAspectTypesRequest,
+    ListAspectTypesResponse,
     ListEntryGroupsRequest,
     ListEntryGroupsResponse,
     ListEntryTypesRequest,
@@ -30,12 +36,16 @@ from google.cloud.dataplex_v1.types import (
 
 from airflow.exceptions import TaskDeferred
 from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCatalogCreateAspectTypeOperator,
     DataplexCatalogCreateEntryGroupOperator,
     DataplexCatalogCreateEntryTypeOperator,
+    DataplexCatalogDeleteAspectTypeOperator,
     DataplexCatalogDeleteEntryGroupOperator,
     DataplexCatalogDeleteEntryTypeOperator,
+    DataplexCatalogGetAspectTypeOperator,
     DataplexCatalogGetEntryGroupOperator,
     DataplexCatalogGetEntryTypeOperator,
+    DataplexCatalogListAspectTypesOperator,
     DataplexCatalogListEntryGroupsOperator,
     DataplexCatalogListEntryTypesOperator,
     DataplexCreateAssetOperator,
@@ -68,6 +78,7 @@ ZONE_STR = 
"airflow.providers.google.cloud.operators.dataplex.Zone"
 ASSET_STR = "airflow.providers.google.cloud.operators.dataplex.Asset"
 ENTRY_GROUP_STR = 
"airflow.providers.google.cloud.operators.dataplex.EntryGroup"
 ENTRY_TYPE_STR = "airflow.providers.google.cloud.operators.dataplex.EntryType"
+ASPECT_TYPE_STR = 
"airflow.providers.google.cloud.operators.dataplex.AspectType"
 
 PROJECT_ID = "project-id"
 REGION = "region"
@@ -91,6 +102,7 @@ ZONE_ID = "test_zone_id"
 JOB_ID = "test_job_id"
 ENTRY_GROUP_NAME = "test_entry_group"
 ENTRY_TYPE_NAME = "test_entry_type"
+ASPECT_TYPE_NAME = "test_aspect_type"
 
 
 class TestDataplexCreateTaskOperator:
@@ -1023,3 +1035,138 @@ class TestDataplexCatalogListEntryTypesOperator:
             timeout=None,
             metadata=(),
         )
+
+
+class TestDataplexCatalogCreateAspectTypeOperator:
+    @mock.patch(ASPECT_TYPE_STR)
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock, aspect_type_mock):
+        op = DataplexCatalogCreateAspectTypeOperator(
+            task_id="create_task",
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            aspect_type_configuration=BODY,
+            validate_request=None,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        aspect_type_mock.return_value.to_dict.return_value = None
+        hook_mock.return_value.wait_for_operation.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.create_aspect_type.assert_called_once_with(
+            aspect_type_id=ASPECT_TYPE_NAME,
+            aspect_type_configuration=BODY,
+            location=REGION,
+            project_id=PROJECT_ID,
+            validate_only=None,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCatalogGetAspectTypeOperator:
+    @mock.patch(ASPECT_TYPE_STR)
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock, aspect_type_mock):
+        op = DataplexCatalogGetAspectTypeOperator(
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            task_id="get_task",
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context=mock.MagicMock())
+        aspect_type_mock.return_value.to_dict.return_value = None
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.get_aspect_type.assert_called_once_with(
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCatalogDeleteAspectTypeOperator:
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexCatalogDeleteAspectTypeOperator(
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            task_id="delete_task",
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.wait_for_operation.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.delete_aspect_type.assert_called_once_with(
+            project_id=PROJECT_ID,
+            location=REGION,
+            aspect_type_id=ASPECT_TYPE_NAME,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCatalogListAspectTypesOperator:
+    @mock.patch(ASPECT_TYPE_STR)
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock, aspect_type_mock):
+        op = DataplexCatalogListAspectTypesOperator(
+            project_id=PROJECT_ID,
+            location=REGION,
+            task_id="list_task",
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.list_aspect_types.return_value = 
ListAspectTypesPager(
+            response=(
+                ListAspectTypesResponse(
+                    aspect_types=[
+                        {
+                            "name": "aaa",
+                            "description": "Test Aspect Type 1",
+                            "display_name": "Aspect Type One",
+                        }
+                    ]
+                )
+            ),
+            method=mock.MagicMock(),
+            request=ListAspectTypesRequest(parent=""),
+        )
+
+        aspect_type_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        hook_mock.return_value.list_aspect_types.assert_called_once_with(
+            project_id=PROJECT_ID,
+            location=REGION,
+            page_size=None,
+            page_token=None,
+            filter_by=None,
+            order_by=None,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git 
a/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py 
b/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
index 6f511d0f8e9..ca585df5b20 100644
--- a/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+++ b/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
@@ -25,14 +25,19 @@ import os
 
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCatalogCreateAspectTypeOperator,
     DataplexCatalogCreateEntryGroupOperator,
     DataplexCatalogCreateEntryTypeOperator,
+    DataplexCatalogDeleteAspectTypeOperator,
     DataplexCatalogDeleteEntryGroupOperator,
     DataplexCatalogDeleteEntryTypeOperator,
+    DataplexCatalogGetAspectTypeOperator,
     DataplexCatalogGetEntryGroupOperator,
     DataplexCatalogGetEntryTypeOperator,
+    DataplexCatalogListAspectTypesOperator,
     DataplexCatalogListEntryGroupsOperator,
     DataplexCatalogListEntryTypesOperator,
+    DataplexCatalogUpdateAspectTypeOperator,
     DataplexCatalogUpdateEntryGroupOperator,
     DataplexCatalogUpdateEntryTypeOperator,
 )
@@ -56,6 +61,22 @@ ENTRY_TYPE_NAME = 
f"{DAG_ID}_entry_type_{ENV_ID}".replace("_", "-")
 ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some 
description"}
 # [END howto_dataplex_entry_type_configuration]
 
+ASPECT_TYPE_NAME = f"{DAG_ID}_aspect_type_{ENV_ID}".replace("_", "-")
+# [START howto_dataplex_aspect_type_configuration]
+ASPECT_TYPE_BODY = {
+    "display_name": "Sample AspectType",
+    "description": "A simple AspectType for demonstration purposes.",
+    "metadata_template": {
+        "name": "sample_field",
+        "type": "record",
+        "annotations": {
+            "display_name": "Sample Field",
+            "description": "A sample field within the AspectType.",
+        },
+    },
+}
+# [END howto_dataplex_aspect_type_configuration]
+
 with DAG(
     DAG_ID,
     start_date=datetime.datetime(2021, 1, 1),
@@ -84,6 +105,17 @@ with DAG(
     )
     # [END howto_operator_dataplex_catalog_create_entry_type]
 
+    # [START howto_operator_dataplex_catalog_create_aspect_type]
+    create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
+        task_id="create_aspect_type",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        aspect_type_id=ASPECT_TYPE_NAME,
+        aspect_type_configuration=ASPECT_TYPE_BODY,
+        validate_request=False,
+    )
+    # [END howto_operator_dataplex_catalog_create_aspect_type]
+
     # [START howto_operator_dataplex_catalog_get_entry_group]
     get_entry_group = DataplexCatalogGetEntryGroupOperator(
         task_id="get_entry_group",
@@ -102,6 +134,15 @@ with DAG(
     )
     # [END howto_operator_dataplex_catalog_get_entry_type]
 
+    # [START howto_operator_dataplex_catalog_get_aspect_type]
+    get_aspect_type = DataplexCatalogGetAspectTypeOperator(
+        task_id="get_aspect_type",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        aspect_type_id=ASPECT_TYPE_NAME,
+    )
+    # [END howto_operator_dataplex_catalog_get_aspect_type]
+
     # [START howto_operator_dataplex_catalog_list_entry_groups]
     list_entry_group = DataplexCatalogListEntryGroupsOperator(
         task_id="list_entry_group",
@@ -122,6 +163,16 @@ with DAG(
     )
     # [END howto_operator_dataplex_catalog_list_entry_types]
 
+    # [START howto_operator_dataplex_catalog_list_aspect_types]
+    list_aspect_type = DataplexCatalogListAspectTypesOperator(
+        task_id="list_aspect_type",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        order_by="name",
+        filter_by='display_name = "Display Name"',
+    )
+    # [END howto_operator_dataplex_catalog_list_aspect_types]
+
     # [START howto_operator_dataplex_catalog_update_entry_group]
     update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
         task_id="update_entry_group",
@@ -144,6 +195,17 @@ with DAG(
     )
     # [END howto_operator_dataplex_catalog_update_entry_type]
 
+    # [START howto_operator_dataplex_catalog_update_aspect_type]
+    update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
+        task_id="update_aspect_type",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        aspect_type_id=ASPECT_TYPE_NAME,
+        aspect_type_configuration={"display_name": "Updated Display Name"},
+        update_mask=["display_name"],
+    )
+    # [END howto_operator_dataplex_catalog_update_aspect_type]
+
     # [START howto_operator_dataplex_catalog_delete_entry_group]
     delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
         task_id="delete_entry_group",
@@ -164,12 +226,24 @@ with DAG(
     )
     # [END howto_operator_dataplex_catalog_delete_entry_type]
 
+    # [START howto_operator_dataplex_catalog_delete_aspect_type]
+    delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
+        task_id="delete_aspect_type",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        aspect_type_id=ASPECT_TYPE_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END howto_operator_dataplex_catalog_delete_aspect_type]
+
     (
         [
             create_entry_group >> get_entry_group >> update_entry_group >> 
delete_entry_group,
             list_entry_group,
             create_entry_type >> get_entry_type >> update_entry_type >> 
delete_entry_type,
             list_entry_type,
+            create_aspect_type >> get_aspect_type >> update_aspect_type >> 
delete_aspect_type,
+            list_aspect_type,
         ]
     )
 

Reply via email to