This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 567926535a Add new Google Search 360 Reporting Operators (#42255)
567926535a is described below
commit 567926535a719f7ea93565e3fee3f2fbd9a03cb9
Author: M. Olcay Tercanlı <[email protected]>
AuthorDate: Thu Sep 19 07:36:06 2024 +0200
Add new Google Search 360 Reporting Operators (#42255)
* Implement new Google Search 360 operators
Remove the decommissioned operators
Co-authored-by: Elad Kalif <[email protected]>
---------
Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
---
airflow/providers/google/CHANGELOG.rst | 12 +
.../google/marketing_platform/hooks/search_ads.py | 199 +++++++++--
.../marketing_platform/operators/search_ads.py | 373 ++++++++++++---------
.../marketing_platform/sensors/search_ads.py | 92 -----
airflow/providers/google/provider.yaml | 3 -
.../operators/marketing_platform/search_ads.rst | 87 +++--
tests/always/test_project_structure.py | 2 +-
.../marketing_platform/hooks/test_search_ads.py | 175 ++++++++--
.../operators/test_search_ads.py | 240 +++++++------
.../marketing_platform/sensors/test_search_ads.py | 45 ---
.../marketing_platform/example_search_ads.py | 94 +++---
11 files changed, 770 insertions(+), 552 deletions(-)
diff --git a/airflow/providers/google/CHANGELOG.rst
b/airflow/providers/google/CHANGELOG.rst
index 64ae3b4e91..ff54591fd6 100644
--- a/airflow/providers/google/CHANGELOG.rst
+++ b/airflow/providers/google/CHANGELOG.rst
@@ -27,6 +27,18 @@
Changelog
---------
+Main
+.......
+
+.. warning::
+ The previous Search Ads 360 Reporting API
<https://developers.google.com/search-ads/v2/how-tos/reporting>
+ (which is currently in use in google-provider) was already decommissioned on
June 30, 2024
+ (see details <https://developers.google.com/search-ads/v2/migration>).
+ All new reporting development should use the new Search Ads 360 Reporting
API.
+ Currently, the Reporting operators, sensors and hooks are failing due to the
decommission.
+ The new API is not a replacement for the old one; it has a different
approach and endpoints.
+ Therefore, new operators were implemented for the new API.
+
10.22.0
.......
diff --git a/airflow/providers/google/marketing_platform/hooks/search_ads.py
b/airflow/providers/google/marketing_platform/hooks/search_ads.py
index 85cc25c458..d57a89a821 100644
--- a/airflow/providers/google/marketing_platform/hooks/search_ads.py
+++ b/airflow/providers/google/marketing_platform/hooks/search_ads.py
@@ -19,73 +19,212 @@
from __future__ import annotations
-from typing import Any, Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Sequence
+from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
+from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+if TYPE_CHECKING:
+ from googleapiclient.discovery import Resource
-class GoogleSearchAdsHook(GoogleBaseHook):
- """Hook for Google Search Ads 360."""
+
+class GoogleSearchAdsReportingHook(GoogleBaseHook):
+ """Hook for the Google Search Ads 360 Reporting API."""
_conn: build | None = None
+ default_api_version: str = "v0"
def __init__(
self,
- api_version: str = "v2",
- gcp_conn_id: str = "google_cloud_default",
- delegate_to: str | None = None,
- impersonation_chain: str | Sequence[str] | None = None,
+ api_version: str | None = None,
+ gcp_conn_id: str = "google_search_ads_default",
) -> None:
super().__init__(
gcp_conn_id=gcp_conn_id,
- delegate_to=delegate_to,
- impersonation_chain=impersonation_chain,
)
- self.api_version = api_version
+ self.api_version = api_version or self.default_api_version
- def get_conn(self):
- """Retrieve connection to Google SearchAds."""
+ def _get_config(self) -> None:
+ """
+ Set up Google Search Ads config from Connection.
+
+ This pulls the connection from the db and uses it to set up
+ ``google_search_ads_client``.
+ """
+ conn = self.get_connection(self.gcp_conn_id)
+ if "google_search_ads_client" not in conn.extra_dejson:
+ raise AirflowException("google_search_ads_client not found in
extra field")
+
+ self.google_search_ads_config =
conn.extra_dejson["google_search_ads_client"]
+
+ def get_credentials(self) -> Credentials:
+ """Return the credential instance for search ads."""
+ self._get_config()
+ self.logger().info(f"Credential configuration:
{self.google_search_ads_config}")
+ return Credentials(**self.google_search_ads_config)
+
+ def get_conn(self) -> Resource:
if not self._conn:
- http_authorized = self._authorize()
+ creds = self.get_credentials()
+
self._conn = build(
- "doubleclicksearch",
+ "searchads360",
self.api_version,
- http=http_authorized,
+ credentials=creds,
cache_discovery=False,
)
return self._conn
- def insert_report(self, report: dict[str, Any]) -> Any:
+ @cached_property
+ def customer_service(self):
+ return self.get_conn().customers()
+
+ @cached_property
+ def fields_service(self):
+ return self.get_conn().searchAds360Fields()
+
+ def search(
+ self,
+ customer_id: str,
+ query: str,
+ page_token: str | None = None,
+ page_size: int = 10000,
+ return_total_results_count: bool = False,
+ summary_row_setting: str | None = None,
+ validate_only: bool = False,
+ ):
"""
- Insert a report request into the reporting system.
+ Search and download the report. Use pagination to download entire
report.
- :param report: Report to be generated.
+ :param customer_id: The ID of the customer being queried.
+ :param query: The query to execute.
+ :param page_token: Token of the page to retrieve. If not specified,
the first page of results will be
+ returned. Use the value obtained from `next_page_token` in the
previous response
+ in order to request the next page of results.
+ :param page_size: Number of elements to retrieve in a single page.
When too large a page is requested,
+ the server may decide to further limit the number of returned
resources.
+ Default is 10000.
+ :param return_total_results_count: If true, the total number of
results that match the query ignoring
+ the LIMIT clause will be included in the response. Default is
false.
+ :param summary_row_setting: Determines whether a summary row will be
returned. By default,
+ summary row is not returned. If requested, the summary row will be
sent
+ in a response by itself after all other query results are
returned.
+ :param validate_only: If true, the request is validated but not
executed. Default is false.
"""
- response =
self.get_conn().reports().request(body=report).execute(num_retries=self.num_retries)
+ params: dict[str, Any] = {
+ "query": query,
+ "pageSize": page_size,
+ "returnTotalResultsCount": return_total_results_count,
+ "validateOnly": validate_only,
+ }
+ if page_token is not None:
+ params.update({"pageToken": page_token})
+ if summary_row_setting is not None:
+ params.update({"summaryRowSetting": summary_row_setting})
+
+ response = (
+ self.customer_service.searchAds360()
+ .search(customerId=customer_id, body=params)
+ .execute(num_retries=self.num_retries)
+ )
+ self.log.info("Search response: %s", response)
return response
- def get(self, report_id: str) -> Any:
+ def get_custom_column(self, customer_id: str, custom_column_id: str):
"""
- Poll for the status of a report request.
+ Retrieve the requested custom column in full detail.
- :param report_id: ID of the report request being polled.
+ :param customer_id: The customer id
+ :param custom_column_id: The custom column id
"""
- response =
self.get_conn().reports().get(reportId=report_id).execute(num_retries=self.num_retries)
+ resource_name =
f"customers/{customer_id}/customColumns/{custom_column_id}"
+ response = (
+ self.customer_service.customColumns()
+ .get(resourceName=resource_name)
+ .execute(num_retries=self.num_retries)
+ )
+ self.log.info("Retrieved custom column: %s", response)
return response
- def get_file(self, report_fragment: int, report_id: str) -> Any:
+ def list_custom_columns(self, customer_id: str):
"""
- Download a report file encoded in UTF-8.
+ Retrieve all the custom columns associated with the customer in full
detail.
- :param report_fragment: The index of the report fragment to download.
- :param report_id: ID of the report.
+ :param customer_id: The customer id
"""
response = (
- self.get_conn()
- .reports()
- .getFile(reportFragment=report_fragment, reportId=report_id)
+ self.customer_service.customColumns()
+ .list(customerId=customer_id)
.execute(num_retries=self.num_retries)
)
+ self.log.info("Listing the custom columns: %s", response)
+ return response
+
+ def get_field(self, field_name: str):
+ """
+ Retrieve the requested field details.
+
+ :param field_name: The name of the field.
+ """
+ resource_name = f"searchAds360Fields/{field_name}"
+ response =
self.fields_service.get(resourceName=resource_name).execute(num_retries=self.num_retries)
+ self.log.info("Retrieved field: %s", response)
return response
+
+ def search_fields(self, query: str, page_token: str | None = None,
page_size: int | None = 10000):
+ """
+ Retrieve all the fields that match with the given search.
+
+ :param query: The query string to execute.
+ :param page_token: Token of the page to retrieve. If not specified,
the first page of results will be
+ returned. Use the value obtained from `next_page_token` in the
previous response
+ in order to request the next page of results.
+ :param page_size: Number of elements to retrieve in a single page.
When too large a page is requested,
+ the server may decide to further limit the number of returned
resources.
+ Default 10000.
+ """
+ params: dict[str, Any] = {
+ "query": query,
+ "pageSize": page_size,
+ }
+ if page_token is not None:
+ params.update({"pageToken": page_token})
+ response =
self.fields_service.search(body=params).execute(num_retries=self.num_retries)
+ self.log.info("Retrieved fields: %s", response)
+ return response
+
+
+class GoogleSearchAdsHook(GoogleBaseHook):
+ """Hook for Google Search Ads 360."""
+
+ _conn: build | None = None
+
+ def __init__(
+ self,
+ api_version: str = "v2",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: str | None = None,
+ impersonation_chain: str | Sequence[str] | None = None,
+ ) -> None:
+ super().__init__(
+ gcp_conn_id=gcp_conn_id,
+ delegate_to=delegate_to,
+ impersonation_chain=impersonation_chain,
+ )
+ self.api_version = api_version
+
+ def get_conn(self):
+ """Retrieve connection to Google SearchAds."""
+ if not self._conn:
+ http_authorized = self._authorize()
+ self._conn = build(
+ "doubleclicksearch",
+ self.api_version,
+ http=http_authorized,
+ cache_discovery=False,
+ )
+ return self._conn
diff --git
a/airflow/providers/google/marketing_platform/operators/search_ads.py
b/airflow/providers/google/marketing_platform/operators/search_ads.py
index 87b5b3f970..067c2a8946 100644
--- a/airflow/providers/google/marketing_platform/operators/search_ads.py
+++ b/airflow/providers/google/marketing_platform/operators/search_ads.py
@@ -19,217 +19,278 @@
from __future__ import annotations
-import json
-from tempfile import NamedTemporaryFile
+from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence
-from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.gcs import GCSHook
-from airflow.providers.google.marketing_platform.hooks.search_ads import
GoogleSearchAdsHook
+from airflow.providers.google.marketing_platform.hooks.search_ads import
GoogleSearchAdsReportingHook
if TYPE_CHECKING:
from airflow.utils.context import Context
-class GoogleSearchAdsInsertReportOperator(BaseOperator):
+class _GoogleSearchAdsBaseOperator(BaseOperator):
"""
- Inserts a report request into the reporting system.
+ Base class to use in NextGen operator.
+
+ :param api_version: The version of the API that will be requested for
example 'v0'.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ """
+
+ template_fields: Sequence[str] = (
+ "api_version",
+ "gcp_conn_id",
+ )
+
+ def __init__(
+ self,
+ *,
+ api_version: str = "v0",
+ gcp_conn_id: str = "google_search_ads_default",
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.api_version = api_version
+ self.gcp_conn_id = gcp_conn_id
+
+ @cached_property
+ def hook(self):
+ return GoogleSearchAdsReportingHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ )
+
+
+class GoogleSearchAdsSearchOperator(_GoogleSearchAdsBaseOperator):
+ """
+ Search a report by query.
.. seealso:
For API documentation check:
- https://developers.google.com/search-ads/v2/reference/reports/request
+
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/customers.searchAds360/search
.. seealso::
For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:GoogleSearchAdsInsertReportOperator`
+ :ref:`howto/operator:GoogleSearchAdsSearchOperator`
- :param report: Report to be generated
- :param api_version: The version of the api that will be requested for
example 'v3'.
+ :param customer_id: The ID of the customer being queried.
+ :param query: The query to execute.
+ :param page_token: Token of the page to retrieve. If not specified, the
first page of results will be
+ returned. Use the value obtained from `next_page_token` in the
previous response
+ in order to request the next page of results.
+ :param page_size: Number of elements to retrieve in a single page. When
too large a page is requested,
+ the server may decide to further limit the number of returned
resources.
+ Default is 10000.
+ :param return_total_results_count: If true, the total number of results
that match the query ignoring
+ the LIMIT clause will be included in the response. Default is false.
+ :param summary_row_setting: Determines whether a summary row will be
returned. By default,
+ summary row is not returned. If requested, the summary row will be sent
+ in a response by itself after all other query results are returned.
+ :param validate_only: If true, the request is validated but not executed.
Default is false.
:param gcp_conn_id: The connection ID to use when fetching connection info.
- :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
- if any. For this to work, the service account making the request must
have
- domain-wide delegation enabled.
- :param impersonation_chain: Optional service account to impersonate using
short-term
- credentials, or chained list of accounts required to get the
access_token
- of the last account in the list, which will be impersonated in the
request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding
identity, with first
- account from the list granting this role to the originating account
(templated).
+ :param api_version: The version of the API that will be requested for
example 'v0'.
"""
template_fields: Sequence[str] = (
- "report",
- "impersonation_chain",
+ *_GoogleSearchAdsBaseOperator.template_fields,
+ "page_token",
+ "page_size",
)
- template_ext: Sequence[str] = (".json",)
def __init__(
self,
*,
- report: dict[str, Any],
- api_version: str = "v2",
- gcp_conn_id: str = "google_cloud_default",
- delegate_to: str | None = None,
- impersonation_chain: str | Sequence[str] | None = None,
+ customer_id: str,
+ query: str,
+ page_token: str | None = None,
+ page_size: int = 10000,
+ return_total_results_count: bool = False,
+ summary_row_setting: str | None = None,
+ validate_only: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
- self.report = report
- self.api_version = api_version
- self.gcp_conn_id = gcp_conn_id
- self.delegate_to = delegate_to
- self.impersonation_chain = impersonation_chain
-
- def prepare_template(self) -> None:
- # If .json is passed then we have to read the file
- if isinstance(self.report, str) and self.report.endswith(".json"):
- with open(self.report) as file:
- self.report = json.load(file)
+ self.customer_id = customer_id
+ self.query = query
+ self.page_token = page_token
+ self.page_size = page_size
+ self.return_total_results_count = return_total_results_count
+ self.summary_row_setting = summary_row_setting
+ self.validate_only = validate_only
def execute(self, context: Context):
- hook = GoogleSearchAdsHook(
- gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to,
- api_version=self.api_version,
- impersonation_chain=self.impersonation_chain,
+ self.log.info("Querying Search Ads")
+ response = self.hook.search(
+ customer_id=self.customer_id,
+ query=self.query,
+ page_size=self.page_size,
+ page_token=self.page_token,
+ return_total_results_count=self.return_total_results_count,
+ summary_row_setting=self.summary_row_setting,
+ validate_only=self.validate_only,
)
- self.log.info("Generating Search Ads report")
- response = hook.insert_report(report=self.report)
- report_id = response.get("id")
- self.xcom_push(context, key="report_id", value=report_id)
- self.log.info("Report generated, id: %s", report_id)
+ self.log.info("Query result: %s", response)
+ return response
+
+
+class GoogleSearchAdsGetFieldOperator(_GoogleSearchAdsBaseOperator):
+ """
+ Retrieve metadata for a resource or a field.
+
+ .. seealso::
+ For API documentation check:
+
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/searchAds360Fields/get
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleSearchAdsGetFieldOperator`
+
+ :param field_name: The name of the field.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param api_version: The version of the API that will be requested for
example 'v0'.
+ """
+
+ def __init__(
+ self,
+ *,
+ field_name: str,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.field_name = field_name
+
+ def execute(self, context: Context) -> Any:
+ self.log.info("Retrieving the metadata for the field '%s'",
self.field_name)
+ response = self.hook.get_field(field_name=self.field_name)
+ self.log.info("Retrieved field: %s", response["resourceName"])
return response
-class GoogleSearchAdsDownloadReportOperator(BaseOperator):
+class GoogleSearchAdsSearchFieldsOperator(_GoogleSearchAdsBaseOperator):
"""
- Downloads a report to GCS bucket.
+ Retrieve metadata for resource(s) or field(s) by the query syntax.
.. seealso:
For API documentation check:
- https://developers.google.com/search-ads/v2/reference/reports/getFile
+
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/searchAds360Fields/search
.. seealso::
For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:GoogleSearchAdsGetfileReportOperator`
-
- :param report_id: ID of the report.
- :param bucket_name: The bucket to upload to.
- :param report_name: The report name to set when uploading the local file.
If not provided then
- report_id is used.
- :param gzip: Option to compress local file or file data for upload
- :param api_version: The version of the api that will be requested for
example 'v3'.
+ :ref:`howto/operator:GoogleSearchAdsSearchFieldsOperator`
+
+ :param query: The query string to execute.
+ :param page_token: Token of the page to retrieve. If not specified, the
first page of results will be
+ returned. Use the value obtained from `next_page_token` in the
previous response
+ in order to request the next page of results.
+ :param page_size: Number of elements to retrieve in a single page. When
too large a page is requested,
+ the server may decide to further limit the number of returned
resources.
+ Default 10000.
:param gcp_conn_id: The connection ID to use when fetching connection info.
- :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
- if any. For this to work, the service account making the request must
have
- domain-wide delegation enabled.
- :param impersonation_chain: Optional service account to impersonate using
short-term
- credentials, or chained list of accounts required to get the
access_token
- of the last account in the list, which will be impersonated in the
request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding
identity, with first
- account from the list granting this role to the originating account
(templated).
+ :param api_version: The version of the API that will be requested for
example 'v0'.
"""
template_fields: Sequence[str] = (
- "report_name",
- "report_id",
- "bucket_name",
- "impersonation_chain",
+ *_GoogleSearchAdsBaseOperator.template_fields,
+ "page_token",
+ "page_size",
)
def __init__(
self,
*,
- report_id: str,
- bucket_name: str,
- report_name: str | None = None,
- gzip: bool = True,
- chunk_size: int = 10 * 1024 * 1024,
- api_version: str = "v2",
- gcp_conn_id: str = "google_cloud_default",
- delegate_to: str | None = None,
- impersonation_chain: str | Sequence[str] | None = None,
+ query: str,
+ page_token: str | None = None,
+ page_size: int = 10000,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.query = query
+
+ self.page_token = page_token
+ self.page_size = page_size
+
+ def execute(self, context: Context) -> Any:
+ self.log.info("Retrieving the metadata for %s", self.query)
+ response = self.hook.search_fields(
+ query=self.query,
+ page_token=self.page_token,
+ page_size=self.page_size,
+ )
+ self.log.info("Num of fields retrieved, #%d", len(response["results"]))
+ return response
+
+
+class GoogleSearchAdsGetCustomColumnOperator(_GoogleSearchAdsBaseOperator):
+ """
+ Retrieve details of a custom column for the given customer_id and
custom_column_id.
+
+ .. seealso::
+ For API documentation check:
+
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/customers.customColumns/get
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleSearchAdsGetCustomColumnOperator`
+
+ :param customer_id: The customer ID for the custom column.
+ :param custom_column_id: The ID for the custom column.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param api_version: The version of the API that will be requested for
example 'v0'.
+ """
+
+ def __init__(
+ self,
+ *,
+ customer_id: str,
+ custom_column_id: str,
**kwargs,
) -> None:
super().__init__(**kwargs)
- self.report_id = report_id
- self.api_version = api_version
- self.gcp_conn_id = gcp_conn_id
- self.delegate_to = delegate_to
- self.report_id = report_id
- self.chunk_size = chunk_size
- self.gzip = gzip
- self.bucket_name = bucket_name
- self.report_name = report_name
- self.impersonation_chain = impersonation_chain
-
- def _resolve_file_name(self, name: str) -> str:
- csv = ".csv"
- gzip = ".gz"
- if not name.endswith(csv):
- name += csv
- if self.gzip:
- name += gzip
- return name
-
- @staticmethod
- def _set_bucket_name(name: str) -> str:
- bucket = name if not name.startswith("gs://") else name[5:]
- return bucket.strip("/")
-
- @staticmethod
- def _handle_report_fragment(fragment: bytes) -> bytes:
- fragment_records = fragment.split(b"\n", 1)
- if len(fragment_records) > 1:
- return fragment_records[1]
- return b""
+ self.customer_id = customer_id
+ self.custom_column_id = custom_column_id
def execute(self, context: Context):
- hook = GoogleSearchAdsHook(
- gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to,
- api_version=self.api_version,
- impersonation_chain=self.impersonation_chain,
+ self.log.info(
+ "Retrieving the custom column for the customer %s with the id of
%s",
+ self.customer_id,
+ self.custom_column_id,
)
-
- gcs_hook = GCSHook(
- gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to,
- impersonation_chain=self.impersonation_chain,
+ response = self.hook.get_custom_column(
+ customer_id=self.customer_id,
+ custom_column_id=self.custom_column_id,
)
+ self.log.info("Retrieved custom column: %s", response["id"])
+ return response
+
+
+class GoogleSearchAdsListCustomColumnsOperator(_GoogleSearchAdsBaseOperator):
+ """
+ List all custom columns.
+
+ .. seealso::
+ For API documentation check:
+
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/customers.customColumns/list
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleSearchAdsListCustomColumnsOperator`
- # Resolve file name of the report
- report_name = self.report_name or self.report_id
- report_name = self._resolve_file_name(report_name)
-
- response = hook.get(report_id=self.report_id)
- if not response["isReportReady"]:
- raise AirflowException(f"Report {self.report_id} is not ready yet")
-
- # Resolve report fragments
- fragments_count = len(response["files"])
-
- # Download chunks of report's data
- self.log.info("Downloading Search Ads report %s", self.report_id)
- with NamedTemporaryFile() as temp_file:
- for i in range(fragments_count):
- byte_content = hook.get_file(report_fragment=i,
report_id=self.report_id)
- fragment = byte_content if i == 0 else
self._handle_report_fragment(byte_content)
- temp_file.write(fragment)
-
- temp_file.flush()
-
- bucket_name = self._set_bucket_name(self.bucket_name)
- gcs_hook.upload(
- bucket_name=bucket_name,
- object_name=report_name,
- gzip=self.gzip,
- filename=temp_file.name,
- )
- self.xcom_push(context, key="file_name", value=report_name)
+ :param customer_id: The customer ID for the custom column.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param api_version: The version of the API that will be requested for
example 'v0'.
+ """
+
+ def __init__(
+ self,
+ *,
+ customer_id: str,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.customer_id = customer_id
+
+ def execute(self, context: Context):
+ self.log.info("Listing the custom columns for %s", self.customer_id)
+ response = self.hook.list_custom_columns(customer_id=self.customer_id)
+ self.log.info("Num of retrieved custom column: %d",
len(response.get("customColumns")))
+ return response
diff --git a/airflow/providers/google/marketing_platform/sensors/search_ads.py
b/airflow/providers/google/marketing_platform/sensors/search_ads.py
deleted file mode 100644
index 6b044f7e2a..0000000000
--- a/airflow/providers/google/marketing_platform/sensors/search_ads.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module contains Google Search Ads sensor."""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Sequence
-
-from airflow.providers.google.marketing_platform.hooks.search_ads import
GoogleSearchAdsHook
-from airflow.sensors.base import BaseSensorOperator
-
-if TYPE_CHECKING:
- from airflow.utils.context import Context
-
-
-class GoogleSearchAdsReportSensor(BaseSensorOperator):
- """
- Polls for the status of a report request.
-
- .. seealso::
- For API documentation check:
- https://developers.google.com/search-ads/v2/reference/reports/get
-
- .. seealso::
- For more information on how to use this operator, take a look at the
guide:
- :ref:`howto/operator:GoogleSearchAdsReportSensor`
-
- :param report_id: ID of the report request being polled.
- :param api_version: The version of the api that will be requested for
example 'v3'.
- :param gcp_conn_id: The connection ID to use when fetching connection info.
- :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
- if any. For this to work, the service account making the request must
have
- domain-wide delegation enabled.
- :param impersonation_chain: Optional service account to impersonate using
short-term
- credentials, or chained list of accounts required to get the
access_token
- of the last account in the list, which will be impersonated in the
request.
- If set as a string, the account must grant the originating account
- the Service Account Token Creator IAM role.
- If set as a sequence, the identities from the list must grant
- Service Account Token Creator IAM role to the directly preceding
identity, with first
- account from the list granting this role to the originating account
(templated).
- """
-
- template_fields: Sequence[str] = (
- "report_id",
- "impersonation_chain",
- )
-
- def __init__(
- self,
- *,
- report_id: str,
- api_version: str = "v2",
- gcp_conn_id: str = "google_cloud_default",
- delegate_to: str | None = None,
- mode: str = "reschedule",
- poke_interval: int = 5 * 60,
- impersonation_chain: str | Sequence[str] | None = None,
- **kwargs,
- ) -> None:
- super().__init__(mode=mode, poke_interval=poke_interval, **kwargs)
- self.report_id = report_id
- self.api_version = api_version
- self.gcp_conn_id = gcp_conn_id
- self.delegate_to = delegate_to
- self.impersonation_chain = impersonation_chain
-
- def poke(self, context: Context):
- hook = GoogleSearchAdsHook(
- gcp_conn_id=self.gcp_conn_id,
- delegate_to=self.delegate_to,
- api_version=self.api_version,
- impersonation_chain=self.impersonation_chain,
- )
- self.log.info("Checking status of %s report.", self.report_id)
- response = hook.get(report_id=self.report_id)
- return response["isReportReady"]
diff --git a/airflow/providers/google/provider.yaml
b/airflow/providers/google/provider.yaml
index d69b3f8fd6..a28aaa479a 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -754,9 +754,6 @@ sensors:
- integration-name: Google Display&Video 360
python-modules:
- airflow.providers.google.marketing_platform.sensors.display_video
- - integration-name: Google Search Ads 360
- python-modules:
- - airflow.providers.google.marketing_platform.sensors.search_ads
- integration-name: Google Looker
python-modules:
- airflow.providers.google.cloud.sensors.looker
diff --git
a/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst
b/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst
index 22343125d4..928d60f671 100644
---
a/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst
+++
b/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst
@@ -26,65 +26,94 @@ Prerequisite Tasks
.. include:: /operators/_partials/prerequisite_tasks.rst
-.. _howto/operator:GoogleSearchAdsInsertReportOperator:
+.. _howto/operator:GoogleSearchAdsSearchOperator:
-Inserting a report
+Querying a report
^^^^^^^^^^^^^^^^^^
-To insert a Search Ads report use the
-:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsInsertReportOperator`.
+To query a Search Ads report use the
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsSearchOperator`.
.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
:language: python
:dedent: 4
- :start-after: [START howto_search_ads_generate_report_operator]
- :end-before: [END howto_search_ads_generate_report_operator]
+ :start-after: [START howto_search_ads_search_query_reports]
+ :end-before: [END howto_search_ads_search_query_reports]
You can use :ref:`Jinja templating <concepts:jinja-templating>` with
-:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsInsertReportOperator`
-parameters which allows you to dynamically determine values. You can provide
report definition using ``
-.json`` file as this operator supports this template extension.
-The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used
by other operators:
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsSearchOperator`
+parameters which allows you to dynamically determine values.
+
+.. _howto/operator:GoogleSearchAdsGetFieldOperator:
+
+Retrieve field metadata
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To retrieve metadata of a field use
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsGetFieldOperator`.
.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
:language: python
:dedent: 4
- :start-after: [START howto_search_ads_get_report_id]
- :end-before: [END howto_search_ads_get_report_id]
+ :start-after: [START howto_search_ads_get_field]
+ :end-before: [END howto_search_ads_get_field]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsGetFieldOperator`
+parameters which allows you to dynamically determine values.
-.. _howto/operator:GoogleSearchAdsReportSensor:
+.. _howto/operator:GoogleSearchAdsSearchFieldsOperator:
-Awaiting for a report
-^^^^^^^^^^^^^^^^^^^^^
+Retrieve metadata for multiple fields
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-To wait for a report to be ready for download use
-:class:`~airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsReportSensor`.
+To retrieve metadata of multiple fields use the
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsSearchFieldsOperator`.
.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
:language: python
:dedent: 4
- :start-after: [START howto_search_ads_get_report_operator]
- :end-before: [END howto_search_ads_get_report_operator]
+ :start-after: [START howto_search_ads_search_fields]
+ :end-before: [END howto_search_ads_search_fields]
You can use :ref:`Jinja templating <concepts:jinja-templating>` with
-:template-fields:`airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsReportSensor`
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsSearchFieldsOperator`
parameters which allows you to dynamically determine values.
-.. _howto/operator:GoogleSearchAdsGetfileReportOperator:
-Downloading a report
-^^^^^^^^^^^^^^^^^^^^
+.. _howto/operator:GoogleSearchAdsGetCustomColumnOperator:
+
+Retrieve custom column details
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To retrieve details of a custom column use
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsGetCustomColumnOperator`.
+
+.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_search_ads_get_custom_column]
+ :end-before: [END howto_search_ads_get_custom_column]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsGetCustomColumnOperator`
+parameters which allows you to dynamically determine values.
+
+.. _howto/operator:GoogleSearchAdsListCustomColumnsOperator:
+
+Retrieve all custom columns
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To retrieve the list of all custom columns use
-To download a Search Ads report to Google Cloud Storage bucket use the
-:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsDownloadReportOperator`.
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsListCustomColumnsOperator`.
.. exampleinclude::
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
:language: python
:dedent: 4
- :start-after: [START howto_search_ads_getfile_report_operator]
- :end-before: [END howto_search_ads_getfile_report_operator]
+ :start-after: [START howto_search_ads_list_custom_columns]
+ :end-before: [END howto_search_ads_list_custom_columns]
You can use :ref:`Jinja templating <concepts:jinja-templating>` with
-:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsDownloadReportOperator`
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsListCustomColumnsOperator`
parameters which allows you to dynamically determine values.
-The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used
by other operators.
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index b846534d8d..1e684888dc 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -411,6 +411,7 @@ class
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
"airflow.providers.google.cloud.operators.dataproc._DataprocStartStopClusterBaseOperator",
"airflow.providers.google.cloud.operators.vertex_ai.custom_job.CustomTrainingJobBaseOperator",
"airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator",
+
"airflow.providers.google.marketing_platform.operators.search_ads._GoogleSearchAdsBaseOperator",
}
MISSING_EXAMPLES_FOR_CLASSES = {
@@ -509,7 +510,6 @@ class
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
"GoogleDisplayVideo360GetSDFDownloadOperationSensor",
"airflow.providers.google.marketing_platform.sensors.display_video."
"GoogleDisplayVideo360ReportSensor",
-
"airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsReportSensor",
}
@pytest.mark.xfail(reason="We did not reach full coverage yet")
diff --git a/tests/providers/google/marketing_platform/hooks/test_search_ads.py
b/tests/providers/google/marketing_platform/hooks/test_search_ads.py
index 1e0205a956..1fc08237f1 100644
--- a/tests/providers/google/marketing_platform/hooks/test_search_ads.py
+++ b/tests/providers/google/marketing_platform/hooks/test_search_ads.py
@@ -19,75 +19,176 @@ from __future__ import annotations
from unittest import mock
-from airflow.providers.google.marketing_platform.hooks.search_ads import
GoogleSearchAdsHook
+import pytest
+
+from airflow.providers.google.marketing_platform.hooks.search_ads import (
+ GoogleSearchAdsHook,
+ GoogleSearchAdsReportingHook,
+)
from tests.providers.google.cloud.utils.base_gcp_mock import
mock_base_gcp_hook_default_project_id
-API_VERSION = "v2"
GCP_CONN_ID = "google_cloud_default"
+API_VERSION = "v0"
+CUSTOMER_ID = "customer_id"
+QUERY = "SELECT * FROM campaigns WHERE segments.date DURING LAST_30_DAYS"
-class TestSearchAdsHook:
+class TestGoogleSearchAdsReportingHook:
def setup_method(self):
with mock.patch(
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleBaseHook.__init__",
new=mock_base_gcp_hook_default_project_id,
):
- self.hook = GoogleSearchAdsHook(gcp_conn_id=GCP_CONN_ID)
+ self.hook = GoogleSearchAdsReportingHook(gcp_conn_id=GCP_CONN_ID)
-
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook._authorize")
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.get_credentials"
+ )
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.build")
- def test_gen_conn(self, mock_build, mock_authorize):
+ def test_gen_conn(self, mock_build, mock_get_credentials):
result = self.hook.get_conn()
mock_build.assert_called_once_with(
- "doubleclicksearch",
+ "searchads360",
API_VERSION,
- http=mock_authorize.return_value,
+ credentials=mock_get_credentials.return_value,
cache_discovery=False,
)
assert mock_build.return_value == result
-
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook.get_conn")
- def test_insert(self, get_conn_mock):
- report = {"report": "test"}
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.customer_service"
+ )
+ @pytest.mark.parametrize(
+ "given_args, expected_args_extras",
+ [
+ ({"page_token": None}, {}),
+ ({"page_token": "next_page_token"}, {"pageToken":
"next_page_token"}),
+ ({"summary_row_setting": "summary line content"},
{"summaryRowSetting": "summary line content"}),
+ ({"page_size": 10, "validate_only": True}, {"pageSize": 10,
"validateOnly": True}),
+ ],
+ )
+ def test_search(self, customer_service_mock, given_args,
expected_args_extras):
+ return_value = {"results": [{"x": 1}]}
+ (
+
customer_service_mock.searchAds360.return_value.search.return_value.execute
+ ).return_value = return_value
+
+ result = self.hook.search(customer_id=CUSTOMER_ID, query=QUERY,
**given_args)
+
+ expected_args = {
+ "customerId": CUSTOMER_ID,
+ "body": {
+ "query": QUERY,
+ "pageSize": 10000,
+ "returnTotalResultsCount": False,
+ "validateOnly": False,
+ **expected_args_extras,
+ },
+ }
+
customer_service_mock.searchAds360.return_value.search.assert_called_once_with(**expected_args)
- return_value = "TEST"
-
get_conn_mock.return_value.reports.return_value.request.return_value.execute.return_value
= (
- return_value
- )
+ assert return_value == result
- result = self.hook.insert_report(report=report)
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.customer_service"
+ )
+ def test_get_custom_column(self, customer_service_mock):
+ custom_column_id = "custom_column_id"
+ return_value = {"resourceName": 1}
+ (
+
customer_service_mock.customColumns.return_value.get.return_value.execute
+ ).return_value = return_value
-
get_conn_mock.return_value.reports.return_value.request.assert_called_once_with(body=report)
+ result = self.hook.get_custom_column(customer_id=CUSTOMER_ID,
custom_column_id=custom_column_id)
- assert return_value == result
+
customer_service_mock.customColumns.return_value.get.assert_called_once_with(
+
resourceName=f"customers/{CUSTOMER_ID}/customColumns/{custom_column_id}"
+ )
-
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook.get_conn")
- def test_get(self, get_conn_mock):
- report_id = "REPORT_ID"
+ assert return_value == result
- return_value = "TEST"
-
get_conn_mock.return_value.reports.return_value.get.return_value.execute.return_value
= return_value
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.customer_service"
+ )
+ def test_list_custom_columns(self, customer_service_mock):
+ return_value = {
+ "results": [
+ {"resourceName":
f"customers/{CUSTOMER_ID}/customColumns/col1"},
+ {"resourceName":
f"customers/{CUSTOMER_ID}/customColumns/col2"},
+ ]
+ }
+ (
+
customer_service_mock.customColumns.return_value.list.return_value.execute
+ ).return_value = return_value
- result = self.hook.get(report_id=report_id)
+ result = self.hook.list_custom_columns(customer_id=CUSTOMER_ID)
-
get_conn_mock.return_value.reports.return_value.get.assert_called_once_with(reportId=report_id)
+
customer_service_mock.customColumns.return_value.list.assert_called_once_with(customerId=CUSTOMER_ID)
assert return_value == result
-
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook.get_conn")
- def test_get_file(self, get_conn_mock):
- report_fragment = 42
- report_id = "REPORT_ID"
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.fields_service"
+ )
+ def test_get_field(self, fields_service_mock):
+ field_name = "field_name"
+ return_value = {
+ "name": "Field 1",
+ "resourceName":
f"customers/{CUSTOMER_ID}/searchAds360Fields/field1",
+ }
+ fields_service_mock.get.return_value.execute.return_value =
return_value
- return_value = "TEST"
-
get_conn_mock.return_value.reports.return_value.getFile.return_value.execute.return_value
= (
- return_value
- )
+ result = self.hook.get_field(field_name=field_name)
- result = self.hook.get_file(report_fragment=report_fragment,
report_id=report_id)
+
fields_service_mock.get.assert_called_once_with(resourceName=f"searchAds360Fields/{field_name}")
-
get_conn_mock.return_value.reports.return_value.getFile.assert_called_once_with(
- reportFragment=report_fragment, reportId=report_id
- )
+ assert return_value == result
+
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.fields_service"
+ )
+ @pytest.mark.parametrize(
+ "given_args, expected_args_extras",
+ [
+ ({"page_token": None}, {}),
+ ({"page_token": "next_page_token"}, {"pageToken":
"next_page_token"}),
+ ({"page_size": 10}, {"pageSize": 10}),
+ ],
+ )
+ def test_search_fields(self, fields_service_mock, given_args,
expected_args_extras):
+ query = "SELECT field1, field2 FROM campaigns;"
+ return_value = {
+ "results": [
+ {"name": "Field 1", "resourceName":
f"customers/{CUSTOMER_ID}/searchAds360Fields/field1"},
+ {"name": "Field 2", "resourceName":
f"customers/{CUSTOMER_ID}/searchAds360Fields/field2"},
+ ]
+ }
+ fields_service_mock.search.return_value.execute.return_value =
return_value
+
+ result = self.hook.search_fields(query=query, **given_args)
+
+ expected_args = {"query": query, "pageSize": 10000,
**expected_args_extras}
+ fields_service_mock.search.assert_called_once_with(body=expected_args)
assert return_value == result
+
+
+class TestSearchAdsHook:
+ def setup_method(self):
+ with mock.patch(
+
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleBaseHook.__init__",
+ new=mock_base_gcp_hook_default_project_id,
+ ):
+ self.hook = GoogleSearchAdsHook(gcp_conn_id=GCP_CONN_ID)
+
+
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook._authorize")
+
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.build")
+ def test_gen_conn(self, mock_build, mock_authorize):
+ result = self.hook.get_conn()
+ mock_build.assert_called_once_with(
+ "doubleclicksearch",
+ "v2",
+ http=mock_authorize.return_value,
+ cache_discovery=False,
+ )
+ assert mock_build.return_value == result
diff --git
a/tests/providers/google/marketing_platform/operators/test_search_ads.py
b/tests/providers/google/marketing_platform/operators/test_search_ads.py
index ef0b59ac07..80cc0d26fe 100644
--- a/tests/providers/google/marketing_platform/operators/test_search_ads.py
+++ b/tests/providers/google/marketing_platform/operators/test_search_ads.py
@@ -17,160 +17,158 @@
# under the License.
from __future__ import annotations
-import json
-from tempfile import NamedTemporaryFile
from unittest import mock
-import pytest
-
-from airflow.models import DAG, TaskInstance as TI
from airflow.providers.google.marketing_platform.operators.search_ads import (
- GoogleSearchAdsDownloadReportOperator,
- GoogleSearchAdsInsertReportOperator,
+ GoogleSearchAdsGetCustomColumnOperator,
+ GoogleSearchAdsGetFieldOperator,
+ GoogleSearchAdsListCustomColumnsOperator,
+ GoogleSearchAdsSearchFieldsOperator,
+ GoogleSearchAdsSearchOperator,
)
-from airflow.utils import timezone
-from airflow.utils.session import create_session
-
-API_VERSION = "api_version"
-GCP_CONN_ID = "google_cloud_default"
-DEFAULT_DATE = timezone.datetime(2021, 1, 1)
-END_DATE = timezone.datetime(2021, 1, 2)
-REPORT_ID = "report_id"
-BUCKET_NAME = "test_bucket"
-REPORT_NAME = "test_report.csv"
-FILE_NAME = "test"
+GCP_CONN_ID = "google_search_ads_default"
+API_VERSION = "v0"
+CUSTOMER_ID = "customer_id"
-class TestGoogleSearchAdsInsertReportOperator:
-
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
-
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+class TestGoogleSearchAdsSearchOperator:
@mock.patch(
- "airflow.providers.google.marketing_platform."
- "operators.search_ads.GoogleSearchAdsInsertReportOperator.xcom_push"
+
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
)
- def test_execute(self, xcom_mock, mock_base_op, hook_mock):
- report = {"report": "test"}
- hook_mock.return_value.insert_report.return_value = {"id": REPORT_ID}
- op = GoogleSearchAdsInsertReportOperator(report=report,
api_version=API_VERSION, task_id="test_task")
+
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+ def test_execute(self, mock_base_op, hook_mock):
+ query = "SELECT * FROM campaigns WHERE segments.date DURING
LAST_30_DAYS"
+ hook_mock.return_value.search.return_value = {"results": []}
+ op = GoogleSearchAdsSearchOperator(
+ customer_id=CUSTOMER_ID,
+ query=query,
+ api_version=API_VERSION,
+ task_id="test_task",
+ )
op.execute(context=None)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
- delegate_to=None,
- api_version=API_VERSION,
- impersonation_chain=None,
+ api_version="v0",
+ )
+ hook_mock.return_value.search.assert_called_once_with(
+ customer_id=CUSTOMER_ID,
+ query=query,
+ page_size=10000,
+ page_token=None,
+ return_total_results_count=False,
+ summary_row_setting=None,
+ validate_only=False,
)
-
hook_mock.return_value.insert_report.assert_called_once_with(report=report)
- xcom_mock.assert_called_once_with(None, key="report_id",
value=REPORT_ID)
-
- def test_prepare_template(self):
- report = {"key": "value"}
- with NamedTemporaryFile("w+", suffix=".json") as f:
- f.write(json.dumps(report))
- f.flush()
- op = GoogleSearchAdsInsertReportOperator(
- report=report, api_version=API_VERSION, task_id="test_task"
- )
- op.prepare_template()
-
- assert isinstance(op.report, dict)
- assert op.report == report
-
-
[email protected]_if_database_isolation_mode
[email protected]_test
-class TestGoogleSearchAdsDownloadReportOperator:
- def setup_method(self):
- with create_session() as session:
- session.query(TI).delete()
- def teardown_method(self):
- with create_session() as session:
- session.query(TI).delete()
-
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.NamedTemporaryFile")
-
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GCSHook")
-
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
+class TestGoogleSearchAdsGetFieldOperator:
@mock.patch(
- "airflow.providers.google.marketing_platform."
- "operators.search_ads.GoogleSearchAdsDownloadReportOperator.xcom_push"
+
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
)
- def test_execute(self, xcom_mock, hook_mock, gcs_hook_mock, tempfile_mock):
- temp_file_name = "TEMP"
- data = b"data"
-
- hook_mock.return_value.get.return_value = {"files": [0],
"isReportReady": True}
- hook_mock.return_value.get_file.return_value = data
- tempfile_mock.return_value.__enter__.return_value.name = temp_file_name
-
- op = GoogleSearchAdsDownloadReportOperator(
- report_id=REPORT_ID,
- report_name=FILE_NAME,
- bucket_name=BUCKET_NAME,
+
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+ def test_execute(self, mock_base_op, hook_mock):
+ field_name = "the_field"
+ hook_mock.return_value.get_field.return_value = {
+ "name": field_name,
+ "resourceName": f"searchAds360Fields/{field_name}",
+ }
+ op = GoogleSearchAdsGetFieldOperator(
+ field_name=field_name,
api_version=API_VERSION,
task_id="test_task",
)
op.execute(context=None)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
- delegate_to=None,
- api_version=API_VERSION,
- impersonation_chain=None,
+ api_version="v0",
)
-
hook_mock.return_value.get_file.assert_called_once_with(report_fragment=0,
report_id=REPORT_ID)
-
tempfile_mock.return_value.__enter__.return_value.write.assert_called_once_with(data)
- gcs_hook_mock.return_value.upload.assert_called_once_with(
- bucket_name=BUCKET_NAME,
- gzip=True,
- object_name=FILE_NAME + ".csv.gz",
- filename=temp_file_name,
+ hook_mock.return_value.get_field.assert_called_once_with(
+ field_name=field_name,
)
- xcom_mock.assert_called_once_with(None, key="file_name",
value=FILE_NAME + ".csv.gz")
-
- @pytest.mark.parametrize(
- "test_bucket_name",
- [BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{
ti.xcom_pull(task_ids='f') }}"],
- )
-
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.NamedTemporaryFile")
-
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GCSHook")
-
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
- def test_set_bucket_name(self, hook_mock, gcs_hook_mock, tempfile_mock,
test_bucket_name):
- temp_file_name = "TEMP"
- data = b"data"
- hook_mock.return_value.get.return_value = {"files": [0],
"isReportReady": True}
- hook_mock.return_value.get_file.return_value = data
- tempfile_mock.return_value.__enter__.return_value.name = temp_file_name
- dag = DAG(
- dag_id="test_set_bucket_name",
- start_date=DEFAULT_DATE,
- schedule=None,
- catchup=False,
+class TestGoogleSearchAdsSearchFieldsOperator:
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
+ )
+
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+ def test_execute(self, mock_base_op, hook_mock):
+ field_name = "the_field"
+ query = (
+ "SELECT "
+ " name, category, selectable, filterable, sortable,
selectable_with, data_type, "
+ " is_repeated "
+ "WHERE "
+ " name LIKE 'ad_group.%'"
+ )
+ hook_mock.return_value.search_fields.return_value = {
+ "results": [{"name": field_name, "resource_name":
f"searchAds360Fields/{field_name}"}]
+ }
+ op = GoogleSearchAdsSearchFieldsOperator(
+ query=query,
+ api_version=API_VERSION,
+ task_id="test_task",
+ )
+ op.execute(context=None)
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version="v0",
+ )
+ hook_mock.return_value.search_fields.assert_called_once_with(
+ query=query,
+ page_token=None,
+ page_size=10000,
)
- if BUCKET_NAME not in test_bucket_name:
- @dag.task
- def f():
- return BUCKET_NAME
+class TestGoogleSearchAdsGetCustomColumnOperator:
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
+ )
+
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+ def test_execute(self, mock_base_op, hook_mock):
+ custom_column_id = "custom_column_id"
+ hook_mock.return_value.get_custom_column.return_value = {"id":
custom_column_id}
+ op = GoogleSearchAdsGetCustomColumnOperator(
+ customer_id=CUSTOMER_ID,
+ custom_column_id=custom_column_id,
+ api_version=API_VERSION,
+ task_id="test_task",
+ )
+ op.execute(context=None)
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version="v0",
+ )
+ hook_mock.return_value.get_custom_column.assert_called_once_with(
+ customer_id=CUSTOMER_ID,
+ custom_column_id=custom_column_id,
+ )
- taskflow_op = f()
- taskflow_op.operator.run(start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE)
- op = GoogleSearchAdsDownloadReportOperator(
- report_id=REPORT_ID,
- report_name=FILE_NAME,
- bucket_name=test_bucket_name if test_bucket_name != "XComArg" else
taskflow_op,
+class TestGoogleSearchAdsListCustomColumnsOperator:
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
+ )
+
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+ def test_execute(self, mock_base_op, hook_mock):
+ customs_columns = [
+ {"id": "custom_column_id_1"},
+ {"id": "custom_column_id_2"},
+ {"id": "custom_column_id_3"},
+ ]
+ hook_mock.return_value.list_custom_columns.return_value =
{"customColumns": customs_columns}
+ op = GoogleSearchAdsListCustomColumnsOperator(
+ customer_id=CUSTOMER_ID,
api_version=API_VERSION,
task_id="test_task",
- dag=dag,
)
- op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
- gcs_hook_mock.return_value.upload.assert_called_once_with(
- bucket_name=BUCKET_NAME,
- gzip=True,
- object_name=FILE_NAME + ".csv.gz",
- filename=temp_file_name,
+ op.execute(context=None)
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version="v0",
+ )
+ hook_mock.return_value.list_custom_columns.assert_called_once_with(
+ customer_id=CUSTOMER_ID,
)
diff --git
a/tests/providers/google/marketing_platform/sensors/test_search_ads.py
b/tests/providers/google/marketing_platform/sensors/test_search_ads.py
deleted file mode 100644
index e03b64a3c6..0000000000
--- a/tests/providers/google/marketing_platform/sensors/test_search_ads.py
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest import mock
-
-import pytest
-
-from airflow.providers.google.marketing_platform.sensors.search_ads import
GoogleSearchAdsReportSensor
-
-pytestmark = pytest.mark.db_test
-
-API_VERSION = "api_version"
-GCP_CONN_ID = "google_cloud_default"
-
-
-class TestSearchAdsReportSensor:
-
@mock.patch("airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsHook")
-
@mock.patch("airflow.providers.google.marketing_platform.sensors.search_ads.BaseSensorOperator")
- def test_poke(self, mock_base_op, hook_mock):
- report_id = "REPORT_ID"
- op = GoogleSearchAdsReportSensor(report_id=report_id,
api_version=API_VERSION, task_id="test_task")
- op.poke(context=None)
- hook_mock.assert_called_once_with(
- gcp_conn_id=GCP_CONN_ID,
- delegate_to=None,
- api_version=API_VERSION,
- impersonation_chain=None,
- )
- hook_mock.return_value.get.assert_called_once_with(report_id=report_id)
diff --git
a/tests/system/providers/google/marketing_platform/example_search_ads.py
b/tests/system/providers/google/marketing_platform/example_search_ads.py
index d1e34c35ad..ba531dc964 100644
--- a/tests/system/providers/google/marketing_platform/example_search_ads.py
+++ b/tests/system/providers/google/marketing_platform/example_search_ads.py
@@ -23,65 +23,83 @@ from __future__ import annotations
import os
from datetime import datetime
-from typing import cast
from airflow.models.dag import DAG
-from airflow.models.xcom_arg import XComArg
from airflow.providers.google.marketing_platform.operators.search_ads import (
- GoogleSearchAdsDownloadReportOperator,
- GoogleSearchAdsInsertReportOperator,
+ GoogleSearchAdsGetCustomColumnOperator,
+ GoogleSearchAdsGetFieldOperator,
+ GoogleSearchAdsListCustomColumnsOperator,
+ GoogleSearchAdsSearchFieldsOperator,
+ GoogleSearchAdsSearchOperator,
)
-from airflow.providers.google.marketing_platform.sensors.search_ads import
GoogleSearchAdsReportSensor
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_search_ads"
+DAG_ID = "search_ads"
# [START howto_search_ads_env_variables]
-AGENCY_ID = os.environ.get("GMP_AGENCY_ID")
-ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID")
-GCS_BUCKET = os.environ.get("GMP_GCS_BUCKET", "test-cm-bucket")
-
-REPORT = {
- "reportScope": {"agencyId": AGENCY_ID, "advertiserId": ADVERTISER_ID},
- "reportType": "account",
- "columns": [{"columnName": "agency"}, {"columnName":
"lastModifiedTimestamp"}],
- "includeRemovedEntities": False,
- "statisticsCurrency": "usd",
- "maxRowsPerFile": 1000000,
- "downloadFormat": "csv",
-}
+CUSTOMER_ID: str = os.environ.get("GSA_CUSTOMER_ID", default="")
+QUERY = """
+ SELECT
+ campaign.name,
+ campaign.id,
+ campaign.status
+ FROM campaign;
+"""
+FIELD_NAME: str = os.environ.get("GSA_FIELD_NAME", default="")
+SEARCH_FIELDS_QUERY: str = """
+ SELECT
+ f1,
+ f2
+ FROM t1;
+"""
+CUSTOM_COLUMN_ID: str = os.environ.get("GSA_CUSTOM_COLUMN_ID", default="")
# [END howto_search_ads_env_variables]
with DAG(
- DAG_ID,
+ dag_id=DAG_ID,
schedule="@once", # Override to match your needs,
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["search", "search-ads", "ads"],
) as dag:
- # [START howto_search_ads_generate_report_operator]
- generate_report = GoogleSearchAdsInsertReportOperator(report=REPORT,
task_id="generate_report")
- # [END howto_search_ads_generate_report_operator]
+ # [START howto_search_ads_search_query_reports]
+ query_report = GoogleSearchAdsSearchOperator(
+ task_id="query_report",
+ customer_id=CUSTOMER_ID,
+ query=QUERY,
+ )
+ # [END howto_search_ads_search_query_reports]
- # [START howto_search_ads_get_report_id]
- report_id = cast(str, XComArg(generate_report, key="report_id"))
- # [END howto_search_ads_get_report_id]
+ # [START howto_search_ads_get_field]
+ get_field = GoogleSearchAdsGetFieldOperator(
+ task_id="get_field",
+ field_name=FIELD_NAME,
+ )
+ # [END howto_search_ads_get_field]
- # [START howto_search_ads_get_report_operator]
- wait_for_report = GoogleSearchAdsReportSensor(report_id=report_id,
task_id="wait_for_report")
- # [END howto_search_ads_get_report_operator]
+ # [START howto_search_ads_search_fields]
+ search_fields = GoogleSearchAdsSearchFieldsOperator(
+ task_id="search_fields",
+ query=SEARCH_FIELDS_QUERY,
+ )
+ # [END howto_search_ads_search_fields]
- # [START howto_search_ads_getfile_report_operator]
- download_report = GoogleSearchAdsDownloadReportOperator(
- report_id=report_id, bucket_name=GCS_BUCKET, task_id="download_report"
+ # [START howto_search_ads_get_custom_column]
+ get_custom_column = GoogleSearchAdsGetCustomColumnOperator(
+ task_id="get_custom_column",
+ customer_id=CUSTOMER_ID,
+ custom_column_id=CUSTOM_COLUMN_ID,
)
- # [END howto_search_ads_getfile_report_operator]
+ # [END howto_search_ads_get_custom_column]
- wait_for_report >> download_report
+ # [START howto_search_ads_list_custom_columns]
+ list_custom_columns = GoogleSearchAdsListCustomColumnsOperator(
+ task_id="list_custom_columns",
+ customer_id=CUSTOMER_ID,
+ )
+ # [END howto_search_ads_list_custom_columns]
- # Task dependencies created via `XComArgs`:
- # generate_report >> wait_for_report
- # generate_report >> download_report
+ (query_report >> get_field >> search_fields >> get_custom_column >>
list_custom_columns)
from tests.system.utils import get_test_run # noqa: E402