This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 38082a19ebb5c895adaed534a6af39aaeb5f0f37
Author: max <[email protected]>
AuthorDate: Wed Jan 3 19:22:22 2024 +0100

    Implement Google Analytics Admin (GA4) operators (#36276)
    
    (cherry picked from commit f28643b7bdc90a61ec5bd12f8505772cd8c3bf7f)
---
 .github/workflows/ci.yml                           |   2 +-
 .../google/marketing_platform/hooks/analytics.py   |   9 +
 .../marketing_platform/hooks/analytics_admin.py    | 234 +++++++++
 .../google/marketing_platform/links/__init__.py    |  16 +
 .../marketing_platform/links/analytics_admin.py    |  65 +++
 .../marketing_platform/operators/analytics.py      |  53 ++
 .../operators/analytics_admin.py                   | 579 +++++++++++++++++++++
 airflow/providers/google/provider.yaml             |  14 +
 .../operators/marketing_platform/analytics.rst     |   5 +
 .../marketing_platform/analytics_admin.rst         | 151 ++++++
 generated/provider_dependencies.json               |   1 +
 .../hooks/test_analytics_admin.py                  | 192 +++++++
 .../google/marketing_platform/links/__init__.py    |  17 +
 .../links/test_analytics_admin.py                  |  71 +++
 .../operators/test_analytics_admin.py              | 310 +++++++++++
 .../marketing_platform/example_analytics_admin.py  | 203 ++++++++
 16 files changed, 1921 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a508c06f4f..610132b81a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -671,7 +671,7 @@ jobs:
         run: aws s3 sync --delete ./files/documentation 
s3://apache-airflow-docs
 
   spellcheck-docs:
-    timeout-minutes: 60
+    timeout-minutes: 120
     name: "Spellcheck docs"
     runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
     needs: [build-info, wait-for-ci-images]
diff --git a/airflow/providers/google/marketing_platform/hooks/analytics.py 
b/airflow/providers/google/marketing_platform/hooks/analytics.py
index 05df51c2c1..ec98ec2829 100644
--- a/airflow/providers/google/marketing_platform/hooks/analytics.py
+++ b/airflow/providers/google/marketing_platform/hooks/analytics.py
@@ -17,11 +17,13 @@
 # under the License.
 from __future__ import annotations
 
+import warnings
 from typing import Any
 
 from googleapiclient.discovery import Resource, build
 from googleapiclient.http import MediaFileUpload
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
 
 
@@ -30,6 +32,13 @@ class GoogleAnalyticsHook(GoogleBaseHook):
 
     def __init__(self, api_version: str = "v3", *args, **kwargs):
         super().__init__(*args, **kwargs)
+        warnings.warn(
+            f"The `{type(self).__name__}` class is deprecated, please use "
+            "`GoogleAnalyticsAdminHook` instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=2,  # report the code that instantiates the hook, not this __init__
+        )
+
         self.api_version = api_version
         self._conn = None
 
diff --git 
a/airflow/providers/google/marketing_platform/hooks/analytics_admin.py 
b/airflow/providers/google/marketing_platform/hooks/analytics_admin.py
new file mode 100644
index 0000000000..cff9e0e409
--- /dev/null
+++ b/airflow/providers/google/marketing_platform/hooks/analytics_admin.py
@@ -0,0 +1,234 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Hooks for Google Analytics (GA4) Admin service.
+
+.. spelling:word-list::
+
+    DataStream
+    ListAccountsPager
+    ListGoogleAdsLinksPager
+"""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Sequence
+
+from google.analytics.admin_v1beta import (
+    AnalyticsAdminServiceClient,
+    DataStream,
+    Property,
+)
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+
+from airflow.providers.google.common.consts import CLIENT_INFO
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+if TYPE_CHECKING:
+    from google.analytics.admin_v1beta.services.analytics_admin_service.pagers 
import (
+        ListAccountsPager,
+        ListGoogleAdsLinksPager,
+    )
+    from google.api_core.retry import Retry
+
+
+class GoogleAnalyticsAdminHook(GoogleBaseHook):
+    """Hook for Google Analytics 4 (GA4) Admin API."""
+
+    def __init__(self, *args, **kwargs) -> None:
+        # All arguments are forwarded unchanged to the base hook.
+        super().__init__(*args, **kwargs)
+        # API client; stays ``None`` until ``get_conn`` creates it.
+        self._conn: AnalyticsAdminServiceClient | None = None
+
+    def get_conn(self) -> AnalyticsAdminServiceClient:
+        """Return the Analytics Admin client, creating and storing it on the first call."""
+        if self._conn:
+            return self._conn
+        self._conn = AnalyticsAdminServiceClient(
+            credentials=self.get_credentials(),
+            client_info=CLIENT_INFO,
+        )
+        return self._conn
+
+    def list_accounts(
+        self,
+        page_size: int | None = None,
+        page_token: str | None = None,
+        show_deleted: bool | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> ListAccountsPager:
+        """Get the list of Google Analytics accounts.
+
+        .. seealso::
+            Client library documentation:
+            https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/accounts/list
+
+        :param page_size: Optional. Number of results to return in the list.
+        :param page_token: Optional. The next_page_token value returned from a previous List request, if any.
+        :param show_deleted: Optional. Whether to include soft-deleted ("trashed") accounts in the results.
+        :param retry: Optional. Retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+
+        :returns: Pager over the Google Analytics accounts.
+        """
+        return self.get_conn().list_accounts(
+            request={
+                "page_size": page_size,
+                "page_token": page_token,
+                "show_deleted": show_deleted,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    def create_property(
+        self,
+        analytics_property: Property | dict,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Property:
+        """Create a Google Analytics property.
+
+        .. seealso::
+            Client library documentation:
+            https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties/create
+
+        :param analytics_property: The property to create. Note: the supplied property must specify
+            its parent.
+        :param retry: Optional. Retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+
+        :returns: The created Google Analytics property.
+        """
+        request = {"property": analytics_property}
+        return self.get_conn().create_property(
+            request=request, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+    def delete_property(
+        self,
+        property_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Property:
+        """Soft delete Google Analytics property.
+
+        .. seealso::
+            For more details please check the client library documentation:
+            https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties/delete
+
+        :param property_id: ID of the Property to soft-delete, given *without* the ``properties/``
+            prefix: this method builds the ``properties/{property_id}`` resource name itself.
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+
+        :returns: Resource message representing Google Analytics property.
+        """
+        client = self.get_conn()
+        # The API addresses a property by its resource name, not by the bare ID.
+        request = {"name": f"properties/{property_id}"}
+        return client.delete_property(request=request, retry=retry, timeout=timeout, metadata=metadata)
+
+    def create_data_stream(
+        self,
+        property_id: str,
+        data_stream: DataStream | dict,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> DataStream:
+        """Create a Google Analytics data stream under a property.
+
+        .. seealso::
+            Client library documentation:
+            https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams/create
+
+        :param property_id: ID of the parent property for the data stream.
+        :param data_stream: The data stream to create.
+        :param retry: Optional. Retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+
+        :returns: The created Google Analytics data stream.
+        """
+        request = {
+            "parent": f"properties/{property_id}",
+            "data_stream": data_stream,
+        }
+        return self.get_conn().create_data_stream(
+            request=request, retry=retry, timeout=timeout, metadata=metadata
+        )
+
+    def delete_data_stream(
+        self,
+        property_id: str,
+        data_stream_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> None:
+        """Delete a Google Analytics data stream.
+
+        .. seealso::
+            Client library documentation:
+            https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams/delete
+
+        :param property_id: ID of the parent property for the data stream.
+        :param data_stream_id: The data stream id to delete.
+        :param retry: Optional. Retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+        """
+        resource_name = f"properties/{property_id}/dataStreams/{data_stream_id}"
+        return self.get_conn().delete_data_stream(
+            request={"name": resource_name},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    def list_google_ads_links(
+        self,
+        property_id: str,
+        page_size: int | None = None,
+        page_token: str | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> ListGoogleAdsLinksPager:
+        """Get list of Google Ads links.
+
+        .. seealso::
+            For more details please check the client library documentation:
+            https://googleapis.dev/python/analyticsadmin/latest/admin_v1beta/analytics_admin_service.html#google.analytics.admin_v1beta.services.analytics_admin_service.AnalyticsAdminServiceAsyncClient.list_google_ads_links
+
+        :param property_id: ID of the parent property.
+        :param page_size: Optional, number of results to return in the list.
+        :param page_token: Optional. The next_page_token value returned from a previous List request, if any.
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+
+        :returns: Pager over the Google Ads links of the given property.
+        """
+        client = self.get_conn()
+        # The links are listed under the property's resource name.
+        request = {"parent": f"properties/{property_id}", "page_size": page_size, "page_token": page_token}
+        return client.list_google_ads_links(request=request, retry=retry, timeout=timeout, metadata=metadata)
diff --git a/airflow/providers/google/marketing_platform/links/__init__.py 
b/airflow/providers/google/marketing_platform/links/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/google/marketing_platform/links/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/airflow/providers/google/marketing_platform/links/analytics_admin.py 
b/airflow/providers/google/marketing_platform/links/analytics_admin.py
new file mode 100644
index 0000000000..3ab79c8804
--- /dev/null
+++ b/airflow/providers/google/marketing_platform/links/analytics_admin.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, ClassVar
+
+from airflow.models import BaseOperator, BaseOperatorLink, XCom
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstancekey import TaskInstanceKey
+    from airflow.utils.context import Context
+
+
+# Root URL of the Google Analytics web UI; link classes append a "#/..." fragment to it.
+BASE_LINK = "https://analytics.google.com/analytics/web/"
+
+
+class GoogleAnalyticsBaseLink(BaseOperatorLink):
+    """Common behaviour for Google Analytics operator links.
+
+    Subclasses supply ``name``, the XCom ``key`` holding the link parameters and the
+    ``format_str`` template those parameters are formatted into.
+
+    :meta private:
+    """
+
+    name: ClassVar[str]
+    key: ClassVar[str]
+    format_str: ClassVar[str]
+
+    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
+        """Build the link from the XCom value stored under ``key``; return "" when it is empty."""
+        conf = XCom.get_value(key=self.key, ti_key=ti_key)
+        if not conf:
+            return ""
+        return f"{BASE_LINK}#/{self.format_str.format(**conf)}"
+
+
+class GoogleAnalyticsPropertyLink(GoogleAnalyticsBaseLink):
+    """Link to a Google Analytics property, built from a stored ``property_id``."""
+
+    name = "Data Analytics Property"
+    key = "data_analytics_property"
+    format_str = "p{property_id}/"
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance: BaseOperator,
+        property_id: str,
+    ):
+        """Push ``{"property_id": property_id}`` to XCom under this link's key."""
+        payload = {"property_id": property_id}
+        task_instance.xcom_push(context, key=GoogleAnalyticsPropertyLink.key, value=payload)
diff --git a/airflow/providers/google/marketing_platform/operators/analytics.py 
b/airflow/providers/google/marketing_platform/operators/analytics.py
index 0098980e9b..d987b62591 100644
--- a/airflow/providers/google/marketing_platform/operators/analytics.py
+++ b/airflow/providers/google/marketing_platform/operators/analytics.py
@@ -19,9 +19,11 @@
 from __future__ import annotations
 
 import csv
+import warnings
 from tempfile import NamedTemporaryFile
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.marketing_platform.hooks.analytics import 
GoogleAnalyticsHook
@@ -34,6 +36,10 @@ class GoogleAnalyticsListAccountsOperator(BaseOperator):
     """
     Lists all accounts to which the user has access.
 
+    .. seealso::
+        This operator is deprecated, please use
+        
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator`:
+
     .. seealso::
         Check official API docs:
         
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/accounts/list
@@ -70,6 +76,13 @@ class GoogleAnalyticsListAccountsOperator(BaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
+        warnings.warn(
+            f"The `{type(self).__name__}` operator is deprecated, please use "
+            f"`GoogleAnalyticsAdminListAccountsOperator` instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=1,
+        )
+
         super().__init__(**kwargs)
 
         self.api_version = api_version
@@ -90,6 +103,10 @@ class GoogleAnalyticsGetAdsLinkOperator(BaseOperator):
     """
     Returns a web property-Google Ads link to which the user has access.
 
+    .. seealso::
+        This operator is deprecated, please use
+        
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator`:
+
     .. seealso::
         Check official API docs:
         
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/webPropertyAdWordsLinks/get
@@ -132,6 +149,12 @@ class GoogleAnalyticsGetAdsLinkOperator(BaseOperator):
         **kwargs,
     ):
         super().__init__(**kwargs)
+        warnings.warn(
+            f"The `{type(self).__name__}` operator is deprecated, please use "
+            f"`GoogleAnalyticsAdminGetGoogleAdsLinkOperator` instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=1,
+        )
 
         self.account_id = account_id
         self.web_property_ad_words_link_id = web_property_ad_words_link_id
@@ -158,6 +181,10 @@ class 
GoogleAnalyticsRetrieveAdsLinksListOperator(BaseOperator):
     """
     Lists webProperty-Google Ads links for a given web property.
 
+    .. seealso::
+        This operator is deprecated, please use
+        
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator`:
+
     .. seealso::
         Check official API docs:
         
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/webPropertyAdWordsLinks/list#http-request
@@ -197,6 +224,12 @@ class 
GoogleAnalyticsRetrieveAdsLinksListOperator(BaseOperator):
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+        warnings.warn(
+            f"The `{type(self).__name__}` operator is deprecated, please use "
+            f"`GoogleAnalyticsAdminListGoogleAdsLinksOperator` instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=1,
+        )
 
         self.account_id = account_id
         self.web_property_id = web_property_id
@@ -221,6 +254,10 @@ class 
GoogleAnalyticsDataImportUploadOperator(BaseOperator):
     """
     Take a file from Cloud Storage and uploads it to GA via data import API.
 
+    .. seealso::
+        This operator is deprecated, please use
+        
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator`:
+
     :param storage_bucket: The Google cloud storage bucket where the file is 
stored.
     :param storage_name_object: The name of the object in the desired Google 
cloud
           storage bucket. (templated) If the destination points to an existing
@@ -266,6 +303,12 @@ class 
GoogleAnalyticsDataImportUploadOperator(BaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
+        warnings.warn(
+            f"The `{type(self).__name__}` operator is deprecated, please use "
+            f"`GoogleAnalyticsAdminCreateDataStreamOperator` instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=1,
+        )
         super().__init__(**kwargs)
         self.storage_bucket = storage_bucket
         self.storage_name_object = storage_name_object
@@ -317,6 +360,10 @@ class 
GoogleAnalyticsDeletePreviousDataUploadsOperator(BaseOperator):
     """
     Deletes previous GA uploads to leave the latest file to control the size 
of the Data Set Quota.
 
+    .. seealso::
+        This operator is deprecated, please use
+        
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator`:
+
     :param account_id: The GA account Id (long) to which the data upload 
belongs.
     :param web_property_id: The web property UA-string associated with the 
upload.
     :param custom_data_source_id: The id to which the data import belongs.
@@ -348,6 +395,12 @@ class 
GoogleAnalyticsDeletePreviousDataUploadsOperator(BaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
+        warnings.warn(
+            f"The `{type(self).__name__}` operator is deprecated, please use "
+            f"`GoogleAnalyticsAdminDeleteDataStreamOperator` instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=1,
+        )
         super().__init__(**kwargs)
 
         self.account_id = account_id
diff --git 
a/airflow/providers/google/marketing_platform/operators/analytics_admin.py 
b/airflow/providers/google/marketing_platform/operators/analytics_admin.py
new file mode 100644
index 0000000000..d961630f86
--- /dev/null
+++ b/airflow/providers/google/marketing_platform/operators/analytics_admin.py
@@ -0,0 +1,579 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Analytics 4 (GA4) operators."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from google.analytics.admin_v1beta import (
+    Account,
+    DataStream,
+    GoogleAdsLink,
+    Property,
+)
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+
+from airflow.exceptions import AirflowNotFoundException
+from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator
+from airflow.providers.google.marketing_platform.hooks.analytics_admin import 
GoogleAnalyticsAdminHook
+from airflow.providers.google.marketing_platform.links.analytics_admin import 
GoogleAnalyticsPropertyLink
+
+if TYPE_CHECKING:
+    from google.api_core.retry import Retry
+    from google.protobuf.message import Message
+
+    from airflow.utils.context import Context
+
+
+class GoogleAnalyticsAdminListAccountsOperator(GoogleCloudBaseOperator):
+    """
+    Lists all accounts to which the user has access.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleAnalyticsAdminListAccountsOperator`
+
+    :param page_size: Optional, number of results to return in the list.
+    :param page_token: Optional. The next_page_token value returned from a 
previous List request, if any.
+    :param show_deleted: Optional. Whether to include soft-deleted (ie: 
"trashed") Accounts in the results.
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+            will not be retried.
+    :param timeout: Optional. The timeout for this request.
+    :param metadata: Optional. Strings which should be sent along with the 
request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "gcp_conn_id",
+        "impersonation_chain",
+        "page_size",
+        "page_token",
+    )
+
+    def __init__(
+        self,
+        *,
+        page_size: int | None = None,
+        page_token: str | None = None,
+        show_deleted: bool | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Connection settings used to build the hook.
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        # Listing options.
+        self.page_size = page_size
+        self.page_token = page_token
+        self.show_deleted = show_deleted
+        # Per-request call options.
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+
+    def execute(
+        self,
+        context: Context,
+    ) -> Sequence[Message]:
+        """List Google Analytics accounts through the hook.
+
+        :param context: Airflow task context; not read by this method.
+        :returns: One ``Account.to_dict`` result per account yielded by the hook.
+        """
+        hook = GoogleAnalyticsAdminHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        # Lazy %-style arguments: the message is only rendered if the record is emitted.
+        self.log.info(
+            "Requesting list of Google Analytics accounts. Page size: %s, page token: %s",
+            self.page_size,
+            self.page_token,
+        )
+        accounts = hook.list_accounts(
+            page_size=self.page_size,
+            page_token=self.page_token,
+            show_deleted=self.show_deleted,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        accounts_list: Sequence[Message] = [Account.to_dict(item) for item in accounts]
+        n = len(accounts_list)
+        # Only a count of exactly one is singular, so zero results read "0 items".
+        self.log.info("Successful request. Retrieved %s item%s.", n, "" if n == 1 else "s")
+        return accounts_list
+
+
+class GoogleAnalyticsAdminCreatePropertyOperator(GoogleCloudBaseOperator):
+    """
+    Creates property.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleAnalyticsAdminCreatePropertyOperator`
+
+    :param analytics_property: The property to create. Note: the supplied 
property must specify its parent.
+        For more details see: 
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties#Property
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :param timeout: Optional. The timeout for this request.
+    :param metadata: Optional. Strings which should be sent along with the 
request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "gcp_conn_id",
+        "impersonation_chain",
+        "analytics_property",
+    )
+    operator_extra_links = (GoogleAnalyticsPropertyLink(),)
+
+    def __init__(
+        self,
+        *,
+        analytics_property: Property | dict[str, Any],
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Connection settings used to build the hook.
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        # The property to create.
+        self.analytics_property = analytics_property
+        # Per-request call options.
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+
+    def execute(
+        self,
+        context: Context,
+    ) -> Message:
+        """Create the property through the hook and store its ID for the extra link.
+
+        :param context: Airflow task context, passed on when persisting the link data.
+        :returns: ``Property.to_dict`` of the created property.
+        """
+        hook = GoogleAnalyticsAdminHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Creating a Google Analytics property.")
+        prop = hook.create_property(
+            analytics_property=self.analytics_property,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("The Google Analytics property %s was created successfully.", prop.name)
+        # ``str.lstrip`` takes a set of characters, not a prefix, so it could also strip leading
+        # characters of the ID itself. Remove the exact "properties/" prefix instead.
+        prefix = "properties/"
+        property_id = prop.name[len(prefix) :] if prop.name.startswith(prefix) else prop.name
+        GoogleAnalyticsPropertyLink.persist(
+            context=context,
+            task_instance=self,
+            property_id=property_id,
+        )
+
+        return Property.to_dict(prop)
+
+
+class GoogleAnalyticsAdminDeletePropertyOperator(GoogleCloudBaseOperator):
+    """
+    Soft-delete property.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleAnalyticsAdminDeletePropertyOperator`
+
+    :param property_id: The id of the Property to soft-delete.
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :param timeout: Optional. The timeout for this request.
+    :param metadata: Optional. Strings which should be sent along with the 
request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "gcp_conn_id",
+        "impersonation_chain",
+        "property_id",
+    )
+
+    def __init__(
+        self,
+        *,
+        property_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.property_id = property_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(
+        self,
+        context: Context,
+    ) -> Message:
+        hook = GoogleAnalyticsAdminHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Deleting a Google Analytics property.")
+        prop = hook.delete_property(
+            property_id=self.property_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("The Google Analytics property %s was soft-deleted 
successfully.", prop.name)
+        return Property.to_dict(prop)
+
+
+class GoogleAnalyticsAdminCreateDataStreamOperator(GoogleCloudBaseOperator):
+    """
+    Creates Data stream.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleAnalyticsAdminCreateDataStreamOperator`
+
+    :param property_id: ID of the parent property for the data stream.
+    :param data_stream: The data stream to create.
+        For more details see: 
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams#DataStream
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :param timeout: Optional. The timeout for this request.
+    :param metadata: Optional. Strings which should be sent along with the 
request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "gcp_conn_id",
+        "impersonation_chain",
+        "property_id",
+        "data_stream",
+    )
+
+    def __init__(
+        self,
+        *,
+        property_id: str,
+        data_stream: DataStream | dict,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.property_id = property_id
+        self.data_stream = data_stream
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(
+        self,
+        context: Context,
+    ) -> Message:
+        hook = GoogleAnalyticsAdminHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Creating a Google Analytics data stream.")
+        data_stream = hook.create_data_stream(
+            property_id=self.property_id,
+            data_stream=self.data_stream,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("The Google Analytics data stream %s was created 
successfully.", data_stream.name)
+        return DataStream.to_dict(data_stream)
+
+
+class GoogleAnalyticsAdminDeleteDataStreamOperator(GoogleCloudBaseOperator):
+    """
+    Deletes Data stream.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleAnalyticsAdminDeleteDataStreamOperator`
+
+    :param property_id: ID of the property which is parent for the data stream.
+    :param data_stream_id: ID of the data stream to delete.
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :param timeout: Optional. The timeout for this request.
+    :param metadata: Optional. Strings which should be sent along with the 
request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "gcp_conn_id",
+        "impersonation_chain",
+        "property_id",
+        "data_stream_id",
+    )
+
+    def __init__(
+        self,
+        *,
+        property_id: str,
+        data_stream_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.property_id = property_id
+        self.data_stream_id = data_stream_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(
+        self,
+        context: Context,
+    ) -> None:
+        hook = GoogleAnalyticsAdminHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Deleting a Google Analytics data stream (id %s).", 
self.data_stream_id)
+        hook.delete_data_stream(
+            property_id=self.property_id,
+            data_stream_id=self.data_stream_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self.log.info("The Google Analytics data stream was deleted 
successfully.")
+        return None
+
+
+class GoogleAnalyticsAdminListGoogleAdsLinksOperator(GoogleCloudBaseOperator):
+    """
+    Lists all Google Ads links associated with a given property.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleAnalyticsAdminListGoogleAdsLinksOperator`
+
+    :param property_id: ID of the parent property.
+    :param page_size: Optional, number of results to return in the list.
+    :param page_token: Optional. The next_page_token value returned from a 
previous List request, if any.
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :param timeout: Optional. The timeout for this request.
+    :param metadata: Optional. Strings which should be sent along with the 
request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "gcp_conn_id",
+        "impersonation_chain",
+        "property_id",
+        "page_size",
+        "page_token",
+    )
+
+    def __init__(
+        self,
+        *,
+        property_id: str,
+        page_size: int | None = None,
+        page_token: str | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.property_id = property_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(
+        self,
+        context: Context,
+    ) -> Sequence[Message]:
+        hook = GoogleAnalyticsAdminHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Requesting list of Google Ads links accounts for the property_id 
%s, "
+            "page size %s, page token %s",
+            self.property_id,
+            self.page_size,
+            self.page_token,
+        )
+        google_ads_links = hook.list_google_ads_links(
+            property_id=self.property_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        ads_links_list: Sequence[Message] = [GoogleAdsLink.to_dict(item) for 
item in google_ads_links]
+        n = len(ads_links_list)
+        self.log.info("Successful request. Retrieved %s item%s.", n, "s" if n 
> 1 else "")
+        return ads_links_list
+
+
+class GoogleAnalyticsAdminGetGoogleAdsLinkOperator(GoogleCloudBaseOperator):
+    """
+    Gets a Google Ads link associated with a given property.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleAnalyticsAdminGetGoogleAdsLinkOperator`
+
+    :param property_id: Parent property id.
+    :param google_ads_link_id: Google Ads link id.
+    :param retry: Optional, a retry object used  to retry requests. If `None` 
is specified, requests
+        will not be retried.
+    :param timeout: Optional. The timeout for this request.
+    :param metadata: Optional. Strings which should be sent along with the 
request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param impersonation_chain: Optional. Service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "gcp_conn_id",
+        "impersonation_chain",
+        "google_ads_link_id",
+    )
+
+    def __init__(
+        self,
+        *,
+        property_id: str,
+        google_ads_link_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.property_id = property_id
+        self.google_ads_link_id = google_ads_link_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(
+        self,
+        context: Context,
+    ) -> Message:
+        hook = GoogleAnalyticsAdminHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Requesting the Google Ads link with id %s for the property_id %s",
+            self.google_ads_link_id,
+            self.property_id,
+        )
+        ads_links = hook.list_google_ads_links(
+            property_id=self.property_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        find_link = (item for item in ads_links if item.name.split("/")[-1] == 
self.google_ads_link_id)
+        if ads_link := next(find_link, None):
+            self.log.info("Successful request.")
+            return GoogleAdsLink.to_dict(ads_link)
+        raise AirflowNotFoundException(
+            f"Google Ads Link with id {self.google_ads_link_id} and property 
id {self.property_id} not found"
+        )
diff --git a/airflow/providers/google/provider.yaml 
b/airflow/providers/google/provider.yaml
index 0f702df942..aefc4d4997 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -91,6 +91,7 @@ dependencies:
   - gcloud-aio-storage>=9.0.0
   - gcsfs>=2023.10.0
   - google-ads>=22.1.0
+  - google-analytics-admin
   - google-api-core>=2.11.0
   - google-api-python-client>=1.6.0
   - google-auth>=1.0.0
@@ -145,6 +146,12 @@ dependencies:
   - sqlalchemy-spanner>=1.6.2
 
 integrations:
+  - integration-name: Google Analytics (GA4)
+    external-doc-url: https://analytics.google.com/
+    logo: /integration-logos/gcp/Google-Analytics.png
+    how-to-guide:
+      - 
/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst
+    tags: [gmp]
   - integration-name: Google Analytics360
     external-doc-url: https://analytics.google.com/
     logo: /integration-logos/gcp/Google-Analytics.png
@@ -605,6 +612,9 @@ operators:
   - integration-name: Google Cloud Firestore
     python-modules:
       - airflow.providers.google.firebase.operators.firestore
+  - integration-name: Google Analytics (GA4)
+    python-modules:
+      - airflow.providers.google.marketing_platform.operators.analytics_admin
   - integration-name: Google Analytics360
     python-modules:
       - airflow.providers.google.marketing_platform.operators.analytics
@@ -849,6 +859,9 @@ hooks:
   - integration-name: Google Cloud Firestore
     python-modules:
       - airflow.providers.google.firebase.hooks.firestore
+  - integration-name: Google Analytics (GA4)
+    python-modules:
+      - airflow.providers.google.marketing_platform.hooks.analytics_admin
   - integration-name: Google Analytics360
     python-modules:
       - airflow.providers.google.marketing_platform.hooks.analytics
@@ -1203,6 +1216,7 @@ extra-links:
   - 
airflow.providers.google.cloud.links.mlengine.MLEngineModelVersionDetailsLink
   - airflow.providers.google.common.links.storage.StorageLink
   - airflow.providers.google.common.links.storage.FileDetailsLink
+  - 
airflow.providers.google.marketing_platform.links.analytics_admin.GoogleAnalyticsPropertyLink
 
 additional-extras:
   - name: apache.beam
diff --git 
a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst
 
b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst
index 9b99c18295..765b6a6063 100644
--- 
a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst
+++ 
b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst
@@ -22,6 +22,11 @@ Google Analytics 360 operators allow you to lists all 
accounts to which the user
 For more information about the Google Analytics 360 API check
 `official documentation 
<https://developers.google.com/analytics/devguides/config/mgmt/v3>`__.
 
+Please note that the Google Analytics 360 API is replaced by
+`Google Analytics 4 
<https://developers.google.com/analytics/devguides/config/admin/v1>`__ and
+`will be turned down on July 1, 2024 
<https://support.google.com/analytics/answer/11583528>`__.
+Thus, consider using the new :doc:`Google Analytics (GA4) Admin Operators </operators/marketing_platform/analytics_admin>`.
+
 Prerequisite Tasks
 ^^^^^^^^^^^^^^^^^^
 
diff --git 
a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst
 
b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst
new file mode 100644
index 0000000000..b4aab19897
--- /dev/null
+++ 
b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst
@@ -0,0 +1,151 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Analytics (GA4) Admin Operators
+======================================
+
+Google Analytics (GA4) Admin operators allow you to list accounts, create and delete properties and data streams, and retrieve Google Ads links.
+For more information about the Google Analytics Admin API check
+`official documentation <https://developers.google.com/analytics/devguides/config/admin/v1>`__.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:GoogleAnalyticsAdminListAccountsOperator:
+
+List the Accounts
+^^^^^^^^^^^^^^^^^
+
+To list accounts from Analytics you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_marketing_platform_list_accounts_operator]
+    :end-before: [END howto_marketing_platform_list_accounts_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminCreatePropertyOperator:
+
+Create Property
+^^^^^^^^^^^^^^^
+
+Creates a property.
+To create a property you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreatePropertyOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_marketing_platform_create_property_operator]
+    :end-before: [END howto_marketing_platform_create_property_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreatePropertyOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminDeletePropertyOperator:
+
+Delete Property
+^^^^^^^^^^^^^^^
+
+Deletes a property.
+To delete a property you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeletePropertyOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_marketing_platform_delete_property_operator]
+    :end-before: [END howto_marketing_platform_delete_property_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeletePropertyOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminCreateDataStreamOperator:
+
+Create Data stream
+^^^^^^^^^^^^^^^^^^
+
+Creates a data stream.
+To create a data stream you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_marketing_platform_create_data_stream_operator]
+    :end-before: [END howto_marketing_platform_create_data_stream_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminDeleteDataStreamOperator:
+
+Delete Data stream
+^^^^^^^^^^^^^^^^^^
+
+Deletes a data stream.
+To delete a data stream you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_marketing_platform_delete_data_stream_operator]
+    :end-before: [END howto_marketing_platform_delete_data_stream_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminListGoogleAdsLinksOperator:
+
+List Google Ads Links
+^^^^^^^^^^^^^^^^^^^^^
+
+To list Google Ads links you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_marketing_platform_list_google_ads_links]
+    :end-before: [END howto_marketing_platform_list_google_ads_links]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminGetGoogleAdsLinkOperator:
+
+Get the Google Ads link
+^^^^^^^^^^^^^^^^^^^^^^^
+
+To get a Google Ads link you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_marketing_platform_get_google_ad_link]
+    :end-before: [END howto_marketing_platform_get_google_ad_link]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator`
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index fbd5915882..d92a2559ea 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -468,6 +468,7 @@
       "gcloud-aio-storage>=9.0.0",
       "gcsfs>=2023.10.0",
       "google-ads>=22.1.0",
+      "google-analytics-admin",
       "google-api-core>=2.11.0",
       "google-api-python-client>=1.6.0",
       "google-auth-httplib2>=0.0.1",
diff --git 
a/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py 
b/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py
new file mode 100644
index 0000000000..81a5210d49
--- /dev/null
+++ b/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from airflow.providers.google.marketing_platform.hooks.analytics_admin import 
GoogleAnalyticsAdminHook
+from tests.providers.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
+
+# Connection id and impersonation chain handed to the hook under test.
+GCP_CONN_ID = "test_gcp_conn_id"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+# Sample ids and the resource names built from them
+# (``properties/<id>`` and ``properties/<id>/dataStreams/<id>``).
+TEST_PROPERTY_ID = "123456789"
+TEST_PROPERTY_NAME = f"properties/{TEST_PROPERTY_ID}"
+TEST_DATASTREAM_ID = "987654321"
+TEST_DATASTREAM_NAME = 
f"properties/{TEST_PROPERTY_ID}/dataStreams/{TEST_DATASTREAM_ID}"
+# Dotted path of the hook module, used as the prefix of ``mock.patch`` targets.
+ANALYTICS_HOOK_PATH = 
"airflow.providers.google.marketing_platform.hooks.analytics_admin"
+
+
+class TestGoogleAnalyticsAdminHook:
+    """Unit tests for GoogleAnalyticsAdminHook; the API client is always replaced by a mock."""
+
+    def setup_method(self):
+        """Create the hook with GoogleBaseHook.__init__ swapped for the shared GCP test stub."""
+        with mock.patch(
+            "airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
+            new=mock_base_gcp_hook_default_project_id,
+        ):
+            self.hook = GoogleAnalyticsAdminHook(GCP_CONN_ID)
+
+    @mock.patch("airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__")
+    def test_init(self, mock_base_init):
+        """The hook constructor forwards its arguments to GoogleBaseHook.__init__."""
+        GoogleAnalyticsAdminHook(
+            GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_base_init.assert_called_once_with(
+            GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.CLIENT_INFO")
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_credentials")
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.AnalyticsAdminServiceClient")
+    def test_get_conn(self, mock_client, get_credentials, mock_client_info):
+        """get_conn builds the client from the hook credentials and stores it on ``_conn``."""
+        mock_credentials = mock.MagicMock()
+        get_credentials.return_value = mock_credentials
+
+        result = self.hook.get_conn()
+
+        mock_client.assert_called_once_with(credentials=mock_credentials, client_info=mock_client_info)
+        assert self.hook._conn == result
+
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+    def test_list_accounts(self, mock_get_conn):
+        """list_accounts packs the paging options into ``request`` and returns the client result."""
+        list_accounts_expected = mock.MagicMock()
+        mock_list_accounts = mock_get_conn.return_value.list_accounts
+        mock_list_accounts.return_value = list_accounts_expected
+        mock_page_size, mock_page_token, mock_show_deleted, mock_retry, mock_timeout, mock_metadata = (
+            mock.MagicMock() for _ in range(6)
+        )
+
+        request = {
+            "page_size": mock_page_size,
+            "page_token": mock_page_token,
+            "show_deleted": mock_show_deleted,
+        }
+
+        list_accounts_received = self.hook.list_accounts(
+            page_size=mock_page_size,
+            page_token=mock_page_token,
+            show_deleted=mock_show_deleted,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        mock_list_accounts.assert_called_once_with(
+            request=request, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata
+        )
+        assert list_accounts_received == list_accounts_expected
+
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+    def test_create_property(self, mock_get_conn):
+        """create_property sends the property under the ``property`` key and returns the client result."""
+        property_expected = mock.MagicMock()
+
+        mock_create_property = mock_get_conn.return_value.create_property
+        mock_create_property.return_value = property_expected
+        mock_property, mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(4))
+
+        property_created = self.hook.create_property(
+            analytics_property=mock_property, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata
+        )
+
+        request = {"property": mock_property}
+        mock_create_property.assert_called_once_with(
+            request=request, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata
+        )
+        assert property_created == property_expected
+
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+    def test_delete_property(self, mock_get_conn):
+        """delete_property addresses the property by its ``properties/<id>`` resource name."""
+        property_expected = mock.MagicMock()
+        mock_delete_property = mock_get_conn.return_value.delete_property
+        mock_delete_property.return_value = property_expected
+        mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3))
+
+        property_deleted = self.hook.delete_property(
+            property_id=TEST_PROPERTY_ID,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        request = {"name": TEST_PROPERTY_NAME}
+        mock_delete_property.assert_called_once_with(
+            request=request,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        assert property_deleted == property_expected
+
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+    def test_create_data_stream(self, mock_get_conn):
+        """create_data_stream uses the property resource name as ``parent`` of the new stream."""
+        data_stream_expected = mock.MagicMock()
+        mock_create_data_stream = mock_get_conn.return_value.create_data_stream
+        mock_create_data_stream.return_value = data_stream_expected
+        mock_data_stream, mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(4))
+
+        data_stream_created = self.hook.create_data_stream(
+            property_id=TEST_PROPERTY_ID,
+            data_stream=mock_data_stream,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+
+        request = {"parent": TEST_PROPERTY_NAME, "data_stream": mock_data_stream}
+        mock_create_data_stream.assert_called_once_with(
+            request=request, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata
+        )
+        assert data_stream_created == data_stream_expected
+
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+    def test_delete_data_stream(self, mock_get_conn):
+        """delete_data_stream addresses the stream by its full ``properties/<id>/dataStreams/<id>`` name."""
+        mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3))
+
+        self.hook.delete_data_stream(
+            property_id=TEST_PROPERTY_ID,
+            data_stream_id=TEST_DATASTREAM_ID,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+
+        request = {"name": TEST_DATASTREAM_NAME}
+        mock_get_conn.return_value.delete_data_stream.assert_called_once_with(
+            request=request,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+
+    @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+    def test_list_ads_links(self, mock_get_conn):
+        """list_google_ads_links sends the property name as ``parent`` together with the paging options."""
+        mock_page_size, mock_page_token, mock_retry, mock_timeout, mock_metadata = (
+            mock.MagicMock() for _ in range(5)
+        )
+
+        self.hook.list_google_ads_links(
+            property_id=TEST_PROPERTY_ID,
+            page_size=mock_page_size,
+            page_token=mock_page_token,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+
+        request = {"parent": TEST_PROPERTY_NAME, "page_size": mock_page_size, "page_token": mock_page_token}
+        mock_get_conn.return_value.list_google_ads_links.assert_called_once_with(
+            request=request, retry=mock_retry, timeout=mock_timeout, metadata=mock_metadata
+        )
diff --git a/tests/providers/google/marketing_platform/links/__init__.py b/tests/providers/google/marketing_platform/links/__init__.py
new file mode 100644
index 0000000000..217e5db960
--- /dev/null
+++ b/tests/providers/google/marketing_platform/links/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/google/marketing_platform/links/test_analytics_admin.py b/tests/providers/google/marketing_platform/links/test_analytics_admin.py
new file mode 100644
index 0000000000..bb015c9be2
--- /dev/null
+++ b/tests/providers/google/marketing_platform/links/test_analytics_admin.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from airflow.providers.google.marketing_platform.links.analytics_admin import (
+    BASE_LINK,
+    GoogleAnalyticsPropertyLink,
+)
+
+TEST_PROPERTY_ID = "123456789"
+TEST_PROJECT_ID = "test_project"
+# XCom payload returned by the mocked XCom.get_value in the link tests.
+TEST_CONF_GOOGLE_ADS_LINK = {"property_id": TEST_PROJECT_ID}
+# Module path used as the prefix of the mock.patch targets below.
+ANALYTICS_LINKS_PATH = "airflow.providers.google.marketing_platform.links.analytics_admin"
+
+
+class TestGoogleAnalyticsPropertyLink:
+    """Unit tests for GoogleAnalyticsPropertyLink with XCom access mocked out."""
+
+    @mock.patch(f"{ANALYTICS_LINKS_PATH}.XCom")
+    def test_get_link(self, mock_xcom):
+        """get_link formats the URL from the ``property_id`` stored in XCom."""
+        mock_ti_key = mock.MagicMock()
+        mock_xcom.get_value.return_value = TEST_CONF_GOOGLE_ADS_LINK
+        # Derive the expectation from the mocked payload so both always stay in sync.
+        url_expected = f"{BASE_LINK}#/p{TEST_CONF_GOOGLE_ADS_LINK['property_id']}/"
+
+        link = GoogleAnalyticsPropertyLink()
+        url = link.get_link(operator=mock.MagicMock(), ti_key=mock_ti_key)
+
+        mock_xcom.get_value.assert_called_once_with(key=link.key, ti_key=mock_ti_key)
+        assert url == url_expected
+
+    @mock.patch(f"{ANALYTICS_LINKS_PATH}.XCom")
+    def test_get_link_not_found(self, mock_xcom):
+        """get_link returns an empty string when XCom holds no value for the link key."""
+        mock_ti_key = mock.MagicMock()
+        mock_xcom.get_value.return_value = None
+
+        link = GoogleAnalyticsPropertyLink()
+        url = link.get_link(operator=mock.MagicMock(), ti_key=mock_ti_key)
+
+        mock_xcom.get_value.assert_called_once_with(key=link.key, ti_key=mock_ti_key)
+        assert url == ""
+
+    def test_persist(self):
+        """persist pushes the property id to XCom under the link's key."""
+        mock_context = mock.MagicMock()
+        mock_task_instance = mock.MagicMock()
+
+        GoogleAnalyticsPropertyLink.persist(
+            context=mock_context,
+            task_instance=mock_task_instance,
+            property_id=TEST_PROPERTY_ID,
+        )
+
+        mock_task_instance.xcom_push.assert_called_once_with(
+            mock_context,
+            key=GoogleAnalyticsPropertyLink.key,
+            value={"property_id": TEST_PROPERTY_ID},
+        )
diff --git a/tests/providers/google/marketing_platform/operators/test_analytics_admin.py b/tests/providers/google/marketing_platform/operators/test_analytics_admin.py
new file mode 100644
index 0000000000..f292125140
--- /dev/null
+++ b/tests/providers/google/marketing_platform/operators/test_analytics_admin.py
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.exceptions import AirflowNotFoundException
+from airflow.providers.google.marketing_platform.operators.analytics_admin import (
+    GoogleAnalyticsAdminCreateDataStreamOperator,
+    GoogleAnalyticsAdminCreatePropertyOperator,
+    GoogleAnalyticsAdminDeleteDataStreamOperator,
+    GoogleAnalyticsAdminDeletePropertyOperator,
+    GoogleAnalyticsAdminGetGoogleAdsLinkOperator,
+    GoogleAnalyticsAdminListAccountsOperator,
+    GoogleAnalyticsAdminListGoogleAdsLinksOperator,
+)
+
+# Connection id and impersonation chain passed to every operator under test.
+GCP_CONN_ID = "google_cloud_default"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+# Ids used by the Google Ads link tests and the link resource name built from them.
+TEST_GA_GOOGLE_ADS_PROPERTY_ID = "123456789"
+TEST_GA_GOOGLE_ADS_LINK_ID = "987654321"
+TEST_GA_GOOGLE_ADS_LINK_NAME = (
+    f"properties/{TEST_GA_GOOGLE_ADS_PROPERTY_ID}/googleAdsLinks/{TEST_GA_GOOGLE_ADS_LINK_ID}"
+)
+# Ids used by the property and data stream tests.
+TEST_PROPERTY_ID = "123456789"
+TEST_PROPERTY_NAME = f"properties/{TEST_PROPERTY_ID}"
+TEST_DATASTREAM_ID = "987654321"
+TEST_DATASTREAM_NAME = f"properties/{TEST_PROPERTY_ID}/dataStreams/{TEST_DATASTREAM_ID}"
+# Module path used as the prefix of the mock.patch targets below.
+ANALYTICS_PATH = "airflow.providers.google.marketing_platform.operators.analytics_admin"
+
+
+class TestGoogleAnalyticsAdminListAccountsOperator:
+    """Unit test for GoogleAnalyticsAdminListAccountsOperator with the hook mocked out."""
+
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+    @mock.patch(f"{ANALYTICS_PATH}.Account.to_dict")
+    def test_execute(self, account_to_dict_mock, hook_mock):
+        """execute forwards its arguments to the hook and returns each account serialized via to_dict."""
+        list_accounts_returned = (mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
+        hook_mock.return_value.list_accounts.return_value = list_accounts_returned
+
+        # side_effect hands out one serialized value per to_dict call, in order.
+        list_accounts_serialized = [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()]
+        account_to_dict_mock.side_effect = list_accounts_serialized
+
+        mock_page_size, mock_page_token, mock_show_deleted, mock_retry, mock_timeout, mock_metadata = (
+            mock.MagicMock() for _ in range(6)
+        )
+
+        retrieved_accounts_list = GoogleAnalyticsAdminListAccountsOperator(
+            task_id="test_task",
+            page_size=mock_page_size,
+            page_token=mock_page_token,
+            show_deleted=mock_show_deleted,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        ).execute(context=None)
+
+        hook_mock.assert_called_once()
+        hook_mock.return_value.list_accounts.assert_called_once_with(
+            page_size=mock_page_size,
+            page_token=mock_page_token,
+            show_deleted=mock_show_deleted,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        account_to_dict_mock.assert_has_calls([mock.call(item) for item in list_accounts_returned])
+        assert retrieved_accounts_list == list_accounts_serialized
+
+
+class TestGoogleAnalyticsAdminCreatePropertyOperator:
+    """Unit test for GoogleAnalyticsAdminCreatePropertyOperator with hook and link mocked out."""
+
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsPropertyLink")
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+    @mock.patch(f"{ANALYTICS_PATH}.Property.to_dict")
+    def test_execute(self, property_to_dict_mock, hook_mock, _):
+        """execute forwards its arguments to the hook and returns the created property via to_dict."""
+        property_returned = mock.MagicMock()
+        hook_mock.return_value.create_property.return_value = property_returned
+
+        property_serialized = mock.MagicMock()
+        property_to_dict_mock.return_value = property_serialized
+
+        mock_property, mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(4))
+
+        property_created = GoogleAnalyticsAdminCreatePropertyOperator(
+            task_id="test_task",
+            analytics_property=mock_property,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        ).execute(context=None)
+
+        hook_mock.assert_called_once()
+        hook_mock.return_value.create_property.assert_called_once_with(
+            analytics_property=mock_property,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        property_to_dict_mock.assert_called_once_with(property_returned)
+        assert property_created == property_serialized
+
+
+class TestGoogleAnalyticsAdminDeletePropertyOperator:
+    """Unit test for GoogleAnalyticsAdminDeletePropertyOperator with the hook mocked out."""
+
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+    @mock.patch(f"{ANALYTICS_PATH}.Property.to_dict")
+    def test_execute(self, property_to_dict_mock, hook_mock):
+        """execute forwards its arguments to the hook and returns the deleted property via to_dict."""
+        property_returned = mock.MagicMock()
+        hook_mock.return_value.delete_property.return_value = property_returned
+
+        property_serialized = mock.MagicMock()
+        property_to_dict_mock.return_value = property_serialized
+
+        mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3))
+
+        property_deleted = GoogleAnalyticsAdminDeletePropertyOperator(
+            task_id="test_task",
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            property_id=TEST_PROPERTY_ID,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        ).execute(context=None)
+
+        hook_mock.assert_called_once()
+        hook_mock.return_value.delete_property.assert_called_once_with(
+            property_id=TEST_PROPERTY_ID,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        property_to_dict_mock.assert_called_once_with(property_returned)
+        assert property_deleted == property_serialized
+
+
+class TestGoogleAnalyticsAdminCreateDataStreamOperator:
+    """Unit test for GoogleAnalyticsAdminCreateDataStreamOperator with the hook mocked out."""
+
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+    @mock.patch(f"{ANALYTICS_PATH}.DataStream.to_dict")
+    def test_execute(self, data_stream_to_dict_mock, hook_mock):
+        """execute forwards its arguments to the hook and returns the created stream via to_dict."""
+        data_stream_returned = mock.MagicMock()
+        hook_mock.return_value.create_data_stream.return_value = data_stream_returned
+
+        data_stream_serialized = mock.MagicMock()
+        data_stream_to_dict_mock.return_value = data_stream_serialized
+
+        mock_data_stream, mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(4))
+
+        data_stream_created = GoogleAnalyticsAdminCreateDataStreamOperator(
+            task_id="test_task",
+            property_id=TEST_PROPERTY_ID,
+            data_stream=mock_data_stream,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        ).execute(context=None)
+
+        hook_mock.assert_called_once()
+        hook_mock.return_value.create_data_stream.assert_called_once_with(
+            property_id=TEST_PROPERTY_ID,
+            data_stream=mock_data_stream,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        data_stream_to_dict_mock.assert_called_once_with(data_stream_returned)
+        assert data_stream_created == data_stream_serialized
+
+
+class TestGoogleAnalyticsAdminDeleteDataStreamOperator:
+    """Unit test for GoogleAnalyticsAdminDeleteDataStreamOperator with the hook mocked out."""
+
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+    def test_execute(self, hook_mock):
+        """execute forwards the property id, stream id and call options to the hook."""
+        mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3))
+
+        GoogleAnalyticsAdminDeleteDataStreamOperator(
+            task_id="test_task",
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            property_id=TEST_PROPERTY_ID,
+            data_stream_id=TEST_DATASTREAM_ID,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        ).execute(context=None)
+
+        hook_mock.assert_called_once()
+        hook_mock.return_value.delete_data_stream.assert_called_once_with(
+            property_id=TEST_PROPERTY_ID,
+            data_stream_id=TEST_DATASTREAM_ID,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+
+
+class TestGoogleAnalyticsAdminListGoogleAdsLinksOperator:
+    """Unit test for GoogleAnalyticsAdminListGoogleAdsLinksOperator with the hook mocked out."""
+
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAdsLink.to_dict")
+    def test_execute(self, ads_link_to_dict_mock, hook_mock):
+        """execute forwards its arguments to the hook and returns each link serialized via to_dict."""
+        list_ads_links_returned = (mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
+        hook_mock.return_value.list_google_ads_links.return_value = list_ads_links_returned
+
+        # side_effect hands out one serialized value per to_dict call, in order.
+        list_ads_links_serialized = [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()]
+        ads_link_to_dict_mock.side_effect = list_ads_links_serialized
+
+        mock_page_size, mock_page_token, mock_retry, mock_timeout, mock_metadata = (
+            mock.MagicMock() for _ in range(5)
+        )
+
+        retrieved_ads_links = GoogleAnalyticsAdminListGoogleAdsLinksOperator(
+            task_id="test_task",
+            property_id=TEST_PROPERTY_ID,
+            page_size=mock_page_size,
+            page_token=mock_page_token,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        ).execute(context=None)
+
+        hook_mock.assert_called_once()
+        hook_mock.return_value.list_google_ads_links.assert_called_once_with(
+            property_id=TEST_PROPERTY_ID,
+            page_size=mock_page_size,
+            page_token=mock_page_token,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        ads_link_to_dict_mock.assert_has_calls([mock.call(item) for item in list_ads_links_returned])
+        assert retrieved_ads_links == list_ads_links_serialized
+
+
+class TestGoogleAnalyticsAdminGetGoogleAdsLinkOperator:
+    """Unit tests for GoogleAnalyticsAdminGetGoogleAdsLinkOperator with the hook mocked out."""
+
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAdsLink")
+    def test_execute(self, mock_google_ads_link, hook_mock):
+        """execute lists the property's links and serializes the one whose name matches."""
+        mock_ad_link = mock.MagicMock()
+        mock_ad_link.name = TEST_GA_GOOGLE_ADS_LINK_NAME
+        list_ads_links = hook_mock.return_value.list_google_ads_links
+        list_ads_links.return_value = [mock_ad_link]
+        mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3))
+
+        GoogleAnalyticsAdminGetGoogleAdsLinkOperator(
+            task_id="test_task",
+            property_id=TEST_GA_GOOGLE_ADS_PROPERTY_ID,
+            google_ads_link_id=TEST_GA_GOOGLE_ADS_LINK_ID,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        ).execute(context=None)
+
+        hook_mock.assert_called_once()
+        # Assert against the same constant that was handed to the operator above.
+        hook_mock.return_value.list_google_ads_links.assert_called_once_with(
+            property_id=TEST_GA_GOOGLE_ADS_PROPERTY_ID,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
+        mock_google_ads_link.to_dict.assert_called_once_with(mock_ad_link)
+
+    @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+    def test_execute_not_found(self, hook_mock):
+        """execute raises AirflowNotFoundException when the hook returns no links."""
+        list_ads_links = hook_mock.return_value.list_google_ads_links
+        list_ads_links.return_value = []
+        mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in range(3))
+
+        with pytest.raises(AirflowNotFoundException):
+            GoogleAnalyticsAdminGetGoogleAdsLinkOperator(
+                task_id="test_task",
+                gcp_conn_id=GCP_CONN_ID,
+                impersonation_chain=IMPERSONATION_CHAIN,
+                property_id=TEST_GA_GOOGLE_ADS_PROPERTY_ID,
+                google_ads_link_id=TEST_GA_GOOGLE_ADS_LINK_ID,
+                retry=mock_retry,
+                timeout=mock_timeout,
+                metadata=mock_metadata,
+            ).execute(context=None)
+
+        hook_mock.assert_called_once()
+        # Assert against the same constant that was handed to the operator above.
+        hook_mock.return_value.list_google_ads_links.assert_called_once_with(
+            property_id=TEST_GA_GOOGLE_ADS_PROPERTY_ID,
+            retry=mock_retry,
+            timeout=mock_timeout,
+            metadata=mock_metadata,
+        )
diff --git a/tests/system/providers/google/marketing_platform/example_analytics_admin.py b/tests/system/providers/google/marketing_platform/example_analytics_admin.py
new file mode 100644
index 0000000000..58c5b65795
--- /dev/null
+++ b/tests/system/providers/google/marketing_platform/example_analytics_admin.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use Google Analytics (GA4) Admin Operators.
+
+This DAG relies on the following OS environment variables:
+
+* GA_ACCOUNT_ID - Google Analytics account id.
+* GA_GOOGLE_ADS_PROPERTY_ID - Google Analytics property's id associated with Google Ads Link.
+
+In order to run this test, make sure you have completed the following steps:
+1. Log in to https://analytics.google.com
+2. In the settings section create an account and save its ID in the variable GA_ACCOUNT_ID.
+3. In the settings section go to the Property access management page and add your service account email with
+Editor permissions. This service account should be created on behalf of the account from step 1.
+4. Make sure Google Analytics Admin API is enabled in your GCP project.
+5. Create a Google Ads account and link it to your Google Analytics account in the GA admin panel.
+6. Associate the Google Ads account with a property, and save this property's id in the variable
+GA_GOOGLE_ADS_PROPERTY_ID.
+"""
+from __future__ import annotations
+
+import json
+import logging
+import os
+from datetime import datetime
+
+from google.analytics import admin_v1beta as google_analytics
+
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.marketing_platform.operators.analytics_admin 
import (
+    GoogleAnalyticsAdminCreateDataStreamOperator,
+    GoogleAnalyticsAdminCreatePropertyOperator,
+    GoogleAnalyticsAdminDeleteDataStreamOperator,
+    GoogleAnalyticsAdminDeletePropertyOperator,
+    GoogleAnalyticsAdminGetGoogleAdsLinkOperator,
+    GoogleAnalyticsAdminListAccountsOperator,
+    GoogleAnalyticsAdminListGoogleAdsLinksOperator,
+)
+from airflow.settings import Session
+from airflow.utils.trigger_rule import TriggerRule
+
+# Identifier of the system-test environment, read from the OS environment
+# (None when SYSTEM_TESTS_ENV_ID is unset).
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_google_analytics_admin"
+
+# Id of the Airflow connection that this DAG creates in its setup task and
+# deletes in its teardown task.
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+# Google Analytics account used as the parent of the property created below.
+ACCOUNT_ID = os.environ.get("GA_ACCOUNT_ID", "123456789")
+# Jinja templates: each pulls the XCom value of the named task and keeps the
+# last "/"-separated segment of its "name" field.
+PROPERTY_ID = "{{ task_instance.xcom_pull('create_property')['name'].split('/')[-1] }}"
+DATA_STREAM_ID = "{{ task_instance.xcom_pull('create_data_stream')['name'].split('/')[-1] }}"
+# Property id passed to the Google Ads link operators.
+GA_GOOGLE_ADS_PROPERTY_ID = os.environ.get("GA_GOOGLE_ADS_PROPERTY_ID", "123456789")
+# Same extraction as above, applied to the first element of the list returned
+# by the 'list_google_ads_links' task.
+GA_ADS_LINK_ID = "{{ task_instance.xcom_pull('list_google_ads_links')[0]['name'].split('/')[-1] }}"
+
+log = logging.getLogger(__name__)
+
+with DAG(
+    DAG_ID,
+    schedule="@once",  # Override to match your needs,
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "analytics"],
+) as dag:
+
+    @task
+    def setup_connection(**kwargs) -> None:
+        """Create the connection used by every GA4 Admin operator in this DAG.
+
+        Builds a ``google_cloud_platform`` connection named ``CONNECTION_ID``
+        whose extra field carries the Analytics "edit" and "readonly" OAuth
+        scopes, then persists it through an ORM session. If a connection with
+        the same id already exists, a warning is logged and nothing is written.
+        """
+        connection = Connection(
+            conn_id=CONNECTION_ID,
+            conn_type="google_cloud_platform",
+        )
+        conn_extra_json = json.dumps(
+            {
+                # Adjacent string literals are concatenated by Python into a
+                # single comma-separated scope value.
+                "scope": "https://www.googleapis.com/auth/analytics.edit,"
+                "https://www.googleapis.com/auth/analytics.readonly",
+            }
+        )
+        connection.set_extra(conn_extra_json)
+
+        session = Session()
+        try:
+            if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
+                log.warning("Connection %s already exists", CONNECTION_ID)
+                return
+
+            session.add(connection)
+            session.commit()
+        finally:
+            # Release the session on both the early-return and the commit path.
+            session.close()
+
+    setup_connection_task = setup_connection()
+
+    # [START howto_marketing_platform_list_accounts_operator]
+    list_accounts = GoogleAnalyticsAdminListAccountsOperator(
+        task_id="list_account",
+        gcp_conn_id=CONNECTION_ID,
+        show_deleted=True,
+    )
+    # [END howto_marketing_platform_list_accounts_operator]
+
+    # [START howto_marketing_platform_create_property_operator]
+    create_property = GoogleAnalyticsAdminCreatePropertyOperator(
+        task_id="create_property",
+        analytics_property={
+            "parent": f"accounts/{ACCOUNT_ID}",
+            "display_name": "Test display name",
+            "time_zone": "America/Los_Angeles",
+        },
+        gcp_conn_id=CONNECTION_ID,
+    )
+    # [END howto_marketing_platform_create_property_operator]
+
+    # [START howto_marketing_platform_create_data_stream_operator]
+    create_data_stream = GoogleAnalyticsAdminCreateDataStreamOperator(
+        task_id="create_data_stream",
+        property_id=PROPERTY_ID,
+        data_stream={
+            "display_name": "Test data stream",
+            "web_stream_data": {
+                "default_uri": "www.example.com",
+            },
+            "type_": google_analytics.DataStream.DataStreamType.WEB_DATA_STREAM,
+        },
+        gcp_conn_id=CONNECTION_ID,
+    )
+    # [END howto_marketing_platform_create_data_stream_operator]
+
+    # [START howto_marketing_platform_delete_data_stream_operator]
+    delete_data_stream = GoogleAnalyticsAdminDeleteDataStreamOperator(
+        task_id="delete_datastream",
+        property_id=PROPERTY_ID,
+        data_stream_id=DATA_STREAM_ID,
+        gcp_conn_id=CONNECTION_ID,
+    )
+    # [END howto_marketing_platform_delete_data_stream_operator]
+
+    # [START howto_marketing_platform_delete_property_operator]
+    delete_property = GoogleAnalyticsAdminDeletePropertyOperator(
+        task_id="delete_property",
+        property_id=PROPERTY_ID,
+        gcp_conn_id=CONNECTION_ID,
+    )
+    # [END howto_marketing_platform_delete_property_operator]
+    # Set after the [END] marker so the trigger rule stays outside the marked snippet.
+    delete_property.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_marketing_platform_list_google_ads_links]
+    list_google_ads_links = GoogleAnalyticsAdminListGoogleAdsLinksOperator(
+        task_id="list_google_ads_links",
+        property_id=GA_GOOGLE_ADS_PROPERTY_ID,
+        gcp_conn_id=CONNECTION_ID,
+    )
+    # [END howto_marketing_platform_list_google_ads_links]
+
+    # [START howto_marketing_platform_get_google_ad_link]
+    get_ad_link = GoogleAnalyticsAdminGetGoogleAdsLinkOperator(
+        task_id="get_ad_link",
+        property_id=GA_GOOGLE_ADS_PROPERTY_ID,
+        google_ads_link_id=GA_ADS_LINK_ID,
+        gcp_conn_id=CONNECTION_ID,
+    )
+    # [END howto_marketing_platform_get_google_ad_link]
+
+    delete_connection = BashOperator(
+        task_id="delete_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        setup_connection_task
+        # TEST BODY
+        >> list_accounts
+        >> create_property
+        >> create_data_stream
+        >> delete_data_stream
+        >> delete_property
+        >> list_google_ads_links
+        >> get_ad_link
+        # TEST TEARDOWN
+        >> delete_connection
+    )
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)


Reply via email to