This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b50a065ae6 Add Dataplex Catalog Entry Group operators (#45751)
3b50a065ae6 is described below

commit 3b50a065ae6a06e72b031411ca8353b58926087d
Author: VladaZakharova <[email protected]>
AuthorDate: Sat Jan 18 11:24:58 2025 +0100

    Add Dataplex Catalog Entry Group operators (#45751)
    
    Co-authored-by: Ulada Zakharava <[email protected]>
---
 .../operators/cloud/dataplex.rst                   |  94 ++++
 docs/spelling_wordlist.txt                         |   2 +
 generated/provider_dependencies.json               |   2 +-
 .../providers/google/cloud/hooks/dataplex.py       | 212 ++++++++-
 .../providers/google/cloud/links/dataplex.py       |  51 ++-
 .../providers/google/cloud/operators/dataplex.py   | 490 ++++++++++++++++++++-
 .../src/airflow/providers/google/provider.yaml     |   4 +-
 .../tests/google/cloud/hooks/test_dataplex.py      | 114 +++++
 .../tests/google/cloud/links/test_dataplex.py      | 168 +++++++
 .../tests/google/cloud/operators/test_dataplex.py  | 143 ++++++
 .../cloud/dataplex/example_dataplex_catalog.py     | 118 +++++
 tests/always/test_project_structure.py             |   2 +-
 12 files changed, 1392 insertions(+), 8 deletions(-)

diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst 
b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
index cbeb5eafcd0..1846f1ad41b 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -417,3 +417,97 @@ To get a Data Profile scan job you can use:
     :dedent: 4
     :start-after: [START howto_dataplex_get_data_profile_job_operator]
     :end-before: [END howto_dataplex_get_data_profile_job_operator]
+
+
+Google Dataplex Catalog Operators
+=================================
+
+Dataplex Catalog provides a unified inventory of Google Cloud resources, such 
as BigQuery, and other resources,
+such as on-premises resources. Dataplex Catalog automatically retrieves 
metadata for Google Cloud resources,
+and you bring metadata for third-party resources into Dataplex Catalog.
+
+For more information about Dataplex Catalog visit `Dataplex Catalog product documentation <https://cloud.google.com/dataplex/docs/catalog-overview>`__
+
+.. _howto/operator:DataplexCatalogCreateEntryGroupOperator:
+
+Create an EntryGroup
+--------------------
+
+To create an Entry Group in a specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`
+For more information about the available fields to pass when creating an Entry Group, visit `Entry Group resource configuration <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups#EntryGroup>`__.
+
+A simple Entry Group configuration can look as follows:
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_dataplex_entry_group_configuration]
+    :end-before: [END howto_dataplex_entry_group_configuration]
+
+With this configuration you can create an Entry Group resource:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_create_entry_group]
+    :end-before: [END howto_operator_dataplex_catalog_create_entry_group]
+
+.. _howto/operator:DataplexCatalogDeleteEntryGroupOperator:
+
+Delete an EntryGroup
+--------------------
+
+To delete an Entry Group in a specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryGroupOperator`
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_delete_entry_group]
+    :end-before: [END howto_operator_dataplex_catalog_delete_entry_group]
+
+.. _howto/operator:DataplexCatalogListEntryGroupsOperator:
+
+List EntryGroups
+----------------
+
+To list all Entry Groups in a specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryGroupsOperator`.
+This operator also supports filtering and ordering the result of the operation.
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_list_entry_groups]
+    :end-before: [END howto_operator_dataplex_catalog_list_entry_groups]
+
+.. _howto/operator:DataplexCatalogGetEntryGroupOperator:
+
+Get an EntryGroup
+-----------------
+
+To retrieve an Entry Group in a specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryGroupOperator`
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_get_entry_group]
+    :end-before: [END howto_operator_dataplex_catalog_get_entry_group]
+
+.. _howto/operator:DataplexCatalogUpdateEntryGroupOperator:
+
+Update an EntryGroup
+--------------------
+
+To update an Entry Group in a specific location in Dataplex Catalog you can
+use 
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryGroupOperator`
+
+.. exampleinclude:: 
/../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_update_entry_group]
+    :end-before: [END howto_operator_dataplex_catalog_update_entry_group]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index d1a1e62d521..f0f7d90518f 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -574,6 +574,8 @@ encodable
 encryptor
 enqueue
 enqueued
+EntryGroup
+EntryGroups
 entrypoint
 entrypoints
 Enum
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 2724c6a73d4..8797be6c641 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -656,7 +656,7 @@
       "google-cloud-datacatalog>=3.23.0",
       "google-cloud-dataflow-client>=0.8.6",
       "google-cloud-dataform>=0.5.0",
-      "google-cloud-dataplex>=1.10.0",
+      "google-cloud-dataplex>=2.6.0",
       "google-cloud-dataproc-metastore>=1.12.0",
       "google-cloud-dataproc>=5.12.0",
       "google-cloud-dlp>=3.12.0",
diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataplex.py 
b/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
index 387dfb00a50..cb2c7e41a20 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/dataplex.py
@@ -20,15 +20,22 @@ from __future__ import annotations
 
 import time
 from collections.abc import Sequence
+from copy import deepcopy
 from typing import TYPE_CHECKING, Any
 
 from google.api_core.client_options import ClientOptions
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
-from google.cloud.dataplex_v1 import DataplexServiceClient, 
DataScanServiceAsyncClient, DataScanServiceClient
+from google.cloud.dataplex_v1 import (
+    DataplexServiceClient,
+    DataScanServiceAsyncClient,
+    DataScanServiceClient,
+)
+from google.cloud.dataplex_v1.services.catalog_service import 
CatalogServiceClient
 from google.cloud.dataplex_v1.types import (
     Asset,
     DataScan,
     DataScanJob,
+    EntryGroup,
     Lake,
     Task,
     Zone,
@@ -47,6 +54,7 @@ if TYPE_CHECKING:
     from google.api_core.operation import Operation
     from google.api_core.retry import Retry
     from google.api_core.retry_async import AsyncRetry
+    from google.cloud.dataplex_v1.services.catalog_service.pagers import 
ListEntryGroupsPager
     from googleapiclient.discovery import Resource
 
 PATH_DATA_SCAN = 
"projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
@@ -110,6 +118,14 @@ class DataplexHook(GoogleBaseHook):
             credentials=self.get_credentials(), client_info=CLIENT_INFO, 
client_options=client_options
         )
 
+    def get_dataplex_catalog_client(self) -> CatalogServiceClient:
+        """Return CatalogServiceClient."""
+        client_options = 
ClientOptions(api_endpoint="dataplex.googleapis.com:443")
+
+        return CatalogServiceClient(
+            credentials=self.get_credentials(), client_info=CLIENT_INFO, 
client_options=client_options
+        )
+
     def wait_for_operation(self, timeout: float | None, operation: Operation):
         """Wait for long-lasting operation to complete."""
         try:
@@ -118,6 +134,200 @@ class DataplexHook(GoogleBaseHook):
             error = operation.exception(timeout=timeout)
             raise AirflowException(error)
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_entry_group(
+        self,
+        location: str,
+        entry_group_id: str,
+        entry_group_configuration: EntryGroup | dict,
+        project_id: str = PROVIDE_PROJECT_ID,
+        validate_only: bool = False,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Create an EntryGroup resource.
+
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param entry_group_id: Required. EntryGroup identifier.
+        :param entry_group_configuration: Required. EntryGroup configuration 
body.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param validate_only: Optional. If set, performs request validation, 
but does not actually execute
+            the create request.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.create_entry_group(
+            request={
+                "parent": client.common_location_path(project_id, location),
+                "entry_group_id": entry_group_id,
+                "entry_group": entry_group_configuration,
+                "validate_only": validate_only,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_entry_group(
+        self,
+        location: str,
+        entry_group_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> EntryGroup:
+        """
+        Get an EntryGroup resource.
+
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param entry_group_id: Required. EntryGroup identifier.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.get_entry_group(
+            request={
+                "name": client.entry_group_path(project_id, location, 
entry_group_id),
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_entry_group(
+        self,
+        location: str,
+        entry_group_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Delete an EntryGroup resource.
+
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param entry_group_id: Required. EntryGroup identifier.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.delete_entry_group(
+            request={
+                "name": client.entry_group_path(project_id, location, 
entry_group_id),
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_entry_groups(
+        self,
+        location: str,
+        filter_by: str | None = None,
+        order_by: str | None = None,
+        page_size: int | None = None,
+        page_token: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> ListEntryGroupsPager:
+        """
+        List EntryGroups resources from a specific location.
+
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param filter_by: Optional. Filter to apply on the list results.
+        :param order_by: Optional. Fields to order the results by.
+        :param page_size: Optional. Maximum number of EntryGroups to return on 
one page.
+        :param page_token: Optional. Token to retrieve the next page of 
results.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.list_entry_groups(
+            request={
+                "parent": client.common_location_path(project_id, location),
+                "filter": filter_by,
+                "order_by": order_by,
+                "page_size": page_size,
+                "page_token": page_token,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_entry_group(
+        self,
+        location: str,
+        entry_group_id: str,
+        entry_group_configuration: dict | EntryGroup,
+        project_id: str = PROVIDE_PROJECT_ID,
+        update_mask: list[str] | FieldMask | None = None,
+        validate_only: bool | None = False,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Update an EntryGroup resource.
+
+        :param entry_group_id: Required. ID of the EntryGroup to update.
+        :param entry_group_configuration: Required. The updated configuration 
body of the EntryGroup.
+        :param location: Required. The ID of the Google Cloud location that 
the task belongs to.
+        :param update_mask: Optional. Names of fields whose values to 
overwrite on an entry group.
+            If this parameter is absent or empty, all modifiable fields are 
overwritten. If such
+            fields are non-required and omitted in the request body, their 
values are emptied.
+        :param project_id: Optional. The ID of the Google Cloud project that 
the task belongs to.
+        :param validate_only: Optional. The service validates the request 
without performing any mutations.
+        :param retry: Optional. A retry object used  to retry requests. If 
`None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for 
the request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the 
method.
+        """
+        client = self.get_dataplex_catalog_client()
+        _entry_group = (
+            deepcopy(entry_group_configuration)
+            if isinstance(entry_group_configuration, dict)
+            else EntryGroup.to_dict(entry_group_configuration)
+        )
+        _entry_group["name"] = client.entry_group_path(project_id, location, 
entry_group_id)
+        return client.update_entry_group(
+            request={
+                "entry_group": _entry_group,
+                "update_mask": FieldMask(paths=update_mask) if 
type(update_mask) is list else update_mask,
+                "validate_only": validate_only,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
     @GoogleBaseHook.fallback_to_default_project_id
     def create_task(
         self,
diff --git a/providers/src/airflow/providers/google/cloud/links/dataplex.py 
b/providers/src/airflow/providers/google/cloud/links/dataplex.py
index 80d4b2cb9c0..e0bd1ae5844 100644
--- a/providers/src/airflow/providers/google/cloud/links/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/links/dataplex.py
@@ -30,8 +30,10 @@ DATAPLEX_BASE_LINK = "/dataplex/process/tasks"
 DATAPLEX_TASK_LINK = DATAPLEX_BASE_LINK + 
"/{lake_id}.{task_id};location={region}/jobs?project={project_id}"
 DATAPLEX_TASKS_LINK = DATAPLEX_BASE_LINK + 
"?project={project_id}&qLake={lake_id}.{region}"
 
-DATAPLEX_LAKE_LINK = (
-    
"https://console.cloud.google.com/dataplex/lakes/{lake_id};location={region}?project={project_id}";
+DATAPLEX_LAKE_LINK = 
"/dataplex/lakes/{lake_id};location={region}?project={project_id}"
+DATAPLEX_CATALOG_ENTRY_GROUPS_LINK = 
"/dataplex/catalog/entry-groups?project={project_id}"
+DATAPLEX_CATALOG_ENTRY_GROUP_LINK = (
+    
"/dataplex/projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}?project={project_id}"
 )
 
 
@@ -103,3 +105,48 @@ class DataplexLakeLink(BaseGoogleLink):
                 "project_id": task_instance.project_id,
             },
         )
+
+
+class DataplexCatalogEntryGroupLink(BaseGoogleLink):
+    """Helper class for constructing Dataplex Catalog EntryGroup link."""
+
+    name = "Dataplex Catalog EntryGroup"
+    key = "dataplex_catalog_entry_group_key"
+    format_str = DATAPLEX_CATALOG_ENTRY_GROUP_LINK
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=DataplexCatalogEntryGroupLink.key,
+            value={
+                "entry_group_id": task_instance.entry_group_id,
+                "location": task_instance.location,
+                "project_id": task_instance.project_id,
+            },
+        )
+
+
+class DataplexCatalogEntryGroupsLink(BaseGoogleLink):
+    """Helper class for constructing Dataplex Catalog EntryGroups link."""
+
+    name = "Dataplex Catalog EntryGroups"
+    key = "dataplex_catalog_entry_groups_key"
+    format_str = DATAPLEX_CATALOG_ENTRY_GROUPS_LINK
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=DataplexCatalogEntryGroupsLink.key,
+            value={
+                "location": task_instance.location,
+                "project_id": task_instance.project_id,
+            },
+        )
diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py 
b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
index 8f7a0d694b9..33874063955 100644
--- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py
+++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py
@@ -20,8 +20,11 @@ from __future__ import annotations
 
 import time
 from collections.abc import Sequence
+from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
+from google.protobuf.json_format import MessageToDict
+
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.triggers.dataplex import (
     DataplexDataProfileJobTrigger,
@@ -33,15 +36,26 @@ if TYPE_CHECKING:
 
     from airflow.utils.context import Context
 
-from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
+from google.api_core.exceptions import AlreadyExists, GoogleAPICallError, 
NotFound
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry, exponential_sleep_generator
-from google.cloud.dataplex_v1.types import Asset, DataScan, DataScanJob, Lake, 
Task, Zone
+from google.cloud.dataplex_v1.types import (
+    Asset,
+    DataScan,
+    DataScanJob,
+    EntryGroup,
+    Lake,
+    ListEntryGroupsResponse,
+    Task,
+    Zone,
+)
 from googleapiclient.errors import HttpError
 
 from airflow.configuration import conf
 from airflow.providers.google.cloud.hooks.dataplex import 
AirflowDataQualityScanException, DataplexHook
 from airflow.providers.google.cloud.links.dataplex import (
+    DataplexCatalogEntryGroupLink,
+    DataplexCatalogEntryGroupsLink,
     DataplexLakeLink,
     DataplexTaskLink,
     DataplexTasksLink,
@@ -2093,3 +2107,475 @@ class 
DataplexDeleteAssetOperator(GoogleCloudBaseOperator):
         )
         hook.wait_for_operation(timeout=self.timeout, operation=operation)
         self.log.info("Dataplex asset %s deleted successfully!", self.asset_id)
+
+
+class DataplexCatalogBaseOperator(GoogleCloudBaseOperator):
+    """
+    Base class for all Dataplex Catalog operators.
+
+    :param project_id: Required. The ID of the Google Cloud project where the 
service is used.
+    :param location: Required. The ID of the Google Cloud region where the 
service is used.
+    :param gcp_conn_id: Optional. The connection ID to use to connect to 
Google Cloud.
+    :param retry: Optional. A retry object used to retry requests. If `None` 
is specified, requests will not
+        be retried.
+    :param timeout: Optional. The amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Optional. Additional metadata that is provided to the 
method.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "project_id",
+        "location",
+        "gcp_conn_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        project_id: str,
+        location: str,
+        gcp_conn_id: str = "google_cloud_default",
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        impersonation_chain: str | Sequence[str] | None = None,
+        *args,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+
+    @cached_property
+    def hook(self) -> DataplexHook:
+        return DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+
+class DataplexCatalogCreateEntryGroupOperator(DataplexCatalogBaseOperator):
+    """
+    Create an EntryGroup resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:DataplexCatalogCreateEntryGroupOperator`
+
+    :param entry_group_id: Required. EntryGroup identifier.
+    :param entry_group_configuration: Required. EntryGroup configuration.
+        For more details please see API documentation:
+        
https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups#EntryGroup
+    :param validate_request: Optional. If set, performs request validation, 
but does not actually
+        execute the request.
+    :param project_id: Required. The ID of the Google Cloud project where the 
service is used.
+    :param location: Required. The ID of the Google Cloud region where the 
service is used.
+    :param gcp_conn_id: Optional. The connection ID to use to connect to 
Google Cloud.
+    :param retry: Optional. A retry object used to retry requests. If `None` 
is specified, requests will not
+        be retried.
+    :param timeout: Optional. The amount of time, in seconds, to wait for the 
request to complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Optional. Additional metadata that is provided to the 
method.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = tuple(
+        {"entry_group_id", "entry_group_configuration"} | 
set(DataplexCatalogBaseOperator.template_fields)
+    )
+    operator_extra_links = (DataplexCatalogEntryGroupLink(),)
+
+    def __init__(
+        self,
+        entry_group_id: str,
+        entry_group_configuration: EntryGroup | dict,
+        validate_request: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.entry_group_id = entry_group_id
+        self.entry_group_configuration = entry_group_configuration
+        self.validate_request = validate_request
+
+    def execute(self, context: Context):
+        DataplexCatalogEntryGroupLink.persist(
+            context=context,
+            task_instance=self,
+        )
+
+        if self.validate_request:
+            self.log.info("Validating a Create Dataplex Catalog EntryGroup 
request.")
+        else:
+            self.log.info("Creating a Dataplex Catalog EntryGroup.")
+
+        try:
+            operation = self.hook.create_entry_group(
+                entry_group_id=self.entry_group_id,
+                entry_group_configuration=self.entry_group_configuration,
+                location=self.location,
+                project_id=self.project_id,
+                validate_only=self.validate_request,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            entry_group = self.hook.wait_for_operation(timeout=self.timeout, 
operation=operation)
+        except AlreadyExists:
+            entry_group = self.hook.get_entry_group(
+                entry_group_id=self.entry_group_id,
+                location=self.location,
+                project_id=self.project_id,
+            )
+            self.log.info(
+                "Dataplex Catalog EntryGroup %s already exists.",
+                self.entry_group_id,
+            )
+            result = EntryGroup.to_dict(entry_group)
+            return result
+        except Exception as ex:
+            raise AirflowException(ex)
+        else:
+            result = EntryGroup.to_dict(entry_group) if not 
self.validate_request else None
+
+        if not self.validate_request:
+            self.log.info("Dataplex Catalog EntryGroup %s was successfully 
created.", self.entry_group_id)
+        return result
+
+
class DataplexCatalogGetEntryGroupOperator(DataplexCatalogBaseOperator):
    """
    Get an EntryGroup resource.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DataplexCatalogGetEntryGroupOperator`

    :param entry_group_id: Required. EntryGroup identifier.
    :param project_id: Required. The ID of the Google Cloud project where the service is used.
    :param location: Required. The ID of the Google Cloud region where the service is used.
    :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not
        be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :param impersonation_chain: Optional. Service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = tuple(
        {"entry_group_id"} | set(DataplexCatalogBaseOperator.template_fields)
    )
    operator_extra_links = (DataplexCatalogEntryGroupLink(),)

    def __init__(
        self,
        entry_group_id: str,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.entry_group_id = entry_group_id

    def execute(self, context: Context):
        DataplexCatalogEntryGroupLink.persist(
            context=context,
            task_instance=self,
        )
        self.log.info(
            "Retrieving Dataplex Catalog EntryGroup %s.",
            self.entry_group_id,
        )
        try:
            entry_group = self.hook.get_entry_group(
                entry_group_id=self.entry_group_id,
                location=self.location,
                project_id=self.project_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
        except NotFound as ex:
            self.log.info(
                "Dataplex Catalog EntryGroup %s not found.",
                self.entry_group_id,
            )
            # Fix: the original raised ``AirflowException(NotFound)`` — wrapping the
            # exception *class* instead of the caught instance — which discards the
            # actual error message. Wrap the instance and chain the cause.
            raise AirflowException(ex) from ex
        except Exception as ex:
            raise AirflowException(ex) from ex

        return EntryGroup.to_dict(entry_group)
+
+
class DataplexCatalogDeleteEntryGroupOperator(DataplexCatalogBaseOperator):
    """
    Delete an EntryGroup resource.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DataplexCatalogDeleteEntryGroupOperator`

    :param entry_group_id: Required. EntryGroup identifier.
    :param project_id: Required. The ID of the Google Cloud project where the service is used.
    :param location: Required. The ID of the Google Cloud region where the service is used.
    :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not
        be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :param impersonation_chain: Optional. Service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = tuple(
        {"entry_group_id"} | set(DataplexCatalogBaseOperator.template_fields)
    )

    def __init__(
        self,
        entry_group_id: str,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.entry_group_id = entry_group_id

    def execute(self, context: Context):
        self.log.info(
            "Deleting Dataplex Catalog EntryGroup %s.",
            self.entry_group_id,
        )
        try:
            operation = self.hook.delete_entry_group(
                entry_group_id=self.entry_group_id,
                location=self.location,
                project_id=self.project_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            # Deletion is a long-running operation; block until it resolves.
            self.hook.wait_for_operation(timeout=self.timeout, operation=operation)

        except NotFound as ex:
            self.log.info(
                "Dataplex Catalog EntryGroup %s not found.",
                self.entry_group_id,
            )
            # Fix: the original raised ``AirflowException(NotFound)`` — wrapping the
            # exception *class* instead of the caught instance — which discards the
            # actual error message. Wrap the instance and chain the cause.
            raise AirflowException(ex) from ex
        except Exception as ex:
            raise AirflowException(ex) from ex
        return None
+
+
class DataplexCatalogListEntryGroupsOperator(DataplexCatalogBaseOperator):
    """
    List EntryGroup resources.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DataplexCatalogListEntryGroupsOperator`

    :param filter_by: Optional. Filter to apply on the list results.
    :param order_by: Optional. Fields to order the results by.
    :param page_size: Optional. Maximum number of EntryGroups to return on the page.
    :param page_token: Optional. Token to retrieve the next page of results.
    :param project_id: Required. The ID of the Google Cloud project where the service is used.
    :param location: Required. The ID of the Google Cloud region where the service is used.
    :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud.
    :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not
        be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :param impersonation_chain: Optional. Service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = tuple(DataplexCatalogBaseOperator.template_fields)
    operator_extra_links = (DataplexCatalogEntryGroupsLink(),)

    def __init__(
        self,
        page_size: int | None = None,
        page_token: str | None = None,
        filter_by: str | None = None,
        order_by: str | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.page_size = page_size
        self.page_token = page_token
        self.filter_by = filter_by
        self.order_by = order_by

    def execute(self, context: Context):
        DataplexCatalogEntryGroupsLink.persist(
            context=context,
            task_instance=self,
        )
        self.log.info(
            "Listing Dataplex Catalog EntryGroup from location %s.",
            self.location,
        )
        try:
            entry_group_on_page = self.hook.list_entry_groups(
                location=self.location,
                project_id=self.project_id,
                page_size=self.page_size,
                page_token=self.page_token,
                filter_by=self.filter_by,
                order_by=self.order_by,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            self.log.info("EntryGroup on page: %s", entry_group_on_page)
            # Push the raw first-page response (including next_page_token) to XCom so
            # downstream tasks can paginate further if needed.
            self.xcom_push(
                context=context,
                key="entry_group_page",
                value=ListEntryGroupsResponse.to_dict(entry_group_on_page._response),
            )
        except Exception as ex:
            # Fix: chain the cause so the original traceback is preserved (B904).
            raise AirflowException(ex) from ex

        # Constructing list to return EntryGroups in readable format.
        # NOTE: only the first page of the pager is converted and returned here;
        # callers needing more results must use ``page_token`` from the XCom value.
        entry_groups_list = [
            MessageToDict(entry_group._pb, preserving_proto_field_name=True)
            for entry_group in next(iter(entry_group_on_page.pages)).entry_groups
        ]
        return entry_groups_list
+
+
class DataplexCatalogUpdateEntryGroupOperator(DataplexCatalogBaseOperator):
    """
    Update an EntryGroup resource.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DataplexCatalogUpdateEntryGroupOperator`

    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
    :param location: Required. The ID of the Google Cloud region that the task belongs to.
    :param update_mask: Optional. Names of fields whose values to overwrite on an entry group.
        If this parameter is absent or empty, all modifiable fields are overwritten. If such
        fields are non-required and omitted in the request body, their values are emptied.
    :param entry_group_id: Required. ID of the EntryGroup to update.
    :param entry_group_configuration: Required. The updated configuration body of the EntryGroup.
        For more details please see API documentation:
        https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups#EntryGroup
    :param validate_request: Optional. If set to True, the service validates the request without
        performing any mutations.
    :param retry: Optional. A retry object used  to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Optional. Additional metadata that is provided to the method.
    :param gcp_conn_id: Optional. The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional. Service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = tuple(
        {"entry_group_id", "entry_group_configuration", "update_mask"}
        | set(DataplexCatalogBaseOperator.template_fields)
    )
    operator_extra_links = (DataplexCatalogEntryGroupLink(),)

    def __init__(
        self,
        entry_group_id: str,
        entry_group_configuration: dict | EntryGroup,
        update_mask: list[str] | FieldMask | None = None,
        validate_request: bool | None = False,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.entry_group_id = entry_group_id
        self.entry_group_configuration = entry_group_configuration
        self.update_mask = update_mask
        self.validate_request = validate_request

    def execute(self, context: Context):
        DataplexCatalogEntryGroupLink.persist(
            context=context,
            task_instance=self,
        )

        if self.validate_request:
            self.log.info("Validating an Update Dataplex Catalog EntryGroup request.")
        else:
            self.log.info(
                "Updating Dataplex Catalog EntryGroup %s.",
                self.entry_group_id,
            )
        try:
            operation = self.hook.update_entry_group(
                location=self.location,
                project_id=self.project_id,
                entry_group_id=self.entry_group_id,
                entry_group_configuration=self.entry_group_configuration,
                update_mask=self.update_mask,
                validate_only=self.validate_request,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            entry_group = self.hook.wait_for_operation(timeout=self.timeout, operation=operation)

        except NotFound as ex:
            self.log.info("Specified EntryGroup was not found.")
            # Chain the cause so the original traceback is preserved (B904).
            raise AirflowException(ex) from ex
        except Exception as exc:
            raise AirflowException(exc) from exc
        else:
            # A validate-only call performs no mutation, so there is nothing to return.
            result = EntryGroup.to_dict(entry_group) if not self.validate_request else None

        if not self.validate_request:
            self.log.info("EntryGroup %s was successfully updated.", self.entry_group_id)
        return result
diff --git a/providers/src/airflow/providers/google/provider.yaml 
b/providers/src/airflow/providers/google/provider.yaml
index 772c8babdee..97277806b85 100644
--- a/providers/src/airflow/providers/google/provider.yaml
+++ b/providers/src/airflow/providers/google/provider.yaml
@@ -131,7 +131,7 @@ dependencies:
   - google-cloud-datacatalog>=3.23.0
   - google-cloud-dataflow-client>=0.8.6
   - google-cloud-dataform>=0.5.0
-  - google-cloud-dataplex>=1.10.0
+  - google-cloud-dataplex>=2.6.0
   - google-cloud-dataproc>=5.12.0
   - google-cloud-dataproc-metastore>=1.12.0
   - google-cloud-dlp>=3.12.0
@@ -1203,6 +1203,8 @@ extra-links:
   - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
   - airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
   - airflow.providers.google.cloud.links.dataplex.DataplexLakeLink
+  - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryGroupLink
+  - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryGroupsLink
   - airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
   - airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
   - airflow.providers.google.cloud.links.bigquery.BigQueryJobDetailLink
diff --git a/providers/tests/google/cloud/hooks/test_dataplex.py 
b/providers/tests/google/cloud/hooks/test_dataplex.py
index 8f1f5d98666..4a4f550eca6 100644
--- a/providers/tests/google/cloud/hooks/test_dataplex.py
+++ b/providers/tests/google/cloud/hooks/test_dataplex.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 from unittest import mock
 
 from google.api_core.gapic_v1.method import DEFAULT
+from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.providers.google.cloud.operators.dataplex import DataplexHook
 
@@ -30,6 +31,9 @@ DATAPLEX_HOOK_CLIENT = 
"airflow.providers.google.cloud.hooks.dataplex.DataplexHo
 DATAPLEX_HOOK_DS_CLIENT = (
     
"airflow.providers.google.cloud.hooks.dataplex.DataplexHook.get_dataplex_data_scan_client"
 )
+DATAPLEX_CATALOG_HOOK_CLIENT = (
+    
"airflow.providers.google.cloud.hooks.dataplex.DataplexHook.get_dataplex_catalog_client"
+)
 
 PROJECT_ID = "project-id"
 REGION = "region"
@@ -44,12 +48,21 @@ DATA_SCAN_ID = "test-data-scan-id"
 ASSET_ID = "test_asset_id"
 ZONE_ID = "test_zone_id"
 JOB_ID = "job_id"
+
+LOCATION = "us-central1"
+ENTRY_GROUP_ID = "entry-group-id"
+ENTRY_GROUP_BODY = {"description": "Some descr"}
+ENTRY_GROUP_UPDATED_BODY = {"description": "Some new descr"}
+UPDATE_MASK = ["description"]
+
+COMMON_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}"
 DATA_SCAN_NAME = 
f"projects/{PROJECT_ID}/locations/{REGION}/dataScans/{DATA_SCAN_ID}"
 DATA_SCAN_JOB_NAME = 
f"projects/{PROJECT_ID}/locations/{REGION}/dataScans/{DATA_SCAN_ID}/jobs/{JOB_ID}"
 ZONE_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}"
 ZONE_PARENT = 
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}"
 ASSET_PARENT = 
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/assets/{ASSET_ID}"
 DATASCAN_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}"
+ENTRY_GROUP_PARENT = 
f"projects/{PROJECT_ID}/locations/{LOCATION}/entryGroup/{ENTRY_GROUP_ID}"
 
 
 class TestDataplexHook:
@@ -311,3 +324,104 @@ class TestDataplexHook:
             timeout=None,
             metadata=(),
         )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_create_entry_group(self, mock_client):
+        mock_common_location_path = 
mock_client.return_value.common_location_path
+        mock_common_location_path.return_value = COMMON_PARENT
+        self.hook.create_entry_group(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            entry_group_id=ENTRY_GROUP_ID,
+            entry_group_configuration=ENTRY_GROUP_BODY,
+            validate_only=False,
+        )
+        mock_client.return_value.create_entry_group.assert_called_once_with(
+            request=dict(
+                parent=COMMON_PARENT,
+                entry_group_id=ENTRY_GROUP_ID,
+                entry_group=ENTRY_GROUP_BODY,
+                validate_only=False,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_delete_entry_group(self, mock_client):
+        mock_common_location_path = mock_client.return_value.entry_group_path
+        mock_common_location_path.return_value = ENTRY_GROUP_PARENT
+        self.hook.delete_entry_group(project_id=PROJECT_ID, location=LOCATION, 
entry_group_id=ENTRY_GROUP_ID)
+
+        mock_client.return_value.delete_entry_group.assert_called_once_with(
+            request=dict(
+                name=ENTRY_GROUP_PARENT,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_list_entry_groups(self, mock_client):
+        mock_common_location_path = 
mock_client.return_value.common_location_path
+        mock_common_location_path.return_value = COMMON_PARENT
+        self.hook.list_entry_groups(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            order_by="name",
+            page_size=2,
+            filter_by="'description' = 'Some descr'",
+        )
+        mock_client.return_value.list_entry_groups.assert_called_once_with(
+            request=dict(
+                parent=COMMON_PARENT,
+                page_size=2,
+                page_token=None,
+                filter="'description' = 'Some descr'",
+                order_by="name",
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_get_entry_group(self, mock_client):
+        mock_common_location_path = mock_client.return_value.entry_group_path
+        mock_common_location_path.return_value = ENTRY_GROUP_PARENT
+        self.hook.get_entry_group(project_id=PROJECT_ID, location=LOCATION, 
entry_group_id=ENTRY_GROUP_ID)
+
+        mock_client.return_value.get_entry_group.assert_called_once_with(
+            request=dict(
+                name=ENTRY_GROUP_PARENT,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT)
+    def test_update_entry_group(self, mock_client):
+        mock_common_location_path = mock_client.return_value.entry_group_path
+        mock_common_location_path.return_value = ENTRY_GROUP_PARENT
+        self.hook.update_entry_group(
+            project_id=PROJECT_ID,
+            location=LOCATION,
+            entry_group_id=ENTRY_GROUP_ID,
+            entry_group_configuration=ENTRY_GROUP_UPDATED_BODY,
+            update_mask=UPDATE_MASK,
+            validate_only=False,
+        )
+
+        mock_client.return_value.update_entry_group.assert_called_once_with(
+            request=dict(
+                entry_group={**ENTRY_GROUP_UPDATED_BODY, "name": 
ENTRY_GROUP_PARENT},
+                update_mask=FieldMask(paths=UPDATE_MASK),
+                validate_only=False,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/providers/tests/google/cloud/links/test_dataplex.py 
b/providers/tests/google/cloud/links/test_dataplex.py
new file mode 100644
index 00000000000..05661c84bd3
--- /dev/null
+++ b/providers/tests/google/cloud/links/test_dataplex.py
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.google.cloud.links.dataplex import (
+    DataplexCatalogEntryGroupLink,
+    DataplexCatalogEntryGroupsLink,
+    DataplexLakeLink,
+    DataplexTaskLink,
+    DataplexTasksLink,
+)
+from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCatalogCreateEntryGroupOperator,
+    DataplexCatalogGetEntryGroupOperator,
+    DataplexCreateLakeOperator,
+    DataplexCreateTaskOperator,
+    DataplexListTasksOperator,
+)
+
+TEST_LOCATION = "test-location"
+TEST_PROJECT_ID = "test-project-id"
+TEST_ENTRY_GROUP_ID = "test-entry-group-id"
+TEST_ENTRY_GROUP_ID_BODY = {"description": "some description"}
+TEST_ENTRY_GROUPS_ID = "test-entry-groups-id"
+TEST_TASK_ID = "test-task-id"
+TEST_TASKS_ID = "test-tasks-id"
+TEST_LAKE_ID = "test-lake-id"
+TEST_LAKE_BODY = {"name": "some_name"}
+
+DATAPLEX_BASE_LINK = "https://console.cloud.google.com/dataplex/";
+EXPECTED_DATAPLEX_CATALOG_ENTRY_GROUP_LINK = (
+    DATAPLEX_BASE_LINK
+    + 
f"projects/{TEST_PROJECT_ID}/locations/{TEST_LOCATION}/entryGroups/{TEST_ENTRY_GROUP_ID}?project={TEST_PROJECT_ID}"
+)
+EXPECTED_DATAPLEX_CATALOG_ENTRY_GROUPS_LINK = (
+    DATAPLEX_BASE_LINK + f"catalog/entry-groups?project={TEST_PROJECT_ID}"
+)
+DATAPLEX_LAKE_LINK = (
+    DATAPLEX_BASE_LINK + 
f"lakes/{TEST_LAKE_ID};location={TEST_LOCATION}?project={TEST_PROJECT_ID}"
+)
+EXPECTED_DATAPLEX_TASK_LINK = (
+    DATAPLEX_BASE_LINK
+    + 
f"process/tasks/{TEST_LAKE_ID}.{TEST_TASK_ID};location={TEST_LOCATION}/jobs?project={TEST_PROJECT_ID}"
+)
+EXPECTED_DATAPLEX_TASKS_LINK = (
+    DATAPLEX_BASE_LINK + 
f"process/tasks?project={TEST_PROJECT_ID}&qLake={TEST_LAKE_ID}.{TEST_LOCATION}"
+)
+
+
class TestDataplexTaskLink:
    @pytest.mark.db_test
    def test_get_link(self, create_task_instance_of_operator, session):
        """The task link must render the Dataplex task-jobs console URL."""
        link = DataplexTaskLink()
        ti = create_task_instance_of_operator(
            DataplexCreateTaskOperator,
            dag_id="test_link_dag",
            task_id="test_link_task",
            region=TEST_LOCATION,
            lake_id=TEST_LAKE_ID,
            project_id=TEST_PROJECT_ID,
            body=TEST_LAKE_BODY,
            dataplex_task_id=TEST_TASK_ID,
        )
        session.add(ti)
        session.commit()
        link.persist(context={"ti": ti}, task_instance=ti.task)
        assert link.get_link(operator=ti.task, ti_key=ti.key) == EXPECTED_DATAPLEX_TASK_LINK
+
+
class TestDataplexTasksLink:
    @pytest.mark.db_test
    def test_get_link(self, create_task_instance_of_operator, session):
        """The tasks link must render the Dataplex tasks-list console URL."""
        link = DataplexTasksLink()
        ti = create_task_instance_of_operator(
            DataplexListTasksOperator,
            dag_id="test_link_dag",
            task_id="test_link_task",
            region=TEST_LOCATION,
            lake_id=TEST_LAKE_ID,
            project_id=TEST_PROJECT_ID,
        )
        session.add(ti)
        session.commit()
        link.persist(context={"ti": ti}, task_instance=ti.task)
        assert link.get_link(operator=ti.task, ti_key=ti.key) == EXPECTED_DATAPLEX_TASKS_LINK
+
+
class TestDataplexLakeLink:
    @pytest.mark.db_test
    def test_get_link(self, create_task_instance_of_operator, session):
        """The lake link must render the Dataplex lake console URL."""
        link = DataplexLakeLink()
        ti = create_task_instance_of_operator(
            DataplexCreateLakeOperator,
            dag_id="test_link_dag",
            task_id="test_link_task",
            region=TEST_LOCATION,
            lake_id=TEST_LAKE_ID,
            project_id=TEST_PROJECT_ID,
            body={},
        )
        session.add(ti)
        session.commit()
        link.persist(context={"ti": ti}, task_instance=ti.task)
        assert link.get_link(operator=ti.task, ti_key=ti.key) == DATAPLEX_LAKE_LINK
+
+
class TestDataplexCatalogEntryGroupLink:
    @pytest.mark.db_test
    def test_get_link(self, create_task_instance_of_operator, session):
        """The entry-group link must render the Catalog entry-group console URL."""
        link = DataplexCatalogEntryGroupLink()
        ti = create_task_instance_of_operator(
            DataplexCatalogGetEntryGroupOperator,
            dag_id="test_link_dag",
            task_id="test_link_task",
            location=TEST_LOCATION,
            entry_group_id=TEST_ENTRY_GROUP_ID,
            project_id=TEST_PROJECT_ID,
        )
        session.add(ti)
        session.commit()
        link.persist(context={"ti": ti}, task_instance=ti.task)
        assert (
            link.get_link(operator=ti.task, ti_key=ti.key)
            == EXPECTED_DATAPLEX_CATALOG_ENTRY_GROUP_LINK
        )
+
+
class TestDataplexCatalogEntryGroupsLink:
    @pytest.mark.db_test
    def test_get_link(self, create_task_instance_of_operator, session):
        """The entry-groups link must render the Catalog entry-groups console URL."""
        link = DataplexCatalogEntryGroupsLink()
        ti = create_task_instance_of_operator(
            DataplexCatalogCreateEntryGroupOperator,
            dag_id="test_link_dag",
            task_id="test_link_task",
            location=TEST_LOCATION,
            entry_group_id=TEST_ENTRY_GROUP_ID,
            entry_group_configuration=TEST_ENTRY_GROUP_ID_BODY,
            project_id=TEST_PROJECT_ID,
        )
        session.add(ti)
        session.commit()
        link.persist(context={"ti": ti}, task_instance=ti.task)
        assert (
            link.get_link(operator=ti.task, ti_key=ti.key)
            == EXPECTED_DATAPLEX_CATALOG_ENTRY_GROUPS_LINK
        )
diff --git a/providers/tests/google/cloud/operators/test_dataplex.py 
b/providers/tests/google/cloud/operators/test_dataplex.py
index 1eec9008e2c..2aff961623b 100644
--- a/providers/tests/google/cloud/operators/test_dataplex.py
+++ b/providers/tests/google/cloud/operators/test_dataplex.py
@@ -20,9 +20,15 @@ from unittest import mock
 
 import pytest
 from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.dataplex_v1.services.catalog_service.pagers import 
ListEntryGroupsPager
+from google.cloud.dataplex_v1.types import ListEntryGroupsRequest, 
ListEntryGroupsResponse
 
 from airflow.exceptions import TaskDeferred
 from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCatalogCreateEntryGroupOperator,
+    DataplexCatalogDeleteEntryGroupOperator,
+    DataplexCatalogGetEntryGroupOperator,
+    DataplexCatalogListEntryGroupsOperator,
     DataplexCreateAssetOperator,
     DataplexCreateLakeOperator,
     DataplexCreateOrUpdateDataProfileScanOperator,
@@ -51,6 +57,7 @@ LAKE_STR = 
"airflow.providers.google.cloud.operators.dataplex.Lake"
 DATASCANJOB_STR = 
"airflow.providers.google.cloud.operators.dataplex.DataScanJob"
 ZONE_STR = "airflow.providers.google.cloud.operators.dataplex.Zone"
 ASSET_STR = "airflow.providers.google.cloud.operators.dataplex.Asset"
+ENTRY_GROUP_STR = 
"airflow.providers.google.cloud.operators.dataplex.EntryGroup"
 
 PROJECT_ID = "project-id"
 REGION = "region"
@@ -72,6 +79,7 @@ TASK_ID = "test_task_id"
 ASSET_ID = "test_asset_id"
 ZONE_ID = "test_zone_id"
 JOB_ID = "test_job_id"
+ENTRY_GROUP_NAME = "test_entry_group"
 
 
 class TestDataplexCreateTaskOperator:
@@ -734,3 +742,138 @@ class TestDataplexCreateDataProfileScanOperator:
             timeout=None,
             metadata=(),
         )
+
+
class TestDataplexCatalogCreateEntryGroupOperator:
    @mock.patch(ENTRY_GROUP_STR)
    @mock.patch(HOOK_STR)
    def test_execute(self, hook_mock, entry_group_mock):
        """Create operator must instantiate the hook and forward all arguments."""
        op = DataplexCatalogCreateEntryGroupOperator(
            task_id="create_task",
            project_id=PROJECT_ID,
            location=REGION,
            entry_group_id=ENTRY_GROUP_NAME,
            entry_group_configuration=BODY,
            validate_request=None,
            gcp_conn_id=GCP_CONN_ID,
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        # Neutralize result conversion and the long-running-operation wait.
        entry_group_mock.return_value.to_dict.return_value = None
        hook_mock.return_value.wait_for_operation.return_value = None

        op.execute(context=mock.MagicMock())

        hook_mock.assert_called_once_with(
            gcp_conn_id=GCP_CONN_ID,
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        hook_mock.return_value.create_entry_group.assert_called_once_with(
            entry_group_id=ENTRY_GROUP_NAME,
            entry_group_configuration=BODY,
            location=REGION,
            project_id=PROJECT_ID,
            validate_only=None,
            retry=DEFAULT,
            timeout=None,
            metadata=(),
        )
+
+
class TestDataplexCatalogGetEntryGroupOperator:
    @mock.patch(ENTRY_GROUP_STR)
    @mock.patch(HOOK_STR)
    def test_execute(self, hook_mock, entry_group_mock):
        """Get operator must instantiate the hook and forward all arguments."""
        op = DataplexCatalogGetEntryGroupOperator(
            project_id=PROJECT_ID,
            location=REGION,
            entry_group_id=ENTRY_GROUP_NAME,
            task_id="get_task",
            gcp_conn_id=GCP_CONN_ID,
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        # Fix: the original configured this stub AFTER calling ``op.execute`` so it
        # never affected the call under test; set it up first, matching the sibling
        # Create/List tests.
        entry_group_mock.return_value.to_dict.return_value = None

        op.execute(context=mock.MagicMock())

        hook_mock.assert_called_once_with(
            gcp_conn_id=GCP_CONN_ID,
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        hook_mock.return_value.get_entry_group.assert_called_once_with(
            project_id=PROJECT_ID,
            location=REGION,
            entry_group_id=ENTRY_GROUP_NAME,
            retry=DEFAULT,
            timeout=None,
            metadata=(),
        )
+
+
class TestDataplexCatalogDeleteEntryGroupOperator:
    @mock.patch(HOOK_STR)
    def test_execute(self, hook_mock):
        """Delete operator must instantiate the hook and forward all arguments."""
        op = DataplexCatalogDeleteEntryGroupOperator(
            project_id=PROJECT_ID,
            location=REGION,
            entry_group_id=ENTRY_GROUP_NAME,
            task_id="delete_task",
            gcp_conn_id=GCP_CONN_ID,
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        # Neutralize the long-running-operation wait.
        hook_mock.return_value.wait_for_operation.return_value = None

        op.execute(context=mock.MagicMock())

        hook_mock.assert_called_once_with(
            gcp_conn_id=GCP_CONN_ID,
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        hook_mock.return_value.delete_entry_group.assert_called_once_with(
            project_id=PROJECT_ID,
            location=REGION,
            entry_group_id=ENTRY_GROUP_NAME,
            retry=DEFAULT,
            timeout=None,
            metadata=(),
        )
+
+
class TestDataplexCatalogListEntryGroupsOperator:
    @mock.patch(ENTRY_GROUP_STR)
    @mock.patch(HOOK_STR)
    def test_execute(self, hook_mock, entry_group_mock):
        """List operator must instantiate the hook and forward all arguments."""
        op = DataplexCatalogListEntryGroupsOperator(
            project_id=PROJECT_ID,
            location=REGION,
            task_id="list_task",
            gcp_conn_id=GCP_CONN_ID,
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        # Build a one-item pager so the operator can page through a fake response.
        fake_response = ListEntryGroupsResponse(
            entry_groups=[
                {
                    "name": "aaa",
                    "description": "Test Entry Group 1",
                    "display_name": "Entry Group One",
                }
            ]
        )
        hook_mock.return_value.list_entry_groups.return_value = ListEntryGroupsPager(
            response=fake_response,
            method=mock.MagicMock(),
            request=ListEntryGroupsRequest(parent=""),
        )
        entry_group_mock.return_value.to_dict.return_value = None

        op.execute(context=mock.MagicMock())

        hook_mock.assert_called_once_with(
            gcp_conn_id=GCP_CONN_ID,
            impersonation_chain=IMPERSONATION_CHAIN,
        )
        hook_mock.return_value.list_entry_groups.assert_called_once_with(
            project_id=PROJECT_ID,
            location=REGION,
            page_size=None,
            page_token=None,
            filter_by=None,
            order_by=None,
            retry=DEFAULT,
            timeout=None,
            metadata=(),
        )
diff --git 
a/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py 
b/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
new file mode 100644
index 00000000000..8eec8a317d6
--- /dev/null
+++ b/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use Dataplex Catalog.
+"""
+
+from __future__ import annotations
+
+import datetime
+import os
+
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCatalogCreateEntryGroupOperator,
+    DataplexCatalogDeleteEntryGroupOperator,
+    DataplexCatalogGetEntryGroupOperator,
+    DataplexCatalogListEntryGroupsOperator,
+    DataplexCatalogUpdateEntryGroupOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+DAG_ID = "dataplex_catalog"
+GCP_LOCATION = "us-central1"
+
+ENTRY_GROUP_NAME = f"{DAG_ID}_entry_group_{ENV_ID}".replace("_", "-")
+# [START howto_dataplex_entry_group_configuration]
+ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"}
+# [END howto_dataplex_entry_group_configuration]
+
+with DAG(
+    DAG_ID,
+    start_date=datetime.datetime(2021, 1, 1),
+    schedule="@once",
+    tags=["example", "dataplex_catalog"],
+) as dag:
+    # [START howto_operator_dataplex_catalog_create_entry_group]
+    create_entry_group = DataplexCatalogCreateEntryGroupOperator(
+        task_id="create_entry_group",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        entry_group_id=ENTRY_GROUP_NAME,
+        entry_group_configuration=ENTRY_GROUP_BODY,
+        validate_request=False,
+    )
+    # [END howto_operator_dataplex_catalog_create_entry_group]
+
+    # [START howto_operator_dataplex_catalog_get_entry_group]
+    get_entry_group = DataplexCatalogGetEntryGroupOperator(
+        task_id="get_entry_group",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        entry_group_id=ENTRY_GROUP_NAME,
+    )
+    # [END howto_operator_dataplex_catalog_get_entry_group]
+
+    # [START howto_operator_dataplex_catalog_list_entry_groups]
+    list_entry_group = DataplexCatalogListEntryGroupsOperator(
+        task_id="list_entry_group",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        order_by="name",
+        filter_by='display_name = "Display Name"',
+    )
+    # [END howto_operator_dataplex_catalog_list_entry_groups]
+
+    # [START howto_operator_dataplex_catalog_update_entry_group]
+    update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
+        task_id="update_entry_group",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        entry_group_id=ENTRY_GROUP_NAME,
+        entry_group_configuration={"display_name": "Updated Display Name"},
+        update_mask=["display_name"],
+    )
+    # [END howto_operator_dataplex_catalog_update_entry_group]
+
+    # [START howto_operator_dataplex_catalog_delete_entry_group]
+    delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
+        task_id="delete_entry_group",
+        project_id=PROJECT_ID,
+        location=GCP_LOCATION,
+        entry_group_id=ENTRY_GROUP_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END howto_operator_dataplex_catalog_delete_entry_group]
+
+    create_entry_group >> get_entry_group >> list_entry_group >> update_entry_group >> delete_entry_group
+
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index f12b3ad6a66..b894acdb3a8 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -116,7 +116,6 @@ class TestProjectStructure:
             "providers/tests/google/cloud/links/test_dataflow.py",
             "providers/tests/google/cloud/links/test_dataform.py",
             "providers/tests/google/cloud/links/test_datafusion.py",
-            "providers/tests/google/cloud/links/test_dataplex.py",
             "providers/tests/google/cloud/links/test_dataprep.py",
             "providers/tests/google/cloud/links/test_dataproc.py",
             "providers/tests/google/cloud/links/test_datastore.py",
@@ -396,6 +395,7 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         "airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator",
         "airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator",
         "airflow.providers.google.cloud.operators.dataproc._DataprocStartStopClusterBaseOperator",
+        "airflow.providers.google.cloud.operators.dataplex.DataplexCatalogBaseOperator",
         "airflow.providers.google.cloud.operators.vertex_ai.custom_job.CustomTrainingJobBaseOperator",
         "airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator",
         "airflow.providers.google.marketing_platform.operators.search_ads._GoogleSearchAdsBaseOperator",

Reply via email to