This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 567926535a Add new Google Search 360 Reporting Operators (#42255)
567926535a is described below

commit 567926535a719f7ea93565e3fee3f2fbd9a03cb9
Author: M. Olcay Tercanlı <[email protected]>
AuthorDate: Thu Sep 19 07:36:06 2024 +0200

    Add new Google Search 360 Reporting Operators (#42255)
    
    * Implement new Google Search 360 operators
    Remove the decommissioned operators
    
    Co-authored-by: Elad Kalif <[email protected]>
    
    ---------
    
    Co-authored-by: Jarek Potiuk <[email protected]>
    Co-authored-by: Elad Kalif <[email protected]>
---
 airflow/providers/google/CHANGELOG.rst             |  12 +
 .../google/marketing_platform/hooks/search_ads.py  | 199 +++++++++--
 .../marketing_platform/operators/search_ads.py     | 373 ++++++++++++---------
 .../marketing_platform/sensors/search_ads.py       |  92 -----
 airflow/providers/google/provider.yaml             |   3 -
 .../operators/marketing_platform/search_ads.rst    |  87 +++--
 tests/always/test_project_structure.py             |   2 +-
 .../marketing_platform/hooks/test_search_ads.py    | 175 ++++++++--
 .../operators/test_search_ads.py                   | 240 +++++++------
 .../marketing_platform/sensors/test_search_ads.py  |  45 ---
 .../marketing_platform/example_search_ads.py       |  94 +++---
 11 files changed, 770 insertions(+), 552 deletions(-)

diff --git a/airflow/providers/google/CHANGELOG.rst 
b/airflow/providers/google/CHANGELOG.rst
index 64ae3b4e91..ff54591fd6 100644
--- a/airflow/providers/google/CHANGELOG.rst
+++ b/airflow/providers/google/CHANGELOG.rst
@@ -27,6 +27,18 @@
 Changelog
 ---------
 
+Main
+.......
+
+.. warning::
+  The previous Search Ads 360 Reporting API 
<https://developers.google.com/search-ads/v2/how-tos/reporting>
+  (which is currently in use in google-provider) was already decommissioned on 
June 30, 2024
+  (see details <https://developers.google.com/search-ads/v2/migration>).
+  All new reporting development should use the new Search Ads 360 Reporting 
API.
+  Currently, the Reporting operators, sensors and hooks are failing due to the 
decommission.
+  The new API is not a replacement for the old one, it has a different 
approach and endpoints.
+  Therefore, new operators implemented for the new API.
+
 10.22.0
 .......
 
diff --git a/airflow/providers/google/marketing_platform/hooks/search_ads.py 
b/airflow/providers/google/marketing_platform/hooks/search_ads.py
index 85cc25c458..d57a89a821 100644
--- a/airflow/providers/google/marketing_platform/hooks/search_ads.py
+++ b/airflow/providers/google/marketing_platform/hooks/search_ads.py
@@ -19,73 +19,212 @@
 
 from __future__ import annotations
 
-from typing import Any, Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Sequence
 
+from google.oauth2.credentials import Credentials
 from googleapiclient.discovery import build
 
+from airflow.exceptions import AirflowException
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
 
+if TYPE_CHECKING:
+    from googleapiclient.discovery import Resource
 
-class GoogleSearchAdsHook(GoogleBaseHook):
-    """Hook for Google Search Ads 360."""
+
+class GoogleSearchAdsReportingHook(GoogleBaseHook):
+    """Hook for the Google Search Ads 360 Reporting API."""
 
     _conn: build | None = None
+    default_api_version: str = "v0"
 
     def __init__(
         self,
-        api_version: str = "v2",
-        gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
-        impersonation_chain: str | Sequence[str] | None = None,
+        api_version: str | None = None,
+        gcp_conn_id: str = "google_search_ads_default",
     ) -> None:
         super().__init__(
             gcp_conn_id=gcp_conn_id,
-            delegate_to=delegate_to,
-            impersonation_chain=impersonation_chain,
         )
-        self.api_version = api_version
+        self.api_version = api_version or self.default_api_version
 
-    def get_conn(self):
-        """Retrieve connection to Google SearchAds."""
+    def _get_config(self) -> None:
+        """
+        Set up Google Search Ads config from Connection.
+
+        This pulls the connections from db, and uses it to set up
+        ``google_search_ads_client``.
+        """
+        conn = self.get_connection(self.gcp_conn_id)
+        if "google_search_ads_client" not in conn.extra_dejson:
+            raise AirflowException("google_search_ads_client not found in 
extra field")
+
+        self.google_search_ads_config = 
conn.extra_dejson["google_search_ads_client"]
+
+    def get_credentials(self) -> Credentials:
+        """Return the credential instance for search ads."""
+        self._get_config()
+        self.logger().info(f"Credential configuration: 
{self.google_search_ads_config}")
+        return Credentials(**self.google_search_ads_config)
+
+    def get_conn(self) -> Resource:
         if not self._conn:
-            http_authorized = self._authorize()
+            creds = self.get_credentials()
+
             self._conn = build(
-                "doubleclicksearch",
+                "searchads360",
                 self.api_version,
-                http=http_authorized,
+                credentials=creds,
                 cache_discovery=False,
             )
         return self._conn
 
-    def insert_report(self, report: dict[str, Any]) -> Any:
+    @cached_property
+    def customer_service(self):
+        return self.get_conn().customers()
+
+    @cached_property
+    def fields_service(self):
+        return self.get_conn().searchAds360Fields()
+
+    def search(
+        self,
+        customer_id: str,
+        query: str,
+        page_token: str | None = None,
+        page_size: int = 10000,
+        return_total_results_count: bool = False,
+        summary_row_setting: str | None = None,
+        validate_only: bool = False,
+    ):
         """
-        Insert a report request into the reporting system.
+        Search and download the report. Use pagination to download entire 
report.
 
-        :param report: Report to be generated.
+        :param customer_id: The ID of the customer being queried.
+        :param query: The query to execute.
+        :param page_token: Token of the page to retrieve. If not specified, 
the first page of results will be
+            returned. Use the value obtained from `next_page_token` in the 
previous response
+            in order to request the next page of results.
+        :param page_size: Number of elements to retrieve in a single page. 
When too large a page is requested,
+            the server may decide to further limit the number of returned 
resources.
+            Default is 10000.
+        :param return_total_results_count: If true, the total number of 
results that match the query ignoring
+            the LIMIT clause will be included in the response. Default is 
false.
+        :param summary_row_setting: Determines whether a summary row will be 
returned. By default,
+            summary row is not returned. If requested, the summary row will be 
sent
+            in a response by itself after all others query results are 
returned.
+        :param validate_only: If true, the request is validated but not 
executed. Default is false.
         """
-        response = 
self.get_conn().reports().request(body=report).execute(num_retries=self.num_retries)
+        params: dict[str, Any] = {
+            "query": query,
+            "pageSize": page_size,
+            "returnTotalResultsCount": return_total_results_count,
+            "validateOnly": validate_only,
+        }
+        if page_token is not None:
+            params.update({"pageToken": page_token})
+        if summary_row_setting is not None:
+            params.update({"summaryRowSetting": summary_row_setting})
+
+        response = (
+            self.customer_service.searchAds360()
+            .search(customerId=customer_id, body=params)
+            .execute(num_retries=self.num_retries)
+        )
+        self.log.info("Search response: %s", response)
         return response
 
-    def get(self, report_id: str) -> Any:
+    def get_custom_column(self, customer_id: str, custom_column_id: str):
         """
-        Poll for the status of a report request.
+        Retrieve the requested custom column in full detail.
 
-        :param report_id: ID of the report request being polled.
+        :param customer_id: The customer id
+        :param custom_column_id: The custom column id
         """
-        response = 
self.get_conn().reports().get(reportId=report_id).execute(num_retries=self.num_retries)
+        resource_name = 
f"customers/{customer_id}/customColumns/{custom_column_id}"
+        response = (
+            self.customer_service.customColumns()
+            .get(resourceName=resource_name)
+            .execute(num_retries=self.num_retries)
+        )
+        self.log.info("Retrieved custom column: %s", response)
         return response
 
-    def get_file(self, report_fragment: int, report_id: str) -> Any:
+    def list_custom_columns(self, customer_id: str):
         """
-        Download a report file encoded in UTF-8.
+        Retrieve all the custom columns associated with the customer in full 
detail.
 
-        :param report_fragment: The index of the report fragment to download.
-        :param report_id: ID of the report.
+        :param customer_id: The customer id
         """
         response = (
-            self.get_conn()
-            .reports()
-            .getFile(reportFragment=report_fragment, reportId=report_id)
+            self.customer_service.customColumns()
+            .list(customerId=customer_id)
             .execute(num_retries=self.num_retries)
         )
+        self.log.info("Listing the custom columns: %s", response)
+        return response
+
+    def get_field(self, field_name: str):
+        """
+        Retrieve the requested field details.
+
+        :param field_name: The name of the field.
+        """
+        resource_name = f"searchAds360Fields/{field_name}"
+        response = 
self.fields_service.get(resourceName=resource_name).execute(num_retries=self.num_retries)
+        self.log.info("Retrieved field: %s", response)
         return response
+
+    def search_fields(self, query: str, page_token: str | None = None, 
page_size: int | None = 10000):
+        """
+        Retrieve all the fields that match with the given search.
+
+        :param query: The query string to execute.
+        :param page_token: Token of the page to retrieve. If not specified, 
the first page of results will be
+            returned. Use the value obtained from `next_page_token` in the 
previous response
+            in order to request the next page of results.
+        :param page_size: Number of elements to retrieve in a single page. 
When too large a page is requested,
+            the server may decide to further limit the number of returned 
resources.
+            Default 10000.
+        """
+        params: dict[str, Any] = {
+            "query": query,
+            "pageSize": page_size,
+        }
+        if page_token is not None:
+            params.update({"pageToken": page_token})
+        response = 
self.fields_service.search(body=params).execute(num_retries=self.num_retries)
+        self.log.info("Retrieved fields: %s", response)
+        return response
+
+
+class GoogleSearchAdsHook(GoogleBaseHook):
+    """Hook for Google Search Ads 360."""
+
+    _conn: build | None = None
+
+    def __init__(
+        self,
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        self.api_version = api_version
+
+    def get_conn(self):
+        """Retrieve connection to Google SearchAds."""
+        if not self._conn:
+            http_authorized = self._authorize()
+            self._conn = build(
+                "doubleclicksearch",
+                self.api_version,
+                http=http_authorized,
+                cache_discovery=False,
+            )
+        return self._conn
diff --git 
a/airflow/providers/google/marketing_platform/operators/search_ads.py 
b/airflow/providers/google/marketing_platform/operators/search_ads.py
index 87b5b3f970..067c2a8946 100644
--- a/airflow/providers/google/marketing_platform/operators/search_ads.py
+++ b/airflow/providers/google/marketing_platform/operators/search_ads.py
@@ -19,217 +19,278 @@
 
 from __future__ import annotations
 
-import json
-from tempfile import NamedTemporaryFile
+from functools import cached_property
 from typing import TYPE_CHECKING, Any, Sequence
 
-from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.gcs import GCSHook
-from airflow.providers.google.marketing_platform.hooks.search_ads import 
GoogleSearchAdsHook
+from airflow.providers.google.marketing_platform.hooks.search_ads import 
GoogleSearchAdsReportingHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
-class GoogleSearchAdsInsertReportOperator(BaseOperator):
+class _GoogleSearchAdsBaseOperator(BaseOperator):
     """
-    Inserts a report request into the reporting system.
+    Base class to use in NextGen operator.
+
+    :param api_version: The version of the API that will be requested for 
example 'v0'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    """
+
+    template_fields: Sequence[str] = (
+        "api_version",
+        "gcp_conn_id",
+    )
+
+    def __init__(
+        self,
+        *,
+        api_version: str = "v0",
+        gcp_conn_id: str = "google_search_ads_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+
+    @cached_property
+    def hook(self):
+        return GoogleSearchAdsReportingHook(
+            gcp_conn_id=self.gcp_conn_id,
+            api_version=self.api_version,
+        )
+
+
+class GoogleSearchAdsSearchOperator(_GoogleSearchAdsBaseOperator):
+    """
+    Search a report by query.
 
     .. seealso:
         For API documentation check:
-        https://developers.google.com/search-ads/v2/reference/reports/request
+        
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/customers.searchAds360/search
 
     .. seealso::
         For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:GoogleSearchAdsInsertReportOperator`
+        :ref:`howto/operator:GoogleSearchAdsSearchOperator`
 
-    :param report: Report to be generated
-    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param customer_id: The ID of the customer being queried.
+    :param query: The query to execute.
+    :param page_token: Token of the page to retrieve. If not specified, the 
first page of results will be
+        returned. Use the value obtained from `next_page_token` in the 
previous response
+        in order to request the next page of results.
+    :param page_size: Number of elements to retrieve in a single page. When 
too large a page is requested,
+        the server may decide to further limit the number of returned 
resources.
+        Default is 10000.
+    :param return_total_results_count: If true, the total number of results 
that match the query ignoring
+        the LIMIT clause will be included in the response. Default is false.
+    :param summary_row_setting: Determines whether a summary row will be 
returned. By default,
+        summary row is not returned. If requested, the summary row will be sent
+        in a response by itself after all others query results are returned.
+    :param validate_only: If true, the request is validated but not executed. 
Default is false.
     :param gcp_conn_id: The connection ID to use when fetching connection info.
-    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
-        if any. For this to work, the service account making the request must 
have
-        domain-wide delegation enabled.
-    :param impersonation_chain: Optional service account to impersonate using 
short-term
-        credentials, or chained list of accounts required to get the 
access_token
-        of the last account in the list, which will be impersonated in the 
request.
-        If set as a string, the account must grant the originating account
-        the Service Account Token Creator IAM role.
-        If set as a sequence, the identities from the list must grant
-        Service Account Token Creator IAM role to the directly preceding 
identity, with first
-        account from the list granting this role to the originating account 
(templated).
+    :param api_version: The version of the API that will be requested for 
example 'v0'.
     """
 
     template_fields: Sequence[str] = (
-        "report",
-        "impersonation_chain",
+        *_GoogleSearchAdsBaseOperator.template_fields,
+        "page_token",
+        "page_size",
     )
-    template_ext: Sequence[str] = (".json",)
 
     def __init__(
         self,
         *,
-        report: dict[str, Any],
-        api_version: str = "v2",
-        gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
-        impersonation_chain: str | Sequence[str] | None = None,
+        customer_id: str,
+        query: str,
+        page_token: str | None = None,
+        page_size: int = 10000,
+        return_total_results_count: bool = False,
+        summary_row_setting: str | None = None,
+        validate_only: bool = False,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
-        self.report = report
-        self.api_version = api_version
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-        self.impersonation_chain = impersonation_chain
-
-    def prepare_template(self) -> None:
-        # If .json is passed then we have to read the file
-        if isinstance(self.report, str) and self.report.endswith(".json"):
-            with open(self.report) as file:
-                self.report = json.load(file)
+        self.customer_id = customer_id
+        self.query = query
+        self.page_token = page_token
+        self.page_size = page_size
+        self.return_total_results_count = return_total_results_count
+        self.summary_row_setting = summary_row_setting
+        self.validate_only = validate_only
 
     def execute(self, context: Context):
-        hook = GoogleSearchAdsHook(
-            gcp_conn_id=self.gcp_conn_id,
-            delegate_to=self.delegate_to,
-            api_version=self.api_version,
-            impersonation_chain=self.impersonation_chain,
+        self.log.info("Querying Search Ads")
+        response = self.hook.search(
+            customer_id=self.customer_id,
+            query=self.query,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            return_total_results_count=self.return_total_results_count,
+            summary_row_setting=self.summary_row_setting,
+            validate_only=self.validate_only,
         )
-        self.log.info("Generating Search Ads report")
-        response = hook.insert_report(report=self.report)
-        report_id = response.get("id")
-        self.xcom_push(context, key="report_id", value=report_id)
-        self.log.info("Report generated, id: %s", report_id)
+        self.log.info("Query result: %s", response)
+        return response
+
+
+class GoogleSearchAdsGetFieldOperator(_GoogleSearchAdsBaseOperator):
+    """
+    Retrieve metadata for a resource or a field.
+
+    .. seealso:
+        For API documentation check:
+        
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/searchAds360Fields/get
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleSearchAdsGetFieldOperator`
+
+    :param field_name: The name of the field.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param api_version: The version of the API that will be requested for 
example 'v0'.
+    """
+
+    def __init__(
+        self,
+        *,
+        field_name: str,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.field_name = field_name
+
+    def execute(self, context: Context) -> Any:
+        self.log.info("Retrieving the metadata for the field '%s'", 
self.field_name)
+        response = self.hook.get_field(field_name=self.field_name)
+        self.log.info("Retrieved field: %s", response["resourceName"])
         return response
 
 
-class GoogleSearchAdsDownloadReportOperator(BaseOperator):
+class GoogleSearchAdsSearchFieldsOperator(_GoogleSearchAdsBaseOperator):
     """
-    Downloads a report to GCS bucket.
+    Retrieve metadata for resource(s) or field(s) by the query syntax.
 
     .. seealso:
         For API documentation check:
-        https://developers.google.com/search-ads/v2/reference/reports/getFile
+        
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/searchAds360Fields/search
 
     .. seealso::
         For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:GoogleSearchAdsGetfileReportOperator`
-
-    :param report_id: ID of the report.
-    :param bucket_name: The bucket to upload to.
-    :param report_name: The report name to set when uploading the local file. 
If not provided then
-        report_id is used.
-    :param gzip: Option to compress local file or file data for upload
-    :param api_version: The version of the api that will be requested for 
example 'v3'.
+        :ref:`howto/operator:GoogleSearchAdsSearchFieldsOperator`
+
+    :param query: The query string to execute.
+    :param page_token: Token of the page to retrieve. If not specified, the 
first page of results will be
+        returned. Use the value obtained from `next_page_token` in the 
previous response
+        in order to request the next page of results.
+    :param page_size: Number of elements to retrieve in a single page. When 
too large a page is requested,
+        the server may decide to further limit the number of returned 
resources.
+        Default 10000.
     :param gcp_conn_id: The connection ID to use when fetching connection info.
-    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
-        if any. For this to work, the service account making the request must 
have
-        domain-wide delegation enabled.
-    :param impersonation_chain: Optional service account to impersonate using 
short-term
-        credentials, or chained list of accounts required to get the 
access_token
-        of the last account in the list, which will be impersonated in the 
request.
-        If set as a string, the account must grant the originating account
-        the Service Account Token Creator IAM role.
-        If set as a sequence, the identities from the list must grant
-        Service Account Token Creator IAM role to the directly preceding 
identity, with first
-        account from the list granting this role to the originating account 
(templated).
+    :param api_version: The version of the API that will be requested for 
example 'v0'.
     """
 
     template_fields: Sequence[str] = (
-        "report_name",
-        "report_id",
-        "bucket_name",
-        "impersonation_chain",
+        *_GoogleSearchAdsBaseOperator.template_fields,
+        "page_token",
+        "page_size",
     )
 
     def __init__(
         self,
         *,
-        report_id: str,
-        bucket_name: str,
-        report_name: str | None = None,
-        gzip: bool = True,
-        chunk_size: int = 10 * 1024 * 1024,
-        api_version: str = "v2",
-        gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
-        impersonation_chain: str | Sequence[str] | None = None,
+        query: str,
+        page_token: str | None = None,
+        page_size: int = 10000,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.query = query
+
+        self.page_token = page_token
+        self.page_size = page_size
+
+    def execute(self, context: Context) -> Any:
+        self.log.info("Retrieving the metadata for %s", self.query)
+        response = self.hook.search_fields(
+            query=self.query,
+            page_token=self.page_token,
+            page_size=self.page_size,
+        )
+        self.log.info("Num of fields retrieved, #%d", len(response["results"]))
+        return response
+
+
+class GoogleSearchAdsGetCustomColumnOperator(_GoogleSearchAdsBaseOperator):
+    """
+    Retrieve details of a custom column for the given customer_id and 
campaign_id.
+
+    .. seealso:
+        For API documentation check:
+        
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/customers.customColumns/get
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleSearchAdsGetCustomColumnOperator`
+
+    :param customer_id: The customer ID for the custom column.
+    :param custom_column_id: The ID for the custom column.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param api_version: The version of the API that will be requested for 
example 'v0'.
+    """
+
+    def __init__(
+        self,
+        *,
+        customer_id: str,
+        custom_column_id: str,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
-        self.report_id = report_id
-        self.api_version = api_version
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-        self.report_id = report_id
-        self.chunk_size = chunk_size
-        self.gzip = gzip
-        self.bucket_name = bucket_name
-        self.report_name = report_name
-        self.impersonation_chain = impersonation_chain
-
-    def _resolve_file_name(self, name: str) -> str:
-        csv = ".csv"
-        gzip = ".gz"
-        if not name.endswith(csv):
-            name += csv
-        if self.gzip:
-            name += gzip
-        return name
-
-    @staticmethod
-    def _set_bucket_name(name: str) -> str:
-        bucket = name if not name.startswith("gs://") else name[5:]
-        return bucket.strip("/")
-
-    @staticmethod
-    def _handle_report_fragment(fragment: bytes) -> bytes:
-        fragment_records = fragment.split(b"\n", 1)
-        if len(fragment_records) > 1:
-            return fragment_records[1]
-        return b""
+        self.customer_id = customer_id
+        self.custom_column_id = custom_column_id
 
     def execute(self, context: Context):
-        hook = GoogleSearchAdsHook(
-            gcp_conn_id=self.gcp_conn_id,
-            delegate_to=self.delegate_to,
-            api_version=self.api_version,
-            impersonation_chain=self.impersonation_chain,
+        self.log.info(
+            "Retrieving the custom column for the customer %s with the id of 
%s",
+            self.customer_id,
+            self.custom_column_id,
         )
-
-        gcs_hook = GCSHook(
-            gcp_conn_id=self.gcp_conn_id,
-            delegate_to=self.delegate_to,
-            impersonation_chain=self.impersonation_chain,
+        response = self.hook.get_custom_column(
+            customer_id=self.customer_id,
+            custom_column_id=self.custom_column_id,
         )
+        self.log.info("Retrieved custom column: %s", response["id"])
+        return response
+
+
+class GoogleSearchAdsListCustomColumnsOperator(_GoogleSearchAdsBaseOperator):
+    """
+    List all custom columns.
+
+    .. seealso:
+        For API documentation check:
+        
https://developers.google.com/search-ads/reporting/api/reference/rest/v0/customers.customColumns/list
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleSearchAdsListCustomColumnsOperator`
 
-        # Resolve file name of the report
-        report_name = self.report_name or self.report_id
-        report_name = self._resolve_file_name(report_name)
-
-        response = hook.get(report_id=self.report_id)
-        if not response["isReportReady"]:
-            raise AirflowException(f"Report {self.report_id} is not ready yet")
-
-        # Resolve report fragments
-        fragments_count = len(response["files"])
-
-        # Download chunks of report's data
-        self.log.info("Downloading Search Ads report %s", self.report_id)
-        with NamedTemporaryFile() as temp_file:
-            for i in range(fragments_count):
-                byte_content = hook.get_file(report_fragment=i, 
report_id=self.report_id)
-                fragment = byte_content if i == 0 else 
self._handle_report_fragment(byte_content)
-                temp_file.write(fragment)
-
-            temp_file.flush()
-
-            bucket_name = self._set_bucket_name(self.bucket_name)
-            gcs_hook.upload(
-                bucket_name=bucket_name,
-                object_name=report_name,
-                gzip=self.gzip,
-                filename=temp_file.name,
-            )
-        self.xcom_push(context, key="file_name", value=report_name)
+    :param customer_id: The customer ID for the custom column.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param api_version: The version of the API that will be requested for 
example 'v0'.
+    """
+
+    def __init__(
+        self,
+        *,
+        customer_id: str,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.customer_id = customer_id
+
+    def execute(self, context: Context):
+        self.log.info("Listing the custom columns for %s", self.customer_id)
+        response = self.hook.list_custom_columns(customer_id=self.customer_id)
+        self.log.info("Num of retrieved custom column: %d", 
len(response.get("customColumns")))
+        return response
diff --git a/airflow/providers/google/marketing_platform/sensors/search_ads.py 
b/airflow/providers/google/marketing_platform/sensors/search_ads.py
deleted file mode 100644
index 6b044f7e2a..0000000000
--- a/airflow/providers/google/marketing_platform/sensors/search_ads.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module contains Google Search Ads sensor."""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Sequence
-
-from airflow.providers.google.marketing_platform.hooks.search_ads import 
GoogleSearchAdsHook
-from airflow.sensors.base import BaseSensorOperator
-
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-
-class GoogleSearchAdsReportSensor(BaseSensorOperator):
-    """
-    Polls for the status of a report request.
-
-    .. seealso::
-        For API documentation check:
-        https://developers.google.com/search-ads/v2/reference/reports/get
-
-    .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:GoogleSearchAdsReportSensor`
-
-    :param report_id: ID of the report request being polled.
-    :param api_version: The version of the api that will be requested for 
example 'v3'.
-    :param gcp_conn_id: The connection ID to use when fetching connection info.
-    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
-        if any. For this to work, the service account making the request must 
have
-        domain-wide delegation enabled.
-    :param impersonation_chain: Optional service account to impersonate using 
short-term
-        credentials, or chained list of accounts required to get the 
access_token
-        of the last account in the list, which will be impersonated in the 
request.
-        If set as a string, the account must grant the originating account
-        the Service Account Token Creator IAM role.
-        If set as a sequence, the identities from the list must grant
-        Service Account Token Creator IAM role to the directly preceding 
identity, with first
-        account from the list granting this role to the originating account 
(templated).
-    """
-
-    template_fields: Sequence[str] = (
-        "report_id",
-        "impersonation_chain",
-    )
-
-    def __init__(
-        self,
-        *,
-        report_id: str,
-        api_version: str = "v2",
-        gcp_conn_id: str = "google_cloud_default",
-        delegate_to: str | None = None,
-        mode: str = "reschedule",
-        poke_interval: int = 5 * 60,
-        impersonation_chain: str | Sequence[str] | None = None,
-        **kwargs,
-    ) -> None:
-        super().__init__(mode=mode, poke_interval=poke_interval, **kwargs)
-        self.report_id = report_id
-        self.api_version = api_version
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-        self.impersonation_chain = impersonation_chain
-
-    def poke(self, context: Context):
-        hook = GoogleSearchAdsHook(
-            gcp_conn_id=self.gcp_conn_id,
-            delegate_to=self.delegate_to,
-            api_version=self.api_version,
-            impersonation_chain=self.impersonation_chain,
-        )
-        self.log.info("Checking status of %s report.", self.report_id)
-        response = hook.get(report_id=self.report_id)
-        return response["isReportReady"]
diff --git a/airflow/providers/google/provider.yaml 
b/airflow/providers/google/provider.yaml
index d69b3f8fd6..a28aaa479a 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -754,9 +754,6 @@ sensors:
   - integration-name: Google Display&Video 360
     python-modules:
       - airflow.providers.google.marketing_platform.sensors.display_video
-  - integration-name: Google Search Ads 360
-    python-modules:
-      - airflow.providers.google.marketing_platform.sensors.search_ads
   - integration-name: Google Looker
     python-modules:
       - airflow.providers.google.cloud.sensors.looker
diff --git 
a/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst
 
b/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst
index 22343125d4..928d60f671 100644
--- 
a/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst
+++ 
b/docs/apache-airflow-providers-google/operators/marketing_platform/search_ads.rst
@@ -26,65 +26,94 @@ Prerequisite Tasks
 
 .. include:: /operators/_partials/prerequisite_tasks.rst
 
-.. _howto/operator:GoogleSearchAdsInsertReportOperator:
+.. _howto/operator:GoogleSearchAdsSearchOperator:
 
-Inserting a report
+Querying a report
 ^^^^^^^^^^^^^^^^^^
 
-To insert a Search Ads report use the
-:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsInsertReportOperator`.
+To query a Search Ads report use the
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsSearchOperator`.
 
 .. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_search_ads_generate_report_operator]
-    :end-before: [END howto_search_ads_generate_report_operator]
+    :start-after: [START howto_search_ads_search_query_reports]
+    :end-before: [END howto_search_ads_search_query_reports]
 
 You can use :ref:`Jinja templating <concepts:jinja-templating>` with
-:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsInsertReportOperator`
-parameters which allows you to dynamically determine values. You can provide 
report definition using ``
-.json`` file as this operator supports this template extension.
-The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used 
by other operators:
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsSearchOperator`
+parameters which allows you to dynamically determine values.
+
+.. _howto/operator:GoogleSearchAdsGetFieldOperator:
+
+Retrieve a field metadata
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To retrieve metadata of a field use
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsGetFieldOperator`.
 
 .. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_search_ads_get_report_id]
-    :end-before: [END howto_search_ads_get_report_id]
+    :start-after: [START howto_search_ads_get_field]
+    :end-before: [END howto_search_ads_get_field]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsGetFieldOperator`
+parameters which allows you to dynamically determine values.
 
-.. _howto/operator:GoogleSearchAdsReportSensor:
+.. _howto/operator:GoogleSearchAdsSearchFieldsOperator:
 
-Awaiting for a report
-^^^^^^^^^^^^^^^^^^^^^
+Retrieve metadata for multiple fields
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-To wait for a report to be ready for download use
-:class:`~airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsReportSensor`.
+To retrieve metadata of multiple fields use the
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsSearchFieldsOperator`.
 
 .. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_search_ads_get_report_operator]
-    :end-before: [END howto_search_ads_get_report_operator]
+    :start-after: [START howto_search_ads_search_fields]
+    :end-before: [END howto_search_ads_search_fields]
 
 You can use :ref:`Jinja templating <concepts:jinja-templating>` with
-:template-fields:`airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsReportSensor`
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsSearchFieldsOperator`
 parameters which allows you to dynamically determine values.
 
-.. _howto/operator:GoogleSearchAdsGetfileReportOperator:
 
-Downloading a report
-^^^^^^^^^^^^^^^^^^^^
+.. _howto/operator:GoogleSearchAdsGetCustomColumnOperator:
+
+Retrieve a custom column details
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To retrieve details of a custom column use
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsGetCustomColumnOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_search_ads_get_custom_column]
+    :end-before: [END howto_search_ads_get_custom_column]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsGetCustomColumnOperator`
+parameters which allows you to dynamically determine values.
+
+.. _howto/operator:GoogleSearchAdsListCustomColumnsOperator:
+
+Retrieve a custom column details
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To retrieve the list of all custom columns use
 
-To download a Search Ads report to Google Cloud Storage bucket use the
-:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsDownloadReportOperator`.
+:class:`~airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsListCustomColumnsOperator`.
 
 .. exampleinclude:: 
/../../tests/system/providers/google/marketing_platform/example_search_ads.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_search_ads_getfile_report_operator]
-    :end-before: [END howto_search_ads_getfile_report_operator]
+    :start-after: [START howto_search_ads_list_custom_columns]
+    :end-before: [END howto_search_ads_list_custom_columns]
 
 You can use :ref:`Jinja templating <concepts:jinja-templating>` with
-:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsDownloadReportOperator`
+:template-fields:`airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsListCustomColumnsOperator`
 parameters which allows you to dynamically determine values.
-The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used 
by other operators.
diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index b846534d8d..1e684888dc 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -411,6 +411,7 @@ class 
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         
"airflow.providers.google.cloud.operators.dataproc._DataprocStartStopClusterBaseOperator",
         
"airflow.providers.google.cloud.operators.vertex_ai.custom_job.CustomTrainingJobBaseOperator",
         
"airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator",
+        
"airflow.providers.google.marketing_platform.operators.search_ads._GoogleSearchAdsBaseOperator",
     }
 
     MISSING_EXAMPLES_FOR_CLASSES = {
@@ -509,7 +510,6 @@ class 
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         "GoogleDisplayVideo360GetSDFDownloadOperationSensor",
         "airflow.providers.google.marketing_platform.sensors.display_video."
         "GoogleDisplayVideo360ReportSensor",
-        
"airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsReportSensor",
     }
 
     @pytest.mark.xfail(reason="We did not reach full coverage yet")
diff --git a/tests/providers/google/marketing_platform/hooks/test_search_ads.py 
b/tests/providers/google/marketing_platform/hooks/test_search_ads.py
index 1e0205a956..1fc08237f1 100644
--- a/tests/providers/google/marketing_platform/hooks/test_search_ads.py
+++ b/tests/providers/google/marketing_platform/hooks/test_search_ads.py
@@ -19,75 +19,176 @@ from __future__ import annotations
 
 from unittest import mock
 
-from airflow.providers.google.marketing_platform.hooks.search_ads import 
GoogleSearchAdsHook
+import pytest
+
+from airflow.providers.google.marketing_platform.hooks.search_ads import (
+    GoogleSearchAdsHook,
+    GoogleSearchAdsReportingHook,
+)
 from tests.providers.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
 
-API_VERSION = "v2"
 GCP_CONN_ID = "google_cloud_default"
+API_VERSION = "v0"
+CUSTOMER_ID = "customer_id"
+QUERY = "SELECT * FROM campaigns WHERE segments.date DURING LAST_30_DAYS"
 
 
-class TestSearchAdsHook:
+class TestGoogleSearchAdsReportingHook:
     def setup_method(self):
         with mock.patch(
             
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleBaseHook.__init__",
             new=mock_base_gcp_hook_default_project_id,
         ):
-            self.hook = GoogleSearchAdsHook(gcp_conn_id=GCP_CONN_ID)
+            self.hook = GoogleSearchAdsReportingHook(gcp_conn_id=GCP_CONN_ID)
 
-    
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook._authorize")
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.get_credentials"
+    )
     
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.build")
-    def test_gen_conn(self, mock_build, mock_authorize):
+    def test_gen_conn(self, mock_build, mock_get_credentials):
         result = self.hook.get_conn()
         mock_build.assert_called_once_with(
-            "doubleclicksearch",
+            "searchads360",
             API_VERSION,
-            http=mock_authorize.return_value,
+            credentials=mock_get_credentials.return_value,
             cache_discovery=False,
         )
         assert mock_build.return_value == result
 
-    
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook.get_conn")
-    def test_insert(self, get_conn_mock):
-        report = {"report": "test"}
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.customer_service"
+    )
+    @pytest.mark.parametrize(
+        "given_args, expected_args_extras",
+        [
+            ({"page_token": None}, {}),
+            ({"page_token": "next_page_token"}, {"pageToken": 
"next_page_token"}),
+            ({"summary_row_setting": "summary line content"}, 
{"summaryRowSetting": "summary line content"}),
+            ({"page_size": 10, "validate_only": True}, {"pageSize": 10, 
"validateOnly": True}),
+        ],
+    )
+    def test_search(self, customer_service_mock, given_args, 
expected_args_extras):
+        return_value = {"results": [{"x": 1}]}
+        (
+            
customer_service_mock.searchAds360.return_value.search.return_value.execute
+        ).return_value = return_value
+
+        result = self.hook.search(customer_id=CUSTOMER_ID, query=QUERY, 
**given_args)
+
+        expected_args = {
+            "customerId": CUSTOMER_ID,
+            "body": {
+                "query": QUERY,
+                "pageSize": 10000,
+                "returnTotalResultsCount": False,
+                "validateOnly": False,
+                **expected_args_extras,
+            },
+        }
+        
customer_service_mock.searchAds360.return_value.search.assert_called_once_with(**expected_args)
 
-        return_value = "TEST"
-        
get_conn_mock.return_value.reports.return_value.request.return_value.execute.return_value
 = (
-            return_value
-        )
+        assert return_value == result
 
-        result = self.hook.insert_report(report=report)
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.customer_service"
+    )
+    def test_get_custom_column(self, customer_service_mock):
+        custom_column_id = "custom_column_id"
+        return_value = {"resourceName": 1}
+        (
+            
customer_service_mock.customColumns.return_value.get.return_value.execute
+        ).return_value = return_value
 
-        
get_conn_mock.return_value.reports.return_value.request.assert_called_once_with(body=report)
+        result = self.hook.get_custom_column(customer_id=CUSTOMER_ID, 
custom_column_id=custom_column_id)
 
-        assert return_value == result
+        
customer_service_mock.customColumns.return_value.get.assert_called_once_with(
+            
resourceName=f"customers/{CUSTOMER_ID}/customColumns/{custom_column_id}"
+        )
 
-    
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook.get_conn")
-    def test_get(self, get_conn_mock):
-        report_id = "REPORT_ID"
+        assert return_value == result
 
-        return_value = "TEST"
-        
get_conn_mock.return_value.reports.return_value.get.return_value.execute.return_value
 = return_value
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.customer_service"
+    )
+    def test_list_custom_columns(self, customer_service_mock):
+        return_value = {
+            "results": [
+                {"resourceName": 
f"customers/{CUSTOMER_ID}/customColumns/col1"},
+                {"resourceName": 
f"customers/{CUSTOMER_ID}/customColumns/col2"},
+            ]
+        }
+        (
+            
customer_service_mock.customColumns.return_value.list.return_value.execute
+        ).return_value = return_value
 
-        result = self.hook.get(report_id=report_id)
+        result = self.hook.list_custom_columns(customer_id=CUSTOMER_ID)
 
-        
get_conn_mock.return_value.reports.return_value.get.assert_called_once_with(reportId=report_id)
+        
customer_service_mock.customColumns.return_value.list.assert_called_once_with(customerId=CUSTOMER_ID)
 
         assert return_value == result
 
-    
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook.get_conn")
-    def test_get_file(self, get_conn_mock):
-        report_fragment = 42
-        report_id = "REPORT_ID"
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.fields_service"
+    )
+    def test_get_field(self, fields_service_mock):
+        field_name = "field_name"
+        return_value = {
+            "name": "Field 1",
+            "resourceName": 
f"customers/{CUSTOMER_ID}/searchAds360Fields/field1",
+        }
+        fields_service_mock.get.return_value.execute.return_value = 
return_value
 
-        return_value = "TEST"
-        
get_conn_mock.return_value.reports.return_value.getFile.return_value.execute.return_value
 = (
-            return_value
-        )
+        result = self.hook.get_field(field_name=field_name)
 
-        result = self.hook.get_file(report_fragment=report_fragment, 
report_id=report_id)
+        
fields_service_mock.get.assert_called_once_with(resourceName=f"searchAds360Fields/{field_name}")
 
-        
get_conn_mock.return_value.reports.return_value.getFile.assert_called_once_with(
-            reportFragment=report_fragment, reportId=report_id
-        )
+        assert return_value == result
+
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsReportingHook.fields_service"
+    )
+    @pytest.mark.parametrize(
+        "given_args, expected_args_extras",
+        [
+            ({"page_token": None}, {}),
+            ({"page_token": "next_page_token"}, {"pageToken": 
"next_page_token"}),
+            ({"page_size": 10}, {"pageSize": 10}),
+        ],
+    )
+    def test_search_fields(self, fields_service_mock, given_args, 
expected_args_extras):
+        query = "SELECT field1, field2 FROM campaigns;"
+        return_value = {
+            "results": [
+                {"name": "Field 1", "resourceName": 
f"customers/{CUSTOMER_ID}/searchAds360Fields/field1"},
+                {"name": "Field 2", "resourceName": 
f"customers/{CUSTOMER_ID}/searchAds360Fields/field2"},
+            ]
+        }
+        fields_service_mock.search.return_value.execute.return_value = 
return_value
+
+        result = self.hook.search_fields(query=query, **given_args)
+
+        expected_args = {"query": query, "pageSize": 10000, 
**expected_args_extras}
+        fields_service_mock.search.assert_called_once_with(body=expected_args)
 
         assert return_value == result
+
+
+class TestSearchAdsHook:
+    def setup_method(self):
+        with mock.patch(
+            
"airflow.providers.google.marketing_platform.hooks.search_ads.GoogleBaseHook.__init__",
+            new=mock_base_gcp_hook_default_project_id,
+        ):
+            self.hook = GoogleSearchAdsHook(gcp_conn_id=GCP_CONN_ID)
+
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.GoogleSearchAdsHook._authorize")
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.search_ads.build")
+    def test_gen_conn(self, mock_build, mock_authorize):
+        result = self.hook.get_conn()
+        mock_build.assert_called_once_with(
+            "doubleclicksearch",
+            "v2",
+            http=mock_authorize.return_value,
+            cache_discovery=False,
+        )
+        assert mock_build.return_value == result
diff --git 
a/tests/providers/google/marketing_platform/operators/test_search_ads.py 
b/tests/providers/google/marketing_platform/operators/test_search_ads.py
index ef0b59ac07..80cc0d26fe 100644
--- a/tests/providers/google/marketing_platform/operators/test_search_ads.py
+++ b/tests/providers/google/marketing_platform/operators/test_search_ads.py
@@ -17,160 +17,158 @@
 # under the License.
 from __future__ import annotations
 
-import json
-from tempfile import NamedTemporaryFile
 from unittest import mock
 
-import pytest
-
-from airflow.models import DAG, TaskInstance as TI
 from airflow.providers.google.marketing_platform.operators.search_ads import (
-    GoogleSearchAdsDownloadReportOperator,
-    GoogleSearchAdsInsertReportOperator,
+    GoogleSearchAdsGetCustomColumnOperator,
+    GoogleSearchAdsGetFieldOperator,
+    GoogleSearchAdsListCustomColumnsOperator,
+    GoogleSearchAdsSearchFieldsOperator,
+    GoogleSearchAdsSearchOperator,
 )
-from airflow.utils import timezone
-from airflow.utils.session import create_session
-
-API_VERSION = "api_version"
-GCP_CONN_ID = "google_cloud_default"
 
-DEFAULT_DATE = timezone.datetime(2021, 1, 1)
-END_DATE = timezone.datetime(2021, 1, 2)
-REPORT_ID = "report_id"
-BUCKET_NAME = "test_bucket"
-REPORT_NAME = "test_report.csv"
-FILE_NAME = "test"
+GCP_CONN_ID = "google_search_ads_default"
+API_VERSION = "v0"
+CUSTOMER_ID = "customer_id"
 
 
-class TestGoogleSearchAdsInsertReportOperator:
-    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
-    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+class TestGoogleSearchAdsSearchOperator:
     @mock.patch(
-        "airflow.providers.google.marketing_platform."
-        "operators.search_ads.GoogleSearchAdsInsertReportOperator.xcom_push"
+        
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
     )
-    def test_execute(self, xcom_mock, mock_base_op, hook_mock):
-        report = {"report": "test"}
-        hook_mock.return_value.insert_report.return_value = {"id": REPORT_ID}
-        op = GoogleSearchAdsInsertReportOperator(report=report, 
api_version=API_VERSION, task_id="test_task")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+    def test_execute(self, mock_base_op, hook_mock):
+        query = "SELECT * FROM campaigns WHERE segments.date DURING 
LAST_30_DAYS"
+        hook_mock.return_value.search.return_value = {"results": []}
+        op = GoogleSearchAdsSearchOperator(
+            customer_id=CUSTOMER_ID,
+            query=query,
+            api_version=API_VERSION,
+            task_id="test_task",
+        )
         op.execute(context=None)
         hook_mock.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
-            delegate_to=None,
-            api_version=API_VERSION,
-            impersonation_chain=None,
+            api_version="v0",
+        )
+        hook_mock.return_value.search.assert_called_once_with(
+            customer_id=CUSTOMER_ID,
+            query=query,
+            page_size=10000,
+            page_token=None,
+            return_total_results_count=False,
+            summary_row_setting=None,
+            validate_only=False,
         )
-        
hook_mock.return_value.insert_report.assert_called_once_with(report=report)
-        xcom_mock.assert_called_once_with(None, key="report_id", 
value=REPORT_ID)
-
-    def test_prepare_template(self):
-        report = {"key": "value"}
-        with NamedTemporaryFile("w+", suffix=".json") as f:
-            f.write(json.dumps(report))
-            f.flush()
-            op = GoogleSearchAdsInsertReportOperator(
-                report=report, api_version=API_VERSION, task_id="test_task"
-            )
-            op.prepare_template()
-
-        assert isinstance(op.report, dict)
-        assert op.report == report
-
-
[email protected]_if_database_isolation_mode
[email protected]_test
-class TestGoogleSearchAdsDownloadReportOperator:
-    def setup_method(self):
-        with create_session() as session:
-            session.query(TI).delete()
 
-    def teardown_method(self):
-        with create_session() as session:
-            session.query(TI).delete()
 
-    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.NamedTemporaryFile")
-    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GCSHook")
-    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
+class TestGoogleSearchAdsGetFieldOperator:
     @mock.patch(
-        "airflow.providers.google.marketing_platform."
-        "operators.search_ads.GoogleSearchAdsDownloadReportOperator.xcom_push"
+        
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
     )
-    def test_execute(self, xcom_mock, hook_mock, gcs_hook_mock, tempfile_mock):
-        temp_file_name = "TEMP"
-        data = b"data"
-
-        hook_mock.return_value.get.return_value = {"files": [0], 
"isReportReady": True}
-        hook_mock.return_value.get_file.return_value = data
-        tempfile_mock.return_value.__enter__.return_value.name = temp_file_name
-
-        op = GoogleSearchAdsDownloadReportOperator(
-            report_id=REPORT_ID,
-            report_name=FILE_NAME,
-            bucket_name=BUCKET_NAME,
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+    def test_execute(self, mock_base_op, hook_mock):
+        field_name = "the_field"
+        hook_mock.return_value.get_field.return_value = {
+            "name": field_name,
+            "resourceName": f"searchAds360Fields/{field_name}",
+        }
+        op = GoogleSearchAdsGetFieldOperator(
+            field_name=field_name,
             api_version=API_VERSION,
             task_id="test_task",
         )
         op.execute(context=None)
         hook_mock.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
-            delegate_to=None,
-            api_version=API_VERSION,
-            impersonation_chain=None,
+            api_version="v0",
         )
-        
hook_mock.return_value.get_file.assert_called_once_with(report_fragment=0, 
report_id=REPORT_ID)
-        
tempfile_mock.return_value.__enter__.return_value.write.assert_called_once_with(data)
-        gcs_hook_mock.return_value.upload.assert_called_once_with(
-            bucket_name=BUCKET_NAME,
-            gzip=True,
-            object_name=FILE_NAME + ".csv.gz",
-            filename=temp_file_name,
+        hook_mock.return_value.get_field.assert_called_once_with(
+            field_name=field_name,
         )
-        xcom_mock.assert_called_once_with(None, key="file_name", 
value=FILE_NAME + ".csv.gz")
-
-    @pytest.mark.parametrize(
-        "test_bucket_name",
-        [BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ 
ti.xcom_pull(task_ids='f') }}"],
-    )
-    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.NamedTemporaryFile")
-    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GCSHook")
-    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsHook")
-    def test_set_bucket_name(self, hook_mock, gcs_hook_mock, tempfile_mock, 
test_bucket_name):
-        temp_file_name = "TEMP"
-        data = b"data"
 
-        hook_mock.return_value.get.return_value = {"files": [0], 
"isReportReady": True}
-        hook_mock.return_value.get_file.return_value = data
-        tempfile_mock.return_value.__enter__.return_value.name = temp_file_name
 
-        dag = DAG(
-            dag_id="test_set_bucket_name",
-            start_date=DEFAULT_DATE,
-            schedule=None,
-            catchup=False,
+class TestGoogleSearchAdsSearchFieldsOperator:
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+    def test_execute(self, mock_base_op, hook_mock):
+        field_name = "the_field"
+        query = (
+            "SELECT "
+            "  name, category, selectable, filterable, sortable, 
selectable_with, data_type, "
+            "  is_repeated "
+            "WHERE "
+            "  name LIKE 'ad_group.%'"
+        )
+        hook_mock.return_value.search_fields.return_value = {
+            "results": [{"name": field_name, "resource_name": 
f"searchAds360Fields/{field_name}"}]
+        }
+        op = GoogleSearchAdsSearchFieldsOperator(
+            query=query,
+            api_version=API_VERSION,
+            task_id="test_task",
+        )
+        op.execute(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version="v0",
+        )
+        hook_mock.return_value.search_fields.assert_called_once_with(
+            query=query,
+            page_token=None,
+            page_size=10000,
         )
 
-        if BUCKET_NAME not in test_bucket_name:
 
-            @dag.task
-            def f():
-                return BUCKET_NAME
+class TestGoogleSearchAdsGetCustomColumnOperator:
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+    def test_execute(self, mock_base_op, hook_mock):
+        custom_column_id = "custom_column_id"
+        hook_mock.return_value.get_custom_column.return_value = {"id": 
custom_column_id}
+        op = GoogleSearchAdsGetCustomColumnOperator(
+            customer_id=CUSTOMER_ID,
+            custom_column_id=custom_column_id,
+            api_version=API_VERSION,
+            task_id="test_task",
+        )
+        op.execute(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version="v0",
+        )
+        hook_mock.return_value.get_custom_column.assert_called_once_with(
+            customer_id=CUSTOMER_ID,
+            custom_column_id=custom_column_id,
+        )
 
-            taskflow_op = f()
-            taskflow_op.operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE)
 
-        op = GoogleSearchAdsDownloadReportOperator(
-            report_id=REPORT_ID,
-            report_name=FILE_NAME,
-            bucket_name=test_bucket_name if test_bucket_name != "XComArg" else 
taskflow_op,
+class TestGoogleSearchAdsListCustomColumnsOperator:
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.operators.search_ads.GoogleSearchAdsReportingHook"
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.search_ads.BaseOperator")
+    def test_execute(self, mock_base_op, hook_mock):
+        customs_columns = [
+            {"id": "custom_column_id_1"},
+            {"id": "custom_column_id_2"},
+            {"id": "custom_column_id_3"},
+        ]
+        hook_mock.return_value.list_custom_columns.return_value = 
{"customColumns": customs_columns}
+        op = GoogleSearchAdsListCustomColumnsOperator(
+            customer_id=CUSTOMER_ID,
             api_version=API_VERSION,
             task_id="test_task",
-            dag=dag,
         )
-        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-        gcs_hook_mock.return_value.upload.assert_called_once_with(
-            bucket_name=BUCKET_NAME,
-            gzip=True,
-            object_name=FILE_NAME + ".csv.gz",
-            filename=temp_file_name,
+        op.execute(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version="v0",
+        )
+        hook_mock.return_value.list_custom_columns.assert_called_once_with(
+            customer_id=CUSTOMER_ID,
         )
diff --git 
a/tests/providers/google/marketing_platform/sensors/test_search_ads.py 
b/tests/providers/google/marketing_platform/sensors/test_search_ads.py
deleted file mode 100644
index e03b64a3c6..0000000000
--- a/tests/providers/google/marketing_platform/sensors/test_search_ads.py
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from unittest import mock
-
-import pytest
-
-from airflow.providers.google.marketing_platform.sensors.search_ads import 
GoogleSearchAdsReportSensor
-
-pytestmark = pytest.mark.db_test
-
-API_VERSION = "api_version"
-GCP_CONN_ID = "google_cloud_default"
-
-
-class TestSearchAdsReportSensor:
-    
@mock.patch("airflow.providers.google.marketing_platform.sensors.search_ads.GoogleSearchAdsHook")
-    
@mock.patch("airflow.providers.google.marketing_platform.sensors.search_ads.BaseSensorOperator")
-    def test_poke(self, mock_base_op, hook_mock):
-        report_id = "REPORT_ID"
-        op = GoogleSearchAdsReportSensor(report_id=report_id, 
api_version=API_VERSION, task_id="test_task")
-        op.poke(context=None)
-        hook_mock.assert_called_once_with(
-            gcp_conn_id=GCP_CONN_ID,
-            delegate_to=None,
-            api_version=API_VERSION,
-            impersonation_chain=None,
-        )
-        hook_mock.return_value.get.assert_called_once_with(report_id=report_id)
diff --git 
a/tests/system/providers/google/marketing_platform/example_search_ads.py 
b/tests/system/providers/google/marketing_platform/example_search_ads.py
index d1e34c35ad..ba531dc964 100644
--- a/tests/system/providers/google/marketing_platform/example_search_ads.py
+++ b/tests/system/providers/google/marketing_platform/example_search_ads.py
@@ -23,65 +23,83 @@ from __future__ import annotations
 
 import os
 from datetime import datetime
-from typing import cast
 
 from airflow.models.dag import DAG
-from airflow.models.xcom_arg import XComArg
 from airflow.providers.google.marketing_platform.operators.search_ads import (
-    GoogleSearchAdsDownloadReportOperator,
-    GoogleSearchAdsInsertReportOperator,
+    GoogleSearchAdsGetCustomColumnOperator,
+    GoogleSearchAdsGetFieldOperator,
+    GoogleSearchAdsListCustomColumnsOperator,
+    GoogleSearchAdsSearchFieldsOperator,
+    GoogleSearchAdsSearchOperator,
 )
-from airflow.providers.google.marketing_platform.sensors.search_ads import 
GoogleSearchAdsReportSensor
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_search_ads"
+DAG_ID = "search_ads"
 
 # [START howto_search_ads_env_variables]
-AGENCY_ID = os.environ.get("GMP_AGENCY_ID")
-ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID")
-GCS_BUCKET = os.environ.get("GMP_GCS_BUCKET", "test-cm-bucket")
-
-REPORT = {
-    "reportScope": {"agencyId": AGENCY_ID, "advertiserId": ADVERTISER_ID},
-    "reportType": "account",
-    "columns": [{"columnName": "agency"}, {"columnName": 
"lastModifiedTimestamp"}],
-    "includeRemovedEntities": False,
-    "statisticsCurrency": "usd",
-    "maxRowsPerFile": 1000000,
-    "downloadFormat": "csv",
-}
+CUSTOMER_ID: str = os.environ.get("GSA_CUSTOMER_ID", default="")
+QUERY = """
+    SELECT
+        campaign.name,
+        campaign.id,
+        campaign.status
+    FROM campaign;
+"""
+FIELD_NAME: str = os.environ.get("GSA_FIELD_NAME", default="")
+SEARCH_FIELDS_QUERY: str = """
+    SELECT
+        f1,
+        f2
+    FROM t1;
+"""
+CUSTOM_COLUMN_ID: str = os.environ.get("GSA_CUSTOM_COLUMN_ID", default="")
 # [END howto_search_ads_env_variables]
 
 with DAG(
-    DAG_ID,
+    dag_id=DAG_ID,
     schedule="@once",  # Override to match your needs,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["search", "search-ads", "ads"],
 ) as dag:
-    # [START howto_search_ads_generate_report_operator]
-    generate_report = GoogleSearchAdsInsertReportOperator(report=REPORT, 
task_id="generate_report")
-    # [END howto_search_ads_generate_report_operator]
+    # [START howto_search_ads_search_query_reports]
+    query_report = GoogleSearchAdsSearchOperator(
+        task_id="query_report",
+        customer_id=CUSTOMER_ID,
+        query=QUERY,
+    )
+    # [END howto_search_ads_search_query_reports]
 
-    # [START howto_search_ads_get_report_id]
-    report_id = cast(str, XComArg(generate_report, key="report_id"))
-    # [END howto_search_ads_get_report_id]
+    # [START howto_search_ads_get_field]
+    get_field = GoogleSearchAdsGetFieldOperator(
+        task_id="get_field",
+        field_name=FIELD_NAME,
+    )
+    # [END howto_search_ads_get_field]
 
-    # [START howto_search_ads_get_report_operator]
-    wait_for_report = GoogleSearchAdsReportSensor(report_id=report_id, 
task_id="wait_for_report")
-    # [END howto_search_ads_get_report_operator]
+    # [START howto_search_ads_search_fields]
+    search_fields = GoogleSearchAdsSearchFieldsOperator(
+        task_id="search_fields",
+        query=SEARCH_FIELDS_QUERY,
+    )
+    # [END howto_search_ads_search_fields]
 
-    # [START howto_search_ads_getfile_report_operator]
-    download_report = GoogleSearchAdsDownloadReportOperator(
-        report_id=report_id, bucket_name=GCS_BUCKET, task_id="download_report"
+    # [START howto_search_ads_get_custom_column]
+    get_custom_column = GoogleSearchAdsGetCustomColumnOperator(
+        task_id="get_custom_column",
+        customer_id=CUSTOMER_ID,
+        custom_column_id=CUSTOM_COLUMN_ID,
     )
-    # [END howto_search_ads_getfile_report_operator]
+    # [END howto_search_ads_get_custom_column]
 
-    wait_for_report >> download_report
+    # [START howto_search_ads_list_custom_columns]
+    list_custom_columns = GoogleSearchAdsListCustomColumnsOperator(
+        task_id="list_custom_columns",
+        customer_id=CUSTOMER_ID,
+    )
+    # [END howto_search_ads_list_custom_columns]
 
-    # Task dependencies created via `XComArgs`:
-    #   generate_report >> wait_for_report
-    #   generate_report >> download_report
+    (query_report >> get_field >> search_fields >> get_custom_column >> 
list_custom_columns)
 
 
 from tests.system.utils import get_test_run  # noqa: E402

Reply via email to