This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f28643b7bd Implement Google Analytics Admin (GA4) operators (#36276)
f28643b7bd is described below
commit f28643b7bdc90a61ec5bd12f8505772cd8c3bf7f
Author: max <[email protected]>
AuthorDate: Wed Jan 3 19:22:22 2024 +0100
Implement Google Analytics Admin (GA4) operators (#36276)
---
.github/workflows/ci.yml | 2 +-
.../google/marketing_platform/hooks/analytics.py | 9 +
.../marketing_platform/hooks/analytics_admin.py | 234 +++++++++
.../google/marketing_platform/links/__init__.py | 16 +
.../marketing_platform/links/analytics_admin.py | 65 +++
.../marketing_platform/operators/analytics.py | 53 ++
.../operators/analytics_admin.py | 579 +++++++++++++++++++++
airflow/providers/google/provider.yaml | 14 +
.../operators/marketing_platform/analytics.rst | 5 +
.../marketing_platform/analytics_admin.rst | 151 ++++++
generated/provider_dependencies.json | 1 +
.../hooks/test_analytics_admin.py | 192 +++++++
.../google/marketing_platform/links/__init__.py | 17 +
.../links/test_analytics_admin.py | 71 +++
.../operators/test_analytics_admin.py | 310 +++++++++++
.../marketing_platform/example_analytics_admin.py | 203 ++++++++
16 files changed, 1921 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7e52be67b2..a622e74111 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -668,7 +668,7 @@ jobs:
run: aws s3 sync --delete ./files/documentation
s3://apache-airflow-docs
spellcheck-docs:
- timeout-minutes: 60
+ timeout-minutes: 120
name: "Spellcheck docs"
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
diff --git a/airflow/providers/google/marketing_platform/hooks/analytics.py
b/airflow/providers/google/marketing_platform/hooks/analytics.py
index 05df51c2c1..ec98ec2829 100644
--- a/airflow/providers/google/marketing_platform/hooks/analytics.py
+++ b/airflow/providers/google/marketing_platform/hooks/analytics.py
@@ -17,11 +17,13 @@
# under the License.
from __future__ import annotations
+import warnings
from typing import Any
from googleapiclient.discovery import Resource, build
from googleapiclient.http import MediaFileUpload
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@@ -30,6 +32,13 @@ class GoogleAnalyticsHook(GoogleBaseHook):
def __init__(self, api_version: str = "v3", *args, **kwargs):
super().__init__(*args, **kwargs)
+ warnings.warn(
+ f"The `{type(self).__name__}` class is deprecated, please use "
+ f"`GoogleAnalyticsAdminHook` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
+
self.api_version = api_version
self._conn = None
diff --git
a/airflow/providers/google/marketing_platform/hooks/analytics_admin.py
b/airflow/providers/google/marketing_platform/hooks/analytics_admin.py
new file mode 100644
index 0000000000..cff9e0e409
--- /dev/null
+++ b/airflow/providers/google/marketing_platform/hooks/analytics_admin.py
@@ -0,0 +1,234 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Hooks for Google Analytics (GA4) Admin service.
+
+.. spelling:word-list::
+
+ DataStream
+ ListAccountsPager
+ ListGoogleAdsLinksPager
+"""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Sequence
+
+from google.analytics.admin_v1beta import (
+ AnalyticsAdminServiceClient,
+ DataStream,
+ Property,
+)
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+
+from airflow.providers.google.common.consts import CLIENT_INFO
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+if TYPE_CHECKING:
+ from google.analytics.admin_v1beta.services.analytics_admin_service.pagers
import (
+ ListAccountsPager,
+ ListGoogleAdsLinksPager,
+ )
+ from google.api_core.retry import Retry
+
+
+class GoogleAnalyticsAdminHook(GoogleBaseHook):
+ """Hook for Google Analytics 4 (GA4) Admin API."""
+
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
+ self._conn: AnalyticsAdminServiceClient | None = None
+
+ def get_conn(self) -> AnalyticsAdminServiceClient:
+ if not self._conn:
+ self._conn = AnalyticsAdminServiceClient(
+ credentials=self.get_credentials(), client_info=CLIENT_INFO
+ )
+ return self._conn
+
+ def list_accounts(
+ self,
+ page_size: int | None = None,
+ page_token: str | None = None,
+ show_deleted: bool | None = None,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> ListAccountsPager:
+ """Get list of accounts in Google Analytics.
+
+ .. seealso::
+ For more details please check the client library documentation:
+
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/accounts/list
+
+ :param page_size: Optional, number of results to return in the list.
+ :param page_token: Optional. The next_page_token value returned from a
previous List request, if any.
+ :param show_deleted: Optional. Whether to include soft-deleted (ie:
"trashed") Accounts in the results.
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+
+ :returns: List of Google Analytics accounts.
+ """
+ request = {"page_size": page_size, "page_token": page_token,
"show_deleted": show_deleted}
+ client = self.get_conn()
+ return client.list_accounts(request=request, retry=retry,
timeout=timeout, metadata=metadata)
+
+ def create_property(
+ self,
+ analytics_property: Property | dict,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Property:
+ """Create Google Analytics property.
+
+ .. seealso::
+ For more details please check the client library documentation:
+
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties/create
+
+ :param analytics_property: The property to create. Note: the supplied
property must specify its
+ parent.
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+
+ :returns: Created Google Analytics property.
+ """
+ client = self.get_conn()
+ return client.create_property(
+ request={"property": analytics_property},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ def delete_property(
+ self,
+ property_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Property:
+ """Soft delete Google Analytics property.
+
+ .. seealso::
+ For more details please check the client library documentation:
+
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties/delete
+
+ :param property_id: ID of the Property to soft-delete. Format:
properties/{property_id}.
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+
+ :returns: Resource message representing Google Analytics property.
+ """
+ client = self.get_conn()
+ request = {"name": f"properties/{property_id}"}
+ return client.delete_property(request=request, retry=retry,
timeout=timeout, metadata=metadata)
+
+ def create_data_stream(
+ self,
+ property_id: str,
+ data_stream: DataStream | dict,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> DataStream:
+ """Create Google Analytics data stream.
+
+ .. seealso::
+ For more details please check the client library documentation:
+
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams/create
+
+ :param property_id: ID of the parent property for the data stream.
+ :param data_stream: The data stream to create.
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+
+ :returns: Created Google Analytics data stream.
+ """
+ client = self.get_conn()
+ return client.create_data_stream(
+ request={"parent": f"properties/{property_id}", "data_stream":
data_stream},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ def delete_data_stream(
+ self,
+ property_id: str,
+ data_stream_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> None:
+ """Delete Google Analytics data stream.
+
+ .. seealso::
+ For more details please check the client library documentation:
+
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams/delete
+
+ :param property_id: ID of the parent property for the data stream.
+ :param data_stream_id: The data stream id to delete.
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+ """
+ client = self.get_conn()
+ return client.delete_data_stream(
+ request={"name":
f"properties/{property_id}/dataStreams/{data_stream_id}"},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ def list_google_ads_links(
+ self,
+ property_id: str,
+ page_size: int | None = None,
+ page_token: str | None = None,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> ListGoogleAdsLinksPager:
+ """Get list of Google Ads links.
+
+ .. seealso::
+ For more details please check the client library documentation:
+
https://googleapis.dev/python/analyticsadmin/latest/admin_v1beta/analytics_admin_service.html#google.analytics.admin_v1beta.services.analytics_admin_service.AnalyticsAdminServiceAsyncClient.list_google_ads_links
+
+ :param property_id: ID of the parent property.
+ :param page_size: Optional, number of results to return in the list.
+ :param page_token: Optional. The next_page_token value returned from a
previous List request, if any.
+ :param retry: Optional, a retry object used to retry requests. If
`None` is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+
+ :returns: List of Google Analytics accounts.
+ """
+ client = self.get_conn()
+ request = {"parent": f"properties/{property_id}", "page_size":
page_size, "page_token": page_token}
+ return client.list_google_ads_links(request=request, retry=retry,
timeout=timeout, metadata=metadata)
diff --git a/airflow/providers/google/marketing_platform/links/__init__.py
b/airflow/providers/google/marketing_platform/links/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/google/marketing_platform/links/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/airflow/providers/google/marketing_platform/links/analytics_admin.py
b/airflow/providers/google/marketing_platform/links/analytics_admin.py
new file mode 100644
index 0000000000..3ab79c8804
--- /dev/null
+++ b/airflow/providers/google/marketing_platform/links/analytics_admin.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, ClassVar
+
+from airflow.models import BaseOperator, BaseOperatorLink, XCom
+
+if TYPE_CHECKING:
+ from airflow.models.taskinstancekey import TaskInstanceKey
+ from airflow.utils.context import Context
+
+
+BASE_LINK = "https://analytics.google.com/analytics/web/"
+
+
+class GoogleAnalyticsBaseLink(BaseOperatorLink):
+ """Base class for Google Analytics links.
+
+ :meta private:
+ """
+
+ name: ClassVar[str]
+ key: ClassVar[str]
+ format_str: ClassVar[str]
+
+ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) ->
str:
+ if conf := XCom.get_value(key=self.key, ti_key=ti_key):
+ res = BASE_LINK + "#/" + self.format_str.format(**conf)
+ return res
+ return ""
+
+
+class GoogleAnalyticsPropertyLink(GoogleAnalyticsBaseLink):
+ """Helper class for constructing Google Analytics Property Link."""
+
+ name = "Data Analytics Property"
+ key = "data_analytics_property"
+ format_str = "p{property_id}/"
+
+ @staticmethod
+ def persist(
+ context: Context,
+ task_instance: BaseOperator,
+ property_id: str,
+ ):
+ task_instance.xcom_push(
+ context,
+ key=GoogleAnalyticsPropertyLink.key,
+ value={"property_id": property_id},
+ )
diff --git a/airflow/providers/google/marketing_platform/operators/analytics.py
b/airflow/providers/google/marketing_platform/operators/analytics.py
index 0098980e9b..d987b62591 100644
--- a/airflow/providers/google/marketing_platform/operators/analytics.py
+++ b/airflow/providers/google/marketing_platform/operators/analytics.py
@@ -19,9 +19,11 @@
from __future__ import annotations
import csv
+import warnings
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, Sequence
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.marketing_platform.hooks.analytics import
GoogleAnalyticsHook
@@ -34,6 +36,10 @@ class GoogleAnalyticsListAccountsOperator(BaseOperator):
"""
Lists all accounts to which the user has access.
+ .. seealso::
+ This operator is deprecated, please use
+
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator`:
+
.. seealso::
Check official API docs:
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/accounts/list
@@ -70,6 +76,13 @@ class GoogleAnalyticsListAccountsOperator(BaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
+ warnings.warn(
+ f"The `{type(self).__name__}` operator is deprecated, please use "
+ f"`GoogleAnalyticsAdminListAccountsOperator` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
+
super().__init__(**kwargs)
self.api_version = api_version
@@ -90,6 +103,10 @@ class GoogleAnalyticsGetAdsLinkOperator(BaseOperator):
"""
Returns a web property-Google Ads link to which the user has access.
+ .. seealso::
+ This operator is deprecated, please use
+
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator`:
+
.. seealso::
Check official API docs:
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/webPropertyAdWordsLinks/get
@@ -132,6 +149,12 @@ class GoogleAnalyticsGetAdsLinkOperator(BaseOperator):
**kwargs,
):
super().__init__(**kwargs)
+ warnings.warn(
+ f"The `{type(self).__name__}` operator is deprecated, please use "
+ f"`GoogleAnalyticsAdminGetGoogleAdsLinkOperator` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
self.account_id = account_id
self.web_property_ad_words_link_id = web_property_ad_words_link_id
@@ -158,6 +181,10 @@ class
GoogleAnalyticsRetrieveAdsLinksListOperator(BaseOperator):
"""
Lists webProperty-Google Ads links for a given web property.
+ .. seealso::
+ This operator is deprecated, please use
+
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator`:
+
.. seealso::
Check official API docs:
https://developers.google.com/analytics/devguides/config/mgmt/v3/mgmtReference/management/webPropertyAdWordsLinks/list#http-request
@@ -197,6 +224,12 @@ class
GoogleAnalyticsRetrieveAdsLinksListOperator(BaseOperator):
**kwargs,
) -> None:
super().__init__(**kwargs)
+ warnings.warn(
+ f"The `{type(self).__name__}` operator is deprecated, please use "
+ f"`GoogleAnalyticsAdminListGoogleAdsLinksOperator` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
self.account_id = account_id
self.web_property_id = web_property_id
@@ -221,6 +254,10 @@ class
GoogleAnalyticsDataImportUploadOperator(BaseOperator):
"""
Take a file from Cloud Storage and uploads it to GA via data import API.
+ .. seealso::
+ This operator is deprecated, please use
+
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator`:
+
:param storage_bucket: The Google cloud storage bucket where the file is
stored.
:param storage_name_object: The name of the object in the desired Google
cloud
storage bucket. (templated) If the destination points to an existing
@@ -266,6 +303,12 @@ class
GoogleAnalyticsDataImportUploadOperator(BaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
+ warnings.warn(
+ f"The `{type(self).__name__}` operator is deprecated, please use "
+ f"`GoogleAnalyticsAdminCreateDataStreamOperator` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
super().__init__(**kwargs)
self.storage_bucket = storage_bucket
self.storage_name_object = storage_name_object
@@ -317,6 +360,10 @@ class
GoogleAnalyticsDeletePreviousDataUploadsOperator(BaseOperator):
"""
Deletes previous GA uploads to leave the latest file to control the size
of the Data Set Quota.
+ .. seealso::
+ This operator is deprecated, please use
+
:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator`:
+
:param account_id: The GA account Id (long) to which the data upload
belongs.
:param web_property_id: The web property UA-string associated with the
upload.
:param custom_data_source_id: The id to which the data import belongs.
@@ -348,6 +395,12 @@ class
GoogleAnalyticsDeletePreviousDataUploadsOperator(BaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
+ warnings.warn(
+ f"The `{type(self).__name__}` operator is deprecated, please use "
+ f"`GoogleAnalyticsAdminDeleteDataStreamOperator` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=1,
+ )
super().__init__(**kwargs)
self.account_id = account_id
diff --git
a/airflow/providers/google/marketing_platform/operators/analytics_admin.py
b/airflow/providers/google/marketing_platform/operators/analytics_admin.py
new file mode 100644
index 0000000000..d961630f86
--- /dev/null
+++ b/airflow/providers/google/marketing_platform/operators/analytics_admin.py
@@ -0,0 +1,579 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Analytics 4 (GA4) operators."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from google.analytics.admin_v1beta import (
+ Account,
+ DataStream,
+ GoogleAdsLink,
+ Property,
+)
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+
+from airflow.exceptions import AirflowNotFoundException
+from airflow.providers.google.cloud.operators.cloud_base import
GoogleCloudBaseOperator
+from airflow.providers.google.marketing_platform.hooks.analytics_admin import
GoogleAnalyticsAdminHook
+from airflow.providers.google.marketing_platform.links.analytics_admin import
GoogleAnalyticsPropertyLink
+
+if TYPE_CHECKING:
+ from google.api_core.retry import Retry
+ from google.protobuf.message import Message
+
+ from airflow.utils.context import Context
+
+
+class GoogleAnalyticsAdminListAccountsOperator(GoogleCloudBaseOperator):
+ """
+ Lists all accounts to which the user has access.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleAnalyticsAdminListAccountsOperator`
+
+ :param page_size: Optional, number of results to return in the list.
+ :param page_token: Optional. The next_page_token value returned from a
previous List request, if any.
+ :param show_deleted: Optional. Whether to include soft-deleted (ie:
"trashed") Accounts in the results.
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional. Service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "gcp_conn_id",
+ "impersonation_chain",
+ "page_size",
+ "page_token",
+ )
+
+ def __init__(
+ self,
+ *,
+ page_size: int | None = None,
+ page_token: str | None = None,
+ show_deleted: bool | None = None,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.page_size = page_size
+ self.page_token = page_token
+ self.show_deleted = show_deleted
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(
+ self,
+ context: Context,
+ ) -> Sequence[Message]:
+ hook = GoogleAnalyticsAdminHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info(
+ "Requesting list of Google Analytics accounts. "
+ f"Page size: {self.page_size}, page token: {self.page_token}"
+ )
+ accounts = hook.list_accounts(
+ page_size=self.page_size,
+ page_token=self.page_token,
+ show_deleted=self.show_deleted,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ accounts_list: Sequence[Message] = [Account.to_dict(item) for item in
accounts]
+ n = len(accounts_list)
+ self.log.info("Successful request. Retrieved %s item%s.", n, "s" if n
> 1 else "")
+ return accounts_list
+
+
+class GoogleAnalyticsAdminCreatePropertyOperator(GoogleCloudBaseOperator):
+ """
+ Creates property.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleAnalyticsAdminCreatePropertyOperator`
+
+ :param analytics_property: The property to create. Note: the supplied
property must specify its parent.
+ For more details see:
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties#Property
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional. Service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "gcp_conn_id",
+ "impersonation_chain",
+ "analytics_property",
+ )
+ operator_extra_links = (GoogleAnalyticsPropertyLink(),)
+
+ def __init__(
+ self,
+ *,
+ analytics_property: Property | dict[str, Any],
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.analytics_property = analytics_property
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(
+ self,
+ context: Context,
+ ) -> Message:
+ hook = GoogleAnalyticsAdminHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Creating a Google Analytics property.")
+ prop = hook.create_property(
+ analytics_property=self.analytics_property,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ self.log.info("The Google Analytics property %s was created
successfully.", prop.name)
+ GoogleAnalyticsPropertyLink.persist(
+ context=context,
+ task_instance=self,
+ property_id=prop.name.lstrip("properties/"),
+ )
+
+ return Property.to_dict(prop)
+
+
+class GoogleAnalyticsAdminDeletePropertyOperator(GoogleCloudBaseOperator):
+ """
+ Soft-delete property.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleAnalyticsAdminDeletePropertyOperator`
+
+ :param property_id: The id of the Property to soft-delete.
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional. Service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "gcp_conn_id",
+ "impersonation_chain",
+ "property_id",
+ )
+
+ def __init__(
+ self,
+ *,
+ property_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.property_id = property_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(
+ self,
+ context: Context,
+ ) -> Message:
+ hook = GoogleAnalyticsAdminHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Deleting a Google Analytics property.")
+ prop = hook.delete_property(
+ property_id=self.property_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ self.log.info("The Google Analytics property %s was soft-deleted
successfully.", prop.name)
+ return Property.to_dict(prop)
+
+
+class GoogleAnalyticsAdminCreateDataStreamOperator(GoogleCloudBaseOperator):
+ """
+ Creates Data stream.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleAnalyticsAdminCreateDataStreamOperator`
+
+ :param property_id: ID of the parent property for the data stream.
+ :param data_stream: The data stream to create.
+ For more details see:
https://developers.google.com/analytics/devguides/config/admin/v1/rest/v1beta/properties.dataStreams#DataStream
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional. Service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "gcp_conn_id",
+ "impersonation_chain",
+ "property_id",
+ "data_stream",
+ )
+
+ def __init__(
+ self,
+ *,
+ property_id: str,
+ data_stream: DataStream | dict,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.property_id = property_id
+ self.data_stream = data_stream
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(
+ self,
+ context: Context,
+ ) -> Message:
+ hook = GoogleAnalyticsAdminHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Creating a Google Analytics data stream.")
+ data_stream = hook.create_data_stream(
+ property_id=self.property_id,
+ data_stream=self.data_stream,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ self.log.info("The Google Analytics data stream %s was created
successfully.", data_stream.name)
+ return DataStream.to_dict(data_stream)
+
+
+class GoogleAnalyticsAdminDeleteDataStreamOperator(GoogleCloudBaseOperator):
+ """
+ Deletes Data stream.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleAnalyticsAdminDeleteDataStreamOperator`
+
+ :param property_id: ID of the property which is parent for the data stream.
+ :param data_stream_id: ID of the data stream to delete.
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional. Service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "gcp_conn_id",
+ "impersonation_chain",
+ "property_id",
+ "data_stream_id",
+ )
+
+ def __init__(
+ self,
+ *,
+ property_id: str,
+ data_stream_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.property_id = property_id
+ self.data_stream_id = data_stream_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(
+ self,
+ context: Context,
+ ) -> None:
+ hook = GoogleAnalyticsAdminHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Deleting a Google Analytics data stream (id %s).",
self.data_stream_id)
+ hook.delete_data_stream(
+ property_id=self.property_id,
+ data_stream_id=self.data_stream_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ self.log.info("The Google Analytics data stream was deleted
successfully.")
+ return None
+
+
+class GoogleAnalyticsAdminListGoogleAdsLinksOperator(GoogleCloudBaseOperator):
+ """
+ Lists all Google Ads links associated with a given property.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleAnalyticsAdminListGoogleAdsLinksOperator`
+
+ :param property_id: ID of the parent property.
+ :param page_size: Optional, number of results to return in the list.
+ :param page_token: Optional. The next_page_token value returned from a
previous List request, if any.
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional. Service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "gcp_conn_id",
+ "impersonation_chain",
+ "property_id",
+ "page_size",
+ "page_token",
+ )
+
+ def __init__(
+ self,
+ *,
+ property_id: str,
+ page_size: int | None = None,
+ page_token: str | None = None,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.property_id = property_id
+ self.page_size = page_size
+ self.page_token = page_token
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(
+ self,
+ context: Context,
+ ) -> Sequence[Message]:
+ hook = GoogleAnalyticsAdminHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info(
+ "Requesting list of Google Ads links accounts for the property_id
%s, "
+ "page size %s, page token %s",
+ self.property_id,
+ self.page_size,
+ self.page_token,
+ )
+ google_ads_links = hook.list_google_ads_links(
+ property_id=self.property_id,
+ page_size=self.page_size,
+ page_token=self.page_token,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ ads_links_list: Sequence[Message] = [GoogleAdsLink.to_dict(item) for
item in google_ads_links]
+ n = len(ads_links_list)
+ self.log.info("Successful request. Retrieved %s item%s.", n, "s" if n
> 1 else "")
+ return ads_links_list
+
+
+class GoogleAnalyticsAdminGetGoogleAdsLinkOperator(GoogleCloudBaseOperator):
+ """
+ Gets a Google Ads link associated with a given property.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleAnalyticsAdminGetGoogleAdsLinkOperator`
+
+ :param property_id: Parent property id.
+ :param google_ads_link_id: Google Ads link id.
+ :param retry: Optional, a retry object used to retry requests. If `None`
is specified, requests
+ will not be retried.
+ :param timeout: Optional. The timeout for this request.
+ :param metadata: Optional. Strings which should be sent along with the
request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional. Service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "gcp_conn_id",
+ "impersonation_chain",
+ "google_ads_link_id",
+ )
+
+ def __init__(
+ self,
+ *,
+ property_id: str,
+ google_ads_link_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.property_id = property_id
+ self.google_ads_link_id = google_ads_link_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(
+ self,
+ context: Context,
+ ) -> Message:
+ hook = GoogleAnalyticsAdminHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info(
+ "Requesting the Google Ads link with id %s for the property_id %s",
+ self.google_ads_link_id,
+ self.property_id,
+ )
+ ads_links = hook.list_google_ads_links(
+ property_id=self.property_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ find_link = (item for item in ads_links if item.name.split("/")[-1] ==
self.google_ads_link_id)
+ if ads_link := next(find_link, None):
+ self.log.info("Successful request.")
+ return GoogleAdsLink.to_dict(ads_link)
+ raise AirflowNotFoundException(
+ f"Google Ads Link with id {self.google_ads_link_id} and property
id {self.property_id} not found"
+ )
diff --git a/airflow/providers/google/provider.yaml
b/airflow/providers/google/provider.yaml
index d7aca84d81..ed9c35637f 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -91,6 +91,7 @@ dependencies:
- gcloud-aio-storage>=9.0.0
- gcsfs>=2023.10.0
- google-ads>=22.1.0
+ - google-analytics-admin
- google-api-core>=2.11.0
- google-api-python-client>=1.6.0
- google-auth>=1.0.0
@@ -145,6 +146,12 @@ dependencies:
- sqlalchemy-spanner>=1.6.2
integrations:
+ - integration-name: Google Analytics (GA4)
+ external-doc-url: https://analytics.google.com/
+ logo: /integration-logos/gcp/Google-Analytics.png
+ how-to-guide:
+ -
/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst
+ tags: [gmp]
- integration-name: Google Analytics360
external-doc-url: https://analytics.google.com/
logo: /integration-logos/gcp/Google-Analytics.png
@@ -605,6 +612,9 @@ operators:
- integration-name: Google Cloud Firestore
python-modules:
- airflow.providers.google.firebase.operators.firestore
+ - integration-name: Google Analytics (GA4)
+ python-modules:
+ - airflow.providers.google.marketing_platform.operators.analytics_admin
- integration-name: Google Analytics360
python-modules:
- airflow.providers.google.marketing_platform.operators.analytics
@@ -849,6 +859,9 @@ hooks:
- integration-name: Google Cloud Firestore
python-modules:
- airflow.providers.google.firebase.hooks.firestore
+ - integration-name: Google Analytics (GA4)
+ python-modules:
+ - airflow.providers.google.marketing_platform.hooks.analytics_admin
- integration-name: Google Analytics360
python-modules:
- airflow.providers.google.marketing_platform.hooks.analytics
@@ -1203,6 +1216,7 @@ extra-links:
-
airflow.providers.google.cloud.links.mlengine.MLEngineModelVersionDetailsLink
- airflow.providers.google.common.links.storage.StorageLink
- airflow.providers.google.common.links.storage.FileDetailsLink
+ -
airflow.providers.google.marketing_platform.links.analytics_admin.GoogleAnalyticsPropertyLink
additional-extras:
- name: apache.beam
diff --git
a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst
b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst
index 9b99c18295..765b6a6063 100644
---
a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst
+++
b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics.rst
@@ -22,6 +22,11 @@ Google Analytics 360 operators allow you to lists all
accounts to which the user
For more information about the Google Analytics 360 API check
`official documentation
<https://developers.google.com/analytics/devguides/config/mgmt/v3>`__.
+Please note that the Google Analytics 360 API is replaced by
+`Google Analytics 4
<https://developers.google.com/analytics/devguides/config/admin/v1>`__ and
+`will be turned down on July 1, 2024
<https://support.google.com/analytics/answer/11583528>`__.
+Thus consider using new :doc:`Google Analytics (GA4) Admin Operators
</operators/marketing_platform/analytics_admin>`.
+
Prerequisite Tasks
^^^^^^^^^^^^^^^^^^
diff --git
a/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst
b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst
new file mode 100644
index 0000000000..b4aab19897
--- /dev/null
+++
b/docs/apache-airflow-providers-google/operators/marketing_platform/analytics_admin.rst
@@ -0,0 +1,151 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Analytics (GA4) Admin Operators
+======================================
+
+Google Analytics (GA4) Admin operators allow you to lists all accounts to
which the user has access.
+For more information about the Google Analytics 360 API check
+`official documentation
<https://developers.google.com/analytics/devguides/config/admin/v1>`__.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:GoogleAnalyticsAdminListAccountsOperator:
+
+List the Accounts
+^^^^^^^^^^^^^^^^^
+
+To list accounts from Analytics you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_marketing_platform_list_accounts_operator]
+ :end-before: [END howto_marketing_platform_list_accounts_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListAccountsOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminCreatePropertyOperator:
+
+Create Property
+^^^^^^^^^^^^^^^
+
+Creates a property.
+To create a property you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreatePropertyOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_marketing_platform_create_property_operator]
+ :end-before: [END howto_marketing_platform_create_property_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreatePropertyOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminDeletePropertyOperator:
+
+Delete Property
+^^^^^^^^^^^^^^^
+
+Deletes a property.
+To delete a property you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeletePropertyOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_marketing_platform_delete_property_operator]
+ :end-before: [END howto_marketing_platform_delete_property_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeletePropertyOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminCreateDataStreamOperator:
+
+Create Data stream
+^^^^^^^^^^^^^^^^^^
+
+Creates a data stream.
+To create a data stream you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_marketing_platform_create_data_stream_operator]
+ :end-before: [END howto_marketing_platform_create_data_stream_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminCreateDataStreamOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminDeleteDataStreamOperator:
+
+Delete Data stream
+^^^^^^^^^^^^^^^^^^
+
+Deletes a data stream.
+To delete a data stream you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_marketing_platform_delete_data_stream_operator]
+ :end-before: [END howto_marketing_platform_delete_data_stream_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminDeleteDataStreamOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminListGoogleAdsLinksOperator:
+
+List Google Ads Links
+^^^^^^^^^^^^^^^^^^^^^
+
+To list Google Ads links you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_marketing_platform_list_google_ads_links]
+ :end-before: [END howto_marketing_platform_list_google_ads_links]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminListGoogleAdsLinksOperator`
+
+.. _howto/operator:GoogleAnalyticsAdminGetGoogleAdsLinkOperator:
+
+Get the Google Ads link
+^^^^^^^^^^^^^^^^^^^^^^^
+
+To list Google Ads links you can use the
+:class:`~airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_analytics_admin.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_marketing_platform_get_google_ad_link]
+ :end-before: [END howto_marketing_platform_get_google_ad_link]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.analytics_admin.GoogleAnalyticsAdminGetGoogleAdsLinkOperator`
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index c0131f8ca8..fe07b77f44 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -460,6 +460,7 @@
"gcloud-aio-storage>=9.0.0",
"gcsfs>=2023.10.0",
"google-ads>=22.1.0",
+ "google-analytics-admin",
"google-api-core>=2.11.0",
"google-api-python-client>=1.6.0",
"google-auth-httplib2>=0.0.1",
diff --git
a/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py
b/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py
new file mode 100644
index 0000000000..81a5210d49
--- /dev/null
+++ b/tests/providers/google/marketing_platform/hooks/test_analytics_admin.py
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from airflow.providers.google.marketing_platform.hooks.analytics_admin import
GoogleAnalyticsAdminHook
+from tests.providers.google.cloud.utils.base_gcp_mock import
mock_base_gcp_hook_default_project_id
+
+GCP_CONN_ID = "test_gcp_conn_id"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+TEST_PROPERTY_ID = "123456789"
+TEST_PROPERTY_NAME = f"properties/{TEST_PROPERTY_ID}"
+TEST_DATASTREAM_ID = "987654321"
+TEST_DATASTREAM_NAME =
f"properties/{TEST_PROPERTY_ID}/dataStreams/{TEST_DATASTREAM_ID}"
+ANALYTICS_HOOK_PATH =
"airflow.providers.google.marketing_platform.hooks.analytics_admin"
+
+
+class TestGoogleAnalyticsAdminHook:
+ def setup_method(self):
+ with mock.patch(
+
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
+ new=mock_base_gcp_hook_default_project_id,
+ ):
+ self.hook = GoogleAnalyticsAdminHook(GCP_CONN_ID)
+
+
@mock.patch("airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__")
+ def test_init(self, mock_base_init):
+ GoogleAnalyticsAdminHook(
+ GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_base_init.assert_called_once_with(
+ GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ @mock.patch(f"{ANALYTICS_HOOK_PATH}.CLIENT_INFO")
+
@mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_credentials")
+ @mock.patch(f"{ANALYTICS_HOOK_PATH}.AnalyticsAdminServiceClient")
+ def test_get_conn(self, mock_client, get_credentials, mock_client_info):
+ mock_credentials = mock.MagicMock()
+ get_credentials.return_value = mock_credentials
+
+ result = self.hook.get_conn()
+
+ mock_client.assert_called_once_with(credentials=mock_credentials,
client_info=mock_client_info)
+ assert self.hook._conn == result
+
+ @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+ def test_list_accounts(self, mock_get_conn):
+ list_accounts_expected = mock.MagicMock()
+ mock_list_accounts = mock_get_conn.return_value.list_accounts
+ mock_list_accounts.return_value = list_accounts_expected
+ mock_page_size, mock_page_token, mock_show_deleted, mock_retry,
mock_timeout, mock_metadata = (
+ mock.MagicMock() for _ in range(6)
+ )
+
+ request = {
+ "page_size": mock_page_size,
+ "page_token": mock_page_token,
+ "show_deleted": mock_show_deleted,
+ }
+
+ list_accounts_received = self.hook.list_accounts(
+ page_size=mock_page_size,
+ page_token=mock_page_token,
+ show_deleted=mock_show_deleted,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ mock_list_accounts.assert_called_once_with(
+ request=request, retry=mock_retry, timeout=mock_timeout,
metadata=mock_metadata
+ )
+ assert list_accounts_received == list_accounts_expected
+
+ @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+ def test_create_property(self, mock_get_conn):
+ property_expected = mock.MagicMock()
+
+ mock_create_property = mock_get_conn.return_value.create_property
+ mock_create_property.return_value = property_expected
+ mock_property, mock_retry, mock_timeout, mock_metadata =
(mock.MagicMock() for _ in range(4))
+
+ property_created = self.hook.create_property(
+ analytics_property=mock_property, retry=mock_retry,
timeout=mock_timeout, metadata=mock_metadata
+ )
+
+ request = {"property": mock_property}
+ mock_create_property.assert_called_once_with(
+ request=request, retry=mock_retry, timeout=mock_timeout,
metadata=mock_metadata
+ )
+ assert property_created == property_expected
+
+ @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+ def test_delete_property(self, mock_get_conn):
+ property_expected = mock.MagicMock()
+ mock_delete_property = mock_get_conn.return_value.delete_property
+ mock_delete_property.return_value = property_expected
+ mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in
range(3))
+
+ property_deleted = self.hook.delete_property(
+ property_id=TEST_PROPERTY_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ request = {"name": TEST_PROPERTY_NAME}
+ mock_delete_property.assert_called_once_with(
+ request=request,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ assert property_deleted == property_expected
+
+ @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+ def test_create_data_stream(self, mock_get_conn):
+ data_stream_expected = mock.MagicMock()
+ mock_create_data_stream = mock_get_conn.return_value.create_data_stream
+ mock_create_data_stream.return_value = data_stream_expected
+ mock_data_stream, mock_retry, mock_timeout, mock_metadata =
(mock.MagicMock() for _ in range(4))
+
+ data_stream_created = self.hook.create_data_stream(
+ property_id=TEST_PROPERTY_ID,
+ data_stream=mock_data_stream,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+
+ request = {"parent": TEST_PROPERTY_NAME, "data_stream":
mock_data_stream}
+ mock_create_data_stream.assert_called_once_with(
+ request=request, retry=mock_retry, timeout=mock_timeout,
metadata=mock_metadata
+ )
+ assert data_stream_created == data_stream_expected
+
+ @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+ def test_delete_data_stream(self, mock_get_conn):
+ mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in
range(3))
+
+ self.hook.delete_data_stream(
+ property_id=TEST_PROPERTY_ID,
+ data_stream_id=TEST_DATASTREAM_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+
+ request = {"name": TEST_DATASTREAM_NAME}
+ mock_get_conn.return_value.delete_data_stream.assert_called_once_with(
+ request=request,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+
+ @mock.patch(f"{ANALYTICS_HOOK_PATH}.GoogleAnalyticsAdminHook.get_conn")
+ def test_list_ads_links(self, mock_get_conn):
+ mock_page_size, mock_page_token, mock_retry, mock_timeout,
mock_metadata = (
+ mock.MagicMock() for _ in range(5)
+ )
+
+ self.hook.list_google_ads_links(
+ property_id=TEST_PROPERTY_ID,
+ page_size=mock_page_size,
+ page_token=mock_page_token,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+
+ request = {"parent": TEST_PROPERTY_NAME, "page_size": mock_page_size,
"page_token": mock_page_token}
+
mock_get_conn.return_value.list_google_ads_links.assert_called_once_with(
+ request=request, retry=mock_retry, timeout=mock_timeout,
metadata=mock_metadata
+ )
diff --git a/tests/providers/google/marketing_platform/links/__init__.py
b/tests/providers/google/marketing_platform/links/__init__.py
new file mode 100644
index 0000000000..217e5db960
--- /dev/null
+++ b/tests/providers/google/marketing_platform/links/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/tests/providers/google/marketing_platform/links/test_analytics_admin.py
b/tests/providers/google/marketing_platform/links/test_analytics_admin.py
new file mode 100644
index 0000000000..bb015c9be2
--- /dev/null
+++ b/tests/providers/google/marketing_platform/links/test_analytics_admin.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from airflow.providers.google.marketing_platform.links.analytics_admin import (
+ BASE_LINK,
+ GoogleAnalyticsPropertyLink,
+)
+
+TEST_PROPERTY_ID = "123456789"
+TEST_PROJECT_ID = "test_project"
+TEST_CONF_GOOGLE_ADS_LINK = {"property_id": TEST_PROJECT_ID}
+ANALYTICS_LINKS_PATH =
"airflow.providers.google.marketing_platform.links.analytics_admin"
+
+
+class TestGoogleAnalyticsPropertyLink:
+ @mock.patch(f"{ANALYTICS_LINKS_PATH}.XCom")
+ def test_get_link(self, mock_xcom):
+ mock_ti_key = mock.MagicMock()
+ mock_xcom.get_value.return_value = TEST_CONF_GOOGLE_ADS_LINK
+ url_expected = f"{BASE_LINK}#/p{TEST_PROJECT_ID}/"
+
+ link = GoogleAnalyticsPropertyLink()
+ url = link.get_link(operator=mock.MagicMock(), ti_key=mock_ti_key)
+
+ mock_xcom.get_value.assert_called_once_with(key=link.key,
ti_key=mock_ti_key)
+ assert url == url_expected
+
+ @mock.patch(f"{ANALYTICS_LINKS_PATH}.XCom")
+ def test_get_link_not_found(self, mock_xcom):
+ mock_ti_key = mock.MagicMock()
+ mock_xcom.get_value.return_value = None
+
+ link = GoogleAnalyticsPropertyLink()
+ url = link.get_link(operator=mock.MagicMock(), ti_key=mock_ti_key)
+
+ mock_xcom.get_value.assert_called_once_with(key=link.key,
ti_key=mock_ti_key)
+ assert url == ""
+
+ def test_persist(self):
+ mock_context = mock.MagicMock()
+ mock_task_instance = mock.MagicMock()
+
+ GoogleAnalyticsPropertyLink.persist(
+ context=mock_context,
+ task_instance=mock_task_instance,
+ property_id=TEST_PROPERTY_ID,
+ )
+
+ mock_task_instance.xcom_push.assert_called_once_with(
+ mock_context,
+ key=GoogleAnalyticsPropertyLink.key,
+ value={"property_id": TEST_PROPERTY_ID},
+ )
diff --git
a/tests/providers/google/marketing_platform/operators/test_analytics_admin.py
b/tests/providers/google/marketing_platform/operators/test_analytics_admin.py
new file mode 100644
index 0000000000..f292125140
--- /dev/null
+++
b/tests/providers/google/marketing_platform/operators/test_analytics_admin.py
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.exceptions import AirflowNotFoundException
+from airflow.providers.google.marketing_platform.operators.analytics_admin
import (
+ GoogleAnalyticsAdminCreateDataStreamOperator,
+ GoogleAnalyticsAdminCreatePropertyOperator,
+ GoogleAnalyticsAdminDeleteDataStreamOperator,
+ GoogleAnalyticsAdminDeletePropertyOperator,
+ GoogleAnalyticsAdminGetGoogleAdsLinkOperator,
+ GoogleAnalyticsAdminListAccountsOperator,
+ GoogleAnalyticsAdminListGoogleAdsLinksOperator,
+)
+
+GCP_CONN_ID = "google_cloud_default"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+TEST_GA_GOOGLE_ADS_PROPERTY_ID = "123456789"
+TEST_GA_GOOGLE_ADS_LINK_ID = "987654321"
+TEST_GA_GOOGLE_ADS_LINK_NAME = (
+
f"properties/{TEST_GA_GOOGLE_ADS_PROPERTY_ID}/googleAdsLinks/{TEST_GA_GOOGLE_ADS_LINK_ID}"
+)
+TEST_PROPERTY_ID = "123456789"
+TEST_PROPERTY_NAME = f"properties/{TEST_PROPERTY_ID}"
+TEST_DATASTREAM_ID = "987654321"
+TEST_DATASTREAM_NAME =
f"properties/{TEST_PROPERTY_ID}/dataStreams/{TEST_DATASTREAM_ID}"
+ANALYTICS_PATH =
"airflow.providers.google.marketing_platform.operators.analytics_admin"
+
+
+class TestGoogleAnalyticsAdminListAccountsOperator:
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+ @mock.patch(f"{ANALYTICS_PATH}.Account.to_dict")
+ def test_execute(self, account_to_dict_mock, hook_mock):
+ list_accounts_returned = (mock.MagicMock(), mock.MagicMock(),
mock.MagicMock())
+ hook_mock.return_value.list_accounts.return_value =
list_accounts_returned
+
+ list_accounts_serialized = [mock.MagicMock(), mock.MagicMock(),
mock.MagicMock()]
+ account_to_dict_mock.side_effect = list_accounts_serialized
+
+ mock_page_size, mock_page_token, mock_show_deleted, mock_retry,
mock_timeout, mock_metadata = (
+ mock.MagicMock() for _ in range(6)
+ )
+
+ retrieved_accounts_list = GoogleAnalyticsAdminListAccountsOperator(
+ task_id="test_task",
+ page_size=mock_page_size,
+ page_token=mock_page_token,
+ show_deleted=mock_show_deleted,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ ).execute(context=None)
+
+ hook_mock.assert_called_once()
+ hook_mock.return_value.list_accounts.assert_called_once_with(
+ page_size=mock_page_size,
+ page_token=mock_page_token,
+ show_deleted=mock_show_deleted,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ account_to_dict_mock.assert_has_calls([mock.call(item) for item in
list_accounts_returned])
+ assert retrieved_accounts_list == list_accounts_serialized
+
+
+class TestGoogleAnalyticsAdminCreatePropertyOperator:
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsPropertyLink")
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+ @mock.patch(f"{ANALYTICS_PATH}.Property.to_dict")
+ def test_execute(self, property_to_dict_mock, hook_mock, _):
+ property_returned = mock.MagicMock()
+ hook_mock.return_value.create_property.return_value = property_returned
+
+ property_serialized = mock.MagicMock()
+ property_to_dict_mock.return_value = property_serialized
+
+ mock_property, mock_retry, mock_timeout, mock_metadata =
(mock.MagicMock() for _ in range(4))
+
+ property_created = GoogleAnalyticsAdminCreatePropertyOperator(
+ task_id="test_task",
+ analytics_property=mock_property,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ ).execute(context=None)
+
+ hook_mock.assert_called_once()
+ hook_mock.return_value.create_property.assert_called_once_with(
+ analytics_property=mock_property,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ property_to_dict_mock.assert_called_once_with(property_returned)
+ assert property_created == property_serialized
+
+
+class TestGoogleAnalyticsAdminDeletePropertyOperator:
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+ @mock.patch(f"{ANALYTICS_PATH}.Property.to_dict")
+ def test_execute(self, property_to_dict_mock, hook_mock):
+ property_returned = mock.MagicMock()
+ hook_mock.return_value.delete_property.return_value = property_returned
+
+ property_serialized = mock.MagicMock()
+ property_to_dict_mock.return_value = property_serialized
+
+ mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in
range(3))
+
+ property_deleted = GoogleAnalyticsAdminDeletePropertyOperator(
+ task_id="test_task",
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ property_id=TEST_PROPERTY_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ ).execute(context=None)
+
+ hook_mock.assert_called_once()
+ hook_mock.return_value.delete_property.assert_called_once_with(
+ property_id=TEST_PROPERTY_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ property_to_dict_mock.assert_called_once_with(property_returned)
+ assert property_deleted == property_serialized
+
+
+class TestGoogleAnalyticsAdminCreateDataStreamOperator:
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+ @mock.patch(f"{ANALYTICS_PATH}.DataStream.to_dict")
+ def test_execute(self, data_stream_to_dict_mock, hook_mock):
+ data_stream_returned = mock.MagicMock()
+ hook_mock.return_value.create_data_stream.return_value =
data_stream_returned
+
+ data_stream_serialized = mock.MagicMock()
+ data_stream_to_dict_mock.return_value = data_stream_serialized
+
+ mock_parent, mock_data_stream, mock_retry, mock_timeout, mock_metadata
= (
+ mock.MagicMock() for _ in range(5)
+ )
+
+ data_stream_created = GoogleAnalyticsAdminCreateDataStreamOperator(
+ task_id="test_task",
+ property_id=TEST_PROPERTY_ID,
+ data_stream=mock_data_stream,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ ).execute(context=None)
+
+ hook_mock.assert_called_once()
+ hook_mock.return_value.create_data_stream.assert_called_once_with(
+ property_id=TEST_PROPERTY_ID,
+ data_stream=mock_data_stream,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ data_stream_to_dict_mock.assert_called_once_with(data_stream_returned)
+ assert data_stream_created == data_stream_serialized
+
+
+class TestGoogleAnalyticsAdminDeleteDataStreamOperator:
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+ def test_execute(self, hook_mock):
+ mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in
range(3))
+
+ GoogleAnalyticsAdminDeleteDataStreamOperator(
+ task_id="test_task",
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ property_id=TEST_PROPERTY_ID,
+ data_stream_id=TEST_DATASTREAM_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ ).execute(context=None)
+
+ hook_mock.assert_called_once()
+ hook_mock.return_value.delete_data_stream.assert_called_once_with(
+ property_id=TEST_PROPERTY_ID,
+ data_stream_id=TEST_DATASTREAM_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+
+
+class TestGoogleAnalyticsAdminListGoogleAdsLinksOperator:
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAdsLink.to_dict")
+ def test_execute(self, ads_link_to_dict_mock, hook_mock):
+ list_ads_links_returned = (mock.MagicMock(), mock.MagicMock(),
mock.MagicMock())
+ hook_mock.return_value.list_google_ads_links.return_value =
list_ads_links_returned
+
+ list_ads_links_serialized = [mock.MagicMock(), mock.MagicMock(),
mock.MagicMock()]
+ ads_link_to_dict_mock.side_effect = list_ads_links_serialized
+
+ mock_page_size, mock_page_token, mock_show_deleted, mock_retry,
mock_timeout, mock_metadata = (
+ mock.MagicMock() for _ in range(6)
+ )
+
+ retrieved_ads_links = GoogleAnalyticsAdminListGoogleAdsLinksOperator(
+ task_id="test_task",
+ property_id=TEST_PROPERTY_ID,
+ page_size=mock_page_size,
+ page_token=mock_page_token,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ ).execute(context=None)
+
+ hook_mock.assert_called_once()
+ hook_mock.return_value.list_google_ads_links.assert_called_once_with(
+ property_id=TEST_PROPERTY_ID,
+ page_size=mock_page_size,
+ page_token=mock_page_token,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ ads_link_to_dict_mock.assert_has_calls([mock.call(item) for item in
list_ads_links_returned])
+ assert retrieved_ads_links == list_ads_links_serialized
+
+
+class TestGoogleAnalyticsAdminGetGoogleAdsLinkOperator:
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAdsLink")
+ def test_execute(self, mock_google_ads_link, hook_mock):
+ mock_ad_link = mock.MagicMock()
+ mock_ad_link.name = TEST_GA_GOOGLE_ADS_LINK_NAME
+ list_ads_links = hook_mock.return_value.list_google_ads_links
+ list_ads_links.return_value = [mock_ad_link]
+ mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in
range(3))
+
+ GoogleAnalyticsAdminGetGoogleAdsLinkOperator(
+ task_id="test_task",
+ property_id=TEST_GA_GOOGLE_ADS_PROPERTY_ID,
+ google_ads_link_id=TEST_GA_GOOGLE_ADS_LINK_ID,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ ).execute(context=None)
+
+ hook_mock.assert_called_once()
+ hook_mock.return_value.list_google_ads_links.assert_called_once_with(
+ property_id=TEST_PROPERTY_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
+ mock_google_ads_link.to_dict.assert_called_once_with(mock_ad_link)
+
+ @mock.patch(f"{ANALYTICS_PATH}.GoogleAnalyticsAdminHook")
+ def test_execute_not_found(self, hook_mock):
+ list_ads_links = hook_mock.return_value.list_google_ads_links
+ list_ads_links.return_value = []
+ mock_retry, mock_timeout, mock_metadata = (mock.MagicMock() for _ in
range(3))
+
+ with pytest.raises(AirflowNotFoundException):
+ GoogleAnalyticsAdminGetGoogleAdsLinkOperator(
+ task_id="test_task",
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ property_id=TEST_GA_GOOGLE_ADS_PROPERTY_ID,
+ google_ads_link_id=TEST_GA_GOOGLE_ADS_LINK_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ ).execute(context=None)
+
+ hook_mock.assert_called_once()
+ hook_mock.return_value.list_google_ads_links.assert_called_once_with(
+ property_id=TEST_PROPERTY_ID,
+ retry=mock_retry,
+ timeout=mock_timeout,
+ metadata=mock_metadata,
+ )
diff --git
a/tests/system/providers/google/marketing_platform/example_analytics_admin.py
b/tests/system/providers/google/marketing_platform/example_analytics_admin.py
new file mode 100644
index 0000000000..58c5b65795
--- /dev/null
+++
b/tests/system/providers/google/marketing_platform/example_analytics_admin.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use Google Analytics (GA4) Admin
Operators.
+
+This DAG relies on the following OS environment variables
+
+* GA_ACCOUNT_ID - Google Analytics account id.
+* GA_GOOGLE_ADS_PROPERTY_ID - Google Analytics property's id associated with
Google Ads Link.
+
+In order to run this test, make sure you followed steps:
+1. Login to https://analytics.google.com
+2. In the settings section create an account and save its ID in the variable
GA_ACCOUNT_ID.
+3. In the settings section go to the Property access management page and add
your service account email with
+Editor permissions. This service account should be created on behalf of the
account from the step 1.
+4. Make sure Google Analytics Admin API is enabled in your GCP project.
+5. Create Google Ads account and link it to your Google Analytics account in
the GA admin panel.
+6. Associate the Google Ads account with a property, and save this property's
id in the variable
+GA_GOOGLE_ADS_PROPERTY_ID.
+"""
+from __future__ import annotations
+
+import json
+import logging
+import os
+from datetime import datetime
+
+from google.analytics import admin_v1beta as google_analytics
+
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.marketing_platform.operators.analytics_admin
import (
+ GoogleAnalyticsAdminCreateDataStreamOperator,
+ GoogleAnalyticsAdminCreatePropertyOperator,
+ GoogleAnalyticsAdminDeleteDataStreamOperator,
+ GoogleAnalyticsAdminDeletePropertyOperator,
+ GoogleAnalyticsAdminGetGoogleAdsLinkOperator,
+ GoogleAnalyticsAdminListAccountsOperator,
+ GoogleAnalyticsAdminListGoogleAdsLinksOperator,
+)
+from airflow.settings import Session
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_google_analytics_admin"
+
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+ACCOUNT_ID = os.environ.get("GA_ACCOUNT_ID", "123456789")
+PROPERTY_ID = "{{
task_instance.xcom_pull('create_property')['name'].split('/')[-1] }}"
+DATA_STREAM_ID = "{{
task_instance.xcom_pull('create_data_stream')['name'].split('/')[-1] }}"
+GA_GOOGLE_ADS_PROPERTY_ID = os.environ.get("GA_GOOGLE_ADS_PROPERTY_ID",
"123456789")
+GA_ADS_LINK_ID = "{{
task_instance.xcom_pull('list_google_ads_links')[0]['name'].split('/')[-1] }}"
+
+log = logging.getLogger(__name__)
+
+with DAG(
+ DAG_ID,
+ schedule="@once", # Override to match your needs,
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["example", "analytics"],
+) as dag:
+
+ @task
+ def setup_connection(**kwargs) -> None:
+ connection = Connection(
+ conn_id=CONNECTION_ID,
+ conn_type="google_cloud_platform",
+ )
+ conn_extra_json = json.dumps(
+ {
+ "scope": "https://www.googleapis.com/auth/analytics.edit,"
+ "https://www.googleapis.com/auth/analytics.readonly",
+ }
+ )
+ connection.set_extra(conn_extra_json)
+
+ session = Session()
+ if session.query(Connection).filter(Connection.conn_id ==
CONNECTION_ID).first():
+ log.warning("Connection %s already exists", CONNECTION_ID)
+ return None
+
+ session.add(connection)
+ session.commit()
+
+ setup_connection_task = setup_connection()
+
+ # [START howto_marketing_platform_list_accounts_operator]
+ list_accounts = GoogleAnalyticsAdminListAccountsOperator(
+ task_id="list_account",
+ gcp_conn_id=CONNECTION_ID,
+ show_deleted=True,
+ )
+ # [END howto_marketing_platform_list_accounts_operator]
+
+ # [START howto_marketing_platform_create_property_operator]
+ create_property = GoogleAnalyticsAdminCreatePropertyOperator(
+ task_id="create_property",
+ analytics_property={
+ "parent": f"accounts/{ACCOUNT_ID}",
+ "display_name": "Test display name",
+ "time_zone": "America/Los_Angeles",
+ },
+ gcp_conn_id=CONNECTION_ID,
+ )
+ # [END howto_marketing_platform_create_property_operator]
+
+ # [START howto_marketing_platform_create_data_stream_operator]
+ create_data_stream = GoogleAnalyticsAdminCreateDataStreamOperator(
+ task_id="create_data_stream",
+ property_id=PROPERTY_ID,
+ data_stream={
+ "display_name": "Test data stream",
+ "web_stream_data": {
+ "default_uri": "www.example.com",
+ },
+ "type_":
google_analytics.DataStream.DataStreamType.WEB_DATA_STREAM,
+ },
+ gcp_conn_id=CONNECTION_ID,
+ )
+ # [END howto_marketing_platform_create_data_stream_operator]
+
+ # [START howto_marketing_platform_delete_data_stream_operator]
+ delete_data_stream = GoogleAnalyticsAdminDeleteDataStreamOperator(
+ task_id="delete_datastream",
+ property_id=PROPERTY_ID,
+ data_stream_id=DATA_STREAM_ID,
+ gcp_conn_id=CONNECTION_ID,
+ )
+ # [END howto_marketing_platform_delete_data_stream_operator]
+
+ # [START howto_marketing_platform_delete_property_operator]
+ delete_property = GoogleAnalyticsAdminDeletePropertyOperator(
+ task_id="delete_property",
+ property_id=PROPERTY_ID,
+ gcp_conn_id=CONNECTION_ID,
+ )
+ # [END howto_marketing_platform_delete_property_operator]
+ delete_property.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_marketing_platform_list_google_ads_links]
+ list_google_ads_links = GoogleAnalyticsAdminListGoogleAdsLinksOperator(
+ task_id="list_google_ads_links",
+ property_id=GA_GOOGLE_ADS_PROPERTY_ID,
+ gcp_conn_id=CONNECTION_ID,
+ )
+ # [END howto_marketing_platform_list_google_ads_links]
+
+ # [START howto_marketing_platform_get_google_ad_link]
+ get_ad_link = GoogleAnalyticsAdminGetGoogleAdsLinkOperator(
+ task_id="get_ad_link",
+ property_id=GA_GOOGLE_ADS_PROPERTY_ID,
+ google_ads_link_id=GA_ADS_LINK_ID,
+ gcp_conn_id=CONNECTION_ID,
+ )
+ # [END howto_marketing_platform_get_google_ad_link]
+
+ delete_connection = BashOperator(
+ task_id="delete_connection",
+ bash_command=f"airflow connections delete {CONNECTION_ID}",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ (
+ # TEST SETUP
+ setup_connection_task
+ # TEST BODY
+ >> list_accounts
+ >> create_property
+ >> create_data_stream
+ >> delete_data_stream
+ >> delete_property
+ >> list_google_ads_links
+ >> get_ad_link
+ # TEST TEARDOWN
+ >> delete_connection
+ )
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)