This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 71db47a73d Update DV360 operators to use API v2 (#30326)
71db47a73d is described below

commit 71db47a73d741015d8ffeaa2276635f19d51f8e7
Author: Ɓukasz Wyszomirski <[email protected]>
AuthorDate: Sun Apr 9 12:15:42 2023 +0200

    Update DV360 operators to use API v2 (#30326)
    
    * Update DV360 operators to use API v2
    
    * Update display_video.rst
    
    * fixup! Update display_video.rst
    
    * fixup! Update display_video.rst
    
    ---------
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 airflow/providers/google/CHANGELOG.rst             |  10 +-
 .../example_dags/example_display_video.py          |  69 ++++-
 .../marketing_platform/hooks/display_video.py      |  59 ++++-
 .../marketing_platform/operators/display_video.py  | 294 ++++++++++++++++++++-
 .../marketing_platform/sensors/display_video.py    |  72 ++++-
 airflow/providers/google/provider.yaml             |   2 +-
 .../operators/marketing_platform/display_video.rst |  68 +++++
 docs/spelling_wordlist.txt                         |   6 +
 scripts/in_container/verify_providers.py           |   4 +
 tests/always/test_project_structure.py             |   4 +
 .../marketing_platform/hooks/test_display_video.py | 140 +++++++++-
 .../operators/test_display_video.py                | 200 +++++++++++++-
 .../sensors/test_display_video.py                  |  18 ++
 13 files changed, 928 insertions(+), 18 deletions(-)

diff --git a/airflow/providers/google/CHANGELOG.rst 
b/airflow/providers/google/CHANGELOG.rst
index 599958ab78..eebf2b2b7d 100644
--- a/airflow/providers/google/CHANGELOG.rst
+++ b/airflow/providers/google/CHANGELOG.rst
@@ -23,8 +23,14 @@
 Changelog
 ---------
 
-8.12.1
-......
+9.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+Google  announced sunset of Bid manager API v1 and v1.1 by April 27, 2023 for 
more information
+please check: `docs <https://developers.google.com/bid-manager/v1.1>`_  As a 
result default value of api_version in GoogleDisplayVideo360Hook and related 
operators updated to v2
 
 This version of provider contains a temporary workaround to issue with ``v11`` 
version of
 google-ads API being discontinued, while the google provider dependencies 
preventing installing
diff --git 
a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
 
b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
index dc9325e95e..7e6c3c38ac 100644
--- 
a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+++ 
b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
@@ -29,11 +29,14 @@ from airflow.models.xcom_arg import XComArg
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import 
GCSToBigQueryOperator
 from airflow.providers.google.marketing_platform.hooks.display_video import 
GoogleDisplayVideo360Hook
 from airflow.providers.google.marketing_platform.operators.display_video 
import (
+    GoogleDisplayVideo360CreateQueryOperator,
     GoogleDisplayVideo360CreateReportOperator,
     GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
     GoogleDisplayVideo360DeleteReportOperator,
     GoogleDisplayVideo360DownloadLineItemsOperator,
     GoogleDisplayVideo360DownloadReportOperator,
+    GoogleDisplayVideo360DownloadReportV2Operator,
+    GoogleDisplayVideo360RunQueryOperator,
     GoogleDisplayVideo360RunReportOperator,
     GoogleDisplayVideo360SDFtoGCSOperator,
     GoogleDisplayVideo360UploadLineItemsOperator,
@@ -41,6 +44,7 @@ from 
airflow.providers.google.marketing_platform.operators.display_video import
 from airflow.providers.google.marketing_platform.sensors.display_video import (
     GoogleDisplayVideo360GetSDFDownloadOperationSensor,
     GoogleDisplayVideo360ReportSensor,
+    GoogleDisplayVideo360RunQuerySensor,
 )
 
 # [START howto_display_video_env_variables]
@@ -50,7 +54,7 @@ OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", 
"files/report.csv")
 PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", 
"test-gcs-example.txt")
 PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", 
"test-gcs-example-download.txt")
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
-SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
+SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_5")
 BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
 GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123)
 ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem")
@@ -74,7 +78,25 @@ REPORT = {
     "schedule": {"frequency": "ONE_TIME"},
 }
 
-PARAMETERS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
+REPORT_V2 = {
+    "metadata": {
+        "title": "Airflow Test Report",
+        "dataRange": {"range": "LAST_7_DAYS"},
+        "format": "CSV",
+        "sendNotification": False,
+    },
+    "params": {
+        "type": "STANDARD",
+        "groupBys": ["FILTER_DATE", "FILTER_PARTNER"],
+        "filters": [{"type": "FILTER_PARTNER", "value": ADVERTISER_ID}],
+        "metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"],
+    },
+    "schedule": {"frequency": "ONE_TIME"},
+}
+
+PARAMETERS = {
+    "dataRange": {"range": "LAST_7_DAYS"},
+}
 
 CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: dict = {
     "version": SDF_VERSION,
@@ -209,3 +231,46 @@ with models.DAG(
 
     # Task dependency created via `XComArgs`:
     #   save_sdf_in_gcs >> upload_sdf_to_big_query
+
+with models.DAG(
+    "example_display_video_v2",
+    start_date=START_DATE,
+    catchup=False,
+) as dag:
+    # [START howto_google_display_video_create_query_operator]
+    create_query_v2 = GoogleDisplayVideo360CreateQueryOperator(body=REPORT_V2, 
task_id="create_query")
+
+    query_id = cast(str, XComArg(create_query_v2, key="query_id"))
+    # [END howto_google_display_video_create_query_operator]
+
+    # [START howto_google_display_video_run_query_report_operator]
+    run_query_v2 = GoogleDisplayVideo360RunQueryOperator(
+        query_id=query_id, parameters=PARAMETERS, task_id="run_report"
+    )
+
+    query_id = cast(str, XComArg(run_query_v2, key="query_id"))
+    report_id = cast(str, XComArg(run_query_v2, key="report_id"))
+    # [END howto_google_display_video_run_query_report_operator]
+
+    # [START howto_google_display_video_wait_run_query_sensor]
+    wait_for_query = GoogleDisplayVideo360RunQuerySensor(
+        task_id="wait_for_query",
+        query_id=query_id,
+        report_id=report_id,
+    )
+    # [END howto_google_display_video_wait_run_query_sensor]
+
+    # [START howto_google_display_video_get_report_operator]
+    get_report_v2 = GoogleDisplayVideo360DownloadReportV2Operator(
+        query_id=query_id,
+        report_id=report_id,
+        task_id="get_report",
+        bucket_name=BUCKET,
+        report_name="test1.csv",
+    )
+    # # [END howto_google_display_video_get_report_operator]
+    # # [START howto_google_display_video_delete_query_report_operator]
+    delete_report_v2 = 
GoogleDisplayVideo360DeleteReportOperator(report_id=report_id, 
task_id="delete_report")
+    # # [END howto_google_display_video_delete_query_report_operator]
+
+    create_query_v2 >> run_query_v2 >> wait_for_query >> get_report_v2 >> 
delete_report_v2
diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py 
b/airflow/providers/google/marketing_platform/hooks/display_video.py
index 05bba7a8ce..9422426fbe 100644
--- a/airflow/providers/google/marketing_platform/hooks/display_video.py
+++ b/airflow/providers/google/marketing_platform/hooks/display_video.py
@@ -18,6 +18,7 @@
 """This module contains Google DisplayVideo hook."""
 from __future__ import annotations
 
+import warnings
 from typing import Any, Sequence
 
 from googleapiclient.discovery import Resource, build
@@ -32,7 +33,7 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
 
     def __init__(
         self,
-        api_version: str = "v1",
+        api_version: str = "v2",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
@@ -42,6 +43,11 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
             delegate_to=delegate_to,
             impersonation_chain=impersonation_chain,
         )
+        if api_version in ["v1", "v1.1"]:
+            warnings.warn(
+                f"API {api_version} is deprecated and shortly will be removed 
please use v2",
+                DeprecationWarning,
+            )
         self.api_version = api_version
 
     def get_conn(self) -> Resource:
@@ -93,7 +99,10 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
 
         :param query: Query object to be passed to request body.
         """
-        response = 
self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
+        if self.api_version in ["v1", "v1.1"]:
+            response = 
self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
+        else:
+            response = 
self.get_conn().queries().create(body=query).execute(num_retries=self.num_retries)
         return response
 
     def delete_query(self, query_id: str) -> None:
@@ -102,7 +111,10 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
 
         :param query_id: Query ID to delete.
         """
-        
(self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries))
+        if self.api_version in ["v1", "v1.1"]:
+            
self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries)
+        else:
+            
self.get_conn().queries().delete(queryId=query_id).execute(num_retries=self.num_retries)
 
     def get_query(self, query_id: str) -> dict:
         """
@@ -110,25 +122,56 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
 
         :param query_id: Query ID to retrieve.
         """
-        response = 
self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
+        if self.api_version in ["v1", "v1.1"]:
+            response = (
+                
self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
+            )
+        else:
+            response = 
self.get_conn().queries().get(queryId=query_id).execute(num_retries=self.num_retries)
         return response
 
     def list_queries(self) -> list[dict]:
         """Retrieves stored queries."""
-        response = 
self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
+        if self.api_version in ["v1", "v1.1"]:
+            response = 
self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
+        else:
+            response = 
self.get_conn().queries().list().execute(num_retries=self.num_retries)
         return response.get("queries", [])
 
-    def run_query(self, query_id: str, params: dict[str, Any] | None) -> None:
+    def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict:
         """
         Runs a stored query to generate a report.
 
         :param query_id: Query ID to run.
         :param params: Parameters for the report.
         """
-        (
+        if self.api_version in ["v1", "v1.1"]:
+            return (
+                self.get_conn()
+                .queries()
+                .runquery(queryId=query_id, body=params)
+                .execute(num_retries=self.num_retries)
+            )
+        else:
+            return (
+                self.get_conn()
+                .queries()
+                .run(queryId=query_id, body=params)
+                .execute(num_retries=self.num_retries)
+            )
+
+    def get_report(self, query_id: str, report_id: str) -> dict:
+        """
+        Retrieves a report.
+
+        :param query_id: Query ID for which report was generated.
+        :param report_id: Report ID to retrieve.
+        """
+        return (
             self.get_conn()
             .queries()
-            .runquery(queryId=query_id, body=params)
+            .reports()
+            .get(queryId=query_id, reportId=report_id)
             .execute(num_retries=self.num_retries)
         )
 
diff --git 
a/airflow/providers/google/marketing_platform/operators/display_video.py 
b/airflow/providers/google/marketing_platform/operators/display_video.py
index d33ffb46d7..738d734bad 100644
--- a/airflow/providers/google/marketing_platform/operators/display_video.py
+++ b/airflow/providers/google/marketing_platform/operators/display_video.py
@@ -23,6 +23,7 @@ import json
 import shutil
 import tempfile
 import urllib.request
+import warnings
 from typing import TYPE_CHECKING, Any, Sequence
 from urllib.parse import urlsplit
 
@@ -82,6 +83,11 @@ class 
GoogleDisplayVideo360CreateReportOperator(BaseOperator):
     ) -> None:
         super().__init__(**kwargs)
         self.body = body
+
+        warnings.warn(
+            "This operator is deprecated. Please use 
`GoogleDisplayVideo360CreateQueryOperator`",
+            DeprecationWarning,
+        )
         self.api_version = api_version
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
@@ -108,6 +114,79 @@ class 
GoogleDisplayVideo360CreateReportOperator(BaseOperator):
         return response
 
 
+class GoogleDisplayVideo360CreateQueryOperator(BaseOperator):
+    """
+    Creates a query.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        ``GoogleDisplayVideo360CreateQueryOperator``
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/bid-manager/v2/queries/create`
+
+    :param body: Report object passed to the request's body as described here:
+        https://developers.google.com/bid-manager/v2/queries#Query
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "body",
+        "impersonation_chain",
+    )
+    template_ext: Sequence[str] = (".json",)
+
+    def __init__(
+        self,
+        *,
+        body: dict[str, Any],
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.body = body
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def prepare_template(self) -> None:
+        # If .json is passed then we have to read the file
+        if isinstance(self.body, str) and self.body.endswith(".json"):
+            with open(self.body) as file:
+                self.body = json.load(file)
+
+    def execute(self, context: Context) -> dict:
+        hook = GoogleDisplayVideo360Hook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Creating Display & Video 360 query.")
+        response = hook.create_query(query=self.body)
+        query_id = response["queryId"]
+        self.xcom_push(context, key="query_id", value=query_id)
+        self.log.info("Created query with ID: %s", query_id)
+        return response
+
+
 class GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
     """
     Deletes a stored query as well as the associated stored reports.
@@ -118,7 +197,7 @@ class 
GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
 
     .. seealso::
         Check also the official API docs:
-        `https://developers.google.com/bid-manager/v1/queries/deletequery`
+        `https://developers.google.com/bid-manager/v2/queries/delete`
 
     :param report_id: Report ID to delete.
     :param report_name: Name of the report to delete.
@@ -147,7 +226,7 @@ class 
GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
         *,
         report_id: str | None = None,
         report_name: str | None = None,
-        api_version: str = "v1",
+        api_version: str = "v2",
         gcp_conn_id: str = "google_cloud_default",
         delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
@@ -156,6 +235,11 @@ class 
GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
         super().__init__(**kwargs)
         self.report_id = report_id
         self.report_name = report_name
+        if api_version in ["v1", "v1.1"]:
+            warnings.warn(
+                f"API {api_version} is deprecated and shortly will be removed 
please use v2",
+                DeprecationWarning,
+            )
         self.api_version = api_version
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
@@ -242,6 +326,10 @@ class 
GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+        warnings.warn(
+            "This operator is deprecated. Please use 
`GoogleDisplayVideo360DownloadReportV2Operator`",
+            DeprecationWarning,
+        )
         self.report_id = report_id
         self.chunk_size = chunk_size
         self.gzip = gzip
@@ -310,6 +398,131 @@ class 
GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
         self.xcom_push(context, key="report_name", value=report_name)
 
 
+class GoogleDisplayVideo360DownloadReportV2Operator(BaseOperator):
+    """
+    Retrieves a stored query.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleDisplayVideo360DownloadReportV2Operator`
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/bid-manager/v2/queries/get`
+
+    :param report_id: Report ID to retrieve.
+    :param bucket_name: The bucket to upload to.
+    :param report_name: The report name to set when uploading the local file.
+    :param chunk_size: File will be downloaded in chunks of this many bytes.
+    :param gzip: Option to compress local file or file data for upload
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "query_id",
+        "report_id",
+        "bucket_name",
+        "report_name",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        query_id: str,
+        report_id: str,
+        bucket_name: str,
+        report_name: str | None = None,
+        gzip: bool = True,
+        chunk_size: int = 10 * 1024 * 1024,
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.query_id = query_id
+        self.report_id = report_id
+        self.chunk_size = chunk_size
+        self.gzip = gzip
+        self.bucket_name = bucket_name
+        self.report_name = report_name
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def _resolve_file_name(self, name: str) -> str:
+        new_name = name if name.endswith(".csv") else f"{name}.csv"
+        new_name = f"{new_name}.gz" if self.gzip else new_name
+        return new_name
+
+    @staticmethod
+    def _set_bucket_name(name: str) -> str:
+        bucket = name if not name.startswith("gs://") else name[5:]
+        return bucket.strip("/")
+
+    def execute(self, context: Context):
+        hook = GoogleDisplayVideo360Hook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        gcs_hook = GCSHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        resource = hook.get_report(query_id=self.query_id, 
report_id=self.report_id)
+        status = resource.get("metadata", {}).get("status", {}).get("state")
+        if resource and status not in ["DONE", "FAILED"]:
+            raise AirflowException(f"Report {self.report_id} for query 
{self.query_id} is still running")
+
+        # If no custom report_name provided, use DV360 name
+        file_url = resource["metadata"]["googleCloudStoragePath"]
+        report_name = self.report_name or 
urlsplit(file_url).path.split("/")[-1]
+        report_name = self._resolve_file_name(report_name)
+
+        # Download the report
+        self.log.info("Starting downloading report %s", self.report_id)
+        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
+            with urllib.request.urlopen(file_url) as response:
+                shutil.copyfileobj(response, temp_file, length=self.chunk_size)
+
+            temp_file.flush()
+            # Upload the local file to bucket
+            bucket_name = self._set_bucket_name(self.bucket_name)
+            gcs_hook.upload(
+                bucket_name=bucket_name,
+                object_name=report_name,
+                gzip=self.gzip,
+                filename=temp_file.name,
+                mime_type="text/csv",
+            )
+        self.log.info(
+            "Report %s was saved in bucket %s as %s.",
+            self.report_id,
+            self.bucket_name,
+            report_name,
+        )
+        self.xcom_push(context, key="report_name", value=report_name)
+
+
 class GoogleDisplayVideo360RunReportOperator(BaseOperator):
     """
     Runs a stored query to generate a report.
@@ -359,6 +572,10 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
     ) -> None:
         super().__init__(**kwargs)
         self.report_id = report_id
+        warnings.warn(
+            "This operator is deprecated. Please use 
`GoogleDisplayVideo360RunQueryOperator`",
+            DeprecationWarning,
+        )
         self.api_version = api_version
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
@@ -380,6 +597,79 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
         hook.run_query(query_id=self.report_id, params=self.parameters)
 
 
+class GoogleDisplayVideo360RunQueryOperator(BaseOperator):
+    """
+    Runs a stored query to generate a report.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleDisplayVideo360RunQueryOperator`
+
+    .. seealso::
+        Check also the official API docs:
+        `https://developers.google.com/bid-manager/v2/queries/run`
+
+    :param report_id: Report ID to run.
+    :param parameters: Parameters for running a report as described here:
+        https://developers.google.com/bid-manager/v2/queries/run
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "query_id",
+        "parameters",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        query_id: str,
+        parameters: dict[str, Any] | None = None,
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.query_id = query_id
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.parameters = parameters
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> dict:
+        hook = GoogleDisplayVideo360Hook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info(
+            "Running query %s with the following parameters:\n %s",
+            self.query_id,
+            self.parameters,
+        )
+        response = hook.run_query(query_id=self.query_id, 
params=self.parameters)
+        self.xcom_push(context, key="query_id", 
value=response["key"]["queryId"])
+        self.xcom_push(context, key="report_id", 
value=response["key"]["reportId"])
+        return response
+
+
 class GoogleDisplayVideo360DownloadLineItemsOperator(BaseOperator):
     """
     Retrieves line items in CSV format.
diff --git 
a/airflow/providers/google/marketing_platform/sensors/display_video.py 
b/airflow/providers/google/marketing_platform/sensors/display_video.py
index 5605e3cbd4..d4d10ba955 100644
--- a/airflow/providers/google/marketing_platform/sensors/display_video.py
+++ b/airflow/providers/google/marketing_platform/sensors/display_video.py
@@ -17,6 +17,7 @@
 """Sensor for detecting the completion of DV360 reports."""
 from __future__ import annotations
 
+import warnings
 from typing import TYPE_CHECKING, Sequence
 
 from airflow import AirflowException
@@ -67,7 +68,10 @@ class GoogleDisplayVideo360ReportSensor(BaseSensorOperator):
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
-
+        warnings.warn(
+            "This operator is deprecated. Please use 
`GoogleDisplayVideo360RunQuerySensor`",
+            DeprecationWarning,
+        )
         self.report_id = report_id
         self.api_version = api_version
         self.gcp_conn_id = gcp_conn_id
@@ -153,3 +157,69 @@ class 
GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator):
         if operation and operation.get("done"):
             return True
         return False
+
+
+class GoogleDisplayVideo360RunQuerySensor(BaseSensorOperator):
+    """
+    Sensor for detecting the completion of DV360 reports for API v2.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GoogleDisplayVideo360RunQuerySensor`
+
+    :param query_id: Query ID for which report was generated
+    :param report_id: Report ID for which you want to wait
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate using domain-wide 
delegation of authority,
+        if any. For this to work, the service account making the request must 
have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "query_id",
+        "report_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        query_id: str,
+        report_id: str,
+        api_version: str = "v2",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.query_id = query_id
+        self.report_id = report_id
+        self.api_version = api_version
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context: Context) -> bool:
+        hook = GoogleDisplayVideo360Hook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        response = hook.get_report(query_id=self.query_id, 
report_id=self.report_id)
+        status = response.get("metadata", {}).get("status", {}).get("state")
+        self.log.info(f"STATUS OF THE REPORT {self.report_id} FOR QUERY 
{self.query_id}: {status}")
+        if response and status in ["DONE", "FAILED"]:
+            return True
+        return False
diff --git a/airflow/providers/google/provider.yaml 
b/airflow/providers/google/provider.yaml
index 283daf2055..6b3006c8c0 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -30,7 +30,7 @@ description: |
 
 suspended: false
 versions:
-  - 8.12.1
+  - 9.0.0
   - 8.12.0
   - 8.11.0
   - 8.10.0
diff --git 
a/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst
 
b/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst
index 60cd3db322..c0e225f86e 100644
--- 
a/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst
+++ 
b/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst
@@ -30,6 +30,8 @@ Prerequisite Tasks
 Creating a report
 ^^^^^^^^^^^^^^^^^
 
+This Operator is Deprecated and will be removed soon. Please look at 
``GoogleDisplayVideo360CreateQueryOperator``.
+
 To create Display&Video 360 report use
 
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateReportOperator`.
 
@@ -47,6 +49,45 @@ The result is saved to :ref:`XCom <concepts:xcom>`, which 
allows the result to b
 
 .. _howto/operator:GoogleDisplayVideo360DeleteReportOperator:
 
+
+Creating a Query
+^^^^^^^^^^^^^^^^
+
+To create Display&Video 360 query use
+:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateQueryOperator`.
+
+.. exampleinclude:: 
/../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_display_video_create_query_operator]
+    :end-before: [END howto_google_display_video_create_query_operator]
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateQueryOperator`
+parameters which allow you to dynamically determine values. You can provide 
body definition using ``
+.json`` file as this operator supports this template extension.
+The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to 
be used by other operators.
+
+.. _howto/operator:GoogleDisplayVideo360RunQueryOperator:
+
+Run Query
+^^^^^^^^^
+
+:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunQueryOperator`.
+
+.. exampleinclude:: 
/../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_display_video_run_query_report_operator]
+    :end-before: [END howto_google_display_video_run_query_report_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunQueryOperator`
+parameters which allow you to dynamically determine values.
+The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to 
be used by other operators.
+
+.. _howto/operator:GoogleDisplayVideo360RunQuerySensor:
+
 Deleting a report
 ^^^^^^^^^^^^^^^^^
 
@@ -68,6 +109,9 @@ parameters which allow you to dynamically determine values.
 Waiting for report
 ^^^^^^^^^^^^^^^^^^
 
+This Operator is Deprecated and will be removed soon. Please look:
+.. _howto/operators:GoogleDisplayVideo360RunQuerySensor
+
 To wait for the report use
 
:class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360ReportSensor`.
 
@@ -83,9 +127,30 @@ parameters which allow you to dynamically determine values.
 
 .. _howto/operator:GoogleDisplayVideo360DownloadReportOperator:
 
+Waiting for query
+^^^^^^^^^^^^^^^^^
+
+To wait for the report use
+:class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360RunQuerySensor`.
+
+.. exampleinclude:: 
/../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_google_display_video_wait_run_query_sensor]
+    :end-before: [END howto_google_display_video_wait_run_query_sensor]
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360RunQuerySensor`
+parameters which allow you to dynamically determine values.
+
+.. _howto/operator:GoogleDisplayVideo360DownloadReportV2Operator:
+
 Downloading a report
 ^^^^^^^^^^^^^^^^^^^^
 
+This Operator is Deprecated and will be removed soon. Please look:
+.. _howto/operators:GoogleDisplayVideo360DownloadReportV2Operator
+
 To download a report to GCS bucket use
 
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DownloadReportOperator`.
 
@@ -105,6 +170,9 @@ parameters which allow you to dynamically determine values.
 Running a report
 ^^^^^^^^^^^^^^^^
 
+This Operator is Deprecated and will be removed soon. Please look:
+.. _howto/operators:GoogleDisplayVideo360RunQueryOperator
+
 To run Display&Video 360 report use
 
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunReportOperator`.
 
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index d603114eea..d7074f4cfd 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -300,6 +300,7 @@ cpu
 cpus
 crd
 createDisposition
+CreateQueryOperator
 creationTimestamp
 credssp
 Cron
@@ -443,6 +444,7 @@ discoverability
 discoverable
 diskTypes
 displayName
+DisplayVideo
 distcp
 distro
 distros
@@ -469,6 +471,7 @@ dogstatsd
 donot
 Dont
 DOS'ing
+DownloadReportV
 downscaling
 Drillbit
 Drivy
@@ -638,6 +641,7 @@ golang
 googleads
 GoogleAdsRow
 googleapiclient
+GoogleDisplayVideo
 gpu
 gpus
 greenlet
@@ -1252,6 +1256,8 @@ rtype
 ru
 runAsUser
 runnable
+RunQueryOperator
+RunQuerySensor
 runspace
 RunSubmitTaskSettings
 runtime
diff --git a/scripts/in_container/verify_providers.py 
b/scripts/in_container/verify_providers.py
index dd09e2f7c2..582cd56d2c 100755
--- a/scripts/in_container/verify_providers.py
+++ b/scripts/in_container/verify_providers.py
@@ -152,6 +152,10 @@ KNOWN_COMMON_DEPRECATED_MESSAGES: set[str] = {
     "This module is deprecated. Please use 
`airflow.providers.cncf.kubernetes.operators.pod` instead.",
     "urllib3 (1.26.6) or chardet (5.1.0)/charset_normalizer (2.0.12) doesn't 
match a supported version!",
     "urllib3 (1.26.9) or chardet (5.1.0)/charset_normalizer (2.0.12) doesn't 
match a supported version!",
+    "This operator is deprecated. Please use 
`GoogleDisplayVideo360CreateQueryOperator`",
+    "This operator is deprecated. Please use 
`GoogleDisplayVideo360RunQueryOperator`",
+    "This operator is deprecated. Please use 
`GoogleDisplayVideo360RunQuerySensor`",
+    "This operator is deprecated. Please use 
`GoogleDisplayVideo360DownloadReportV2Operator`",
 }
 
 # The set of warning messages generated by direct importing of some deprecated 
modules. We should only
diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index d0aa829f38..ec26f526a8 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -266,6 +266,10 @@ class 
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         
"airflow.providers.google.cloud.operators.bigquery.BigQueryPatchDatasetOperator",
         
"airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator",
         
"airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator",
+        
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360CreateQueryOperator",
+        
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360RunQueryOperator",
+        
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360DownloadReportV2Operator",
+        
"airflow.providers.google.marketing_platform.sensors.GoogleDisplayVideo360RunQuerySensor",
     }
 
     BASE_CLASSES = {
diff --git 
a/tests/providers/google/marketing_platform/hooks/test_display_video.py 
b/tests/providers/google/marketing_platform/hooks/test_display_video.py
index f179dd436f..55a3c91308 100644
--- a/tests/providers/google/marketing_platform/hooks/test_display_video.py
+++ b/tests/providers/google/marketing_platform/hooks/test_display_video.py
@@ -32,7 +32,7 @@ class TestGoogleDisplayVideo360Hook:
             
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
             new=mock_base_gcp_hook_default_project_id,
         ):
-            self.hook = GoogleDisplayVideo360Hook(gcp_conn_id=GCP_CONN_ID)
+            self.hook = GoogleDisplayVideo360Hook(api_version=API_VERSION, 
gcp_conn_id=GCP_CONN_ID)
 
     @mock.patch(
         "airflow.providers.google.marketing_platform.hooks."
@@ -371,3 +371,141 @@ class TestGoogleDisplayVideo360Hook:
         
get_conn_to_display_video.return_value.media.return_value.download_media.assert_called_once_with(
             resourceName=resource_name
         )
+
+
+class TestGoogleDisplayVideo360v2Hook:
+    def setup_method(self):
+        with mock.patch(
+            
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
+            new=mock_base_gcp_hook_default_project_id,
+        ):
+            self.api_version = "v2"
+            self.hook = 
GoogleDisplayVideo360Hook(api_version=self.api_version, gcp_conn_id=GCP_CONN_ID)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook._authorize"
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.display_video.build")
+    def test_gen_conn(self, mock_build, mock_authorize):
+        result = self.hook.get_conn()
+        mock_build.assert_called_once_with(
+            "doubleclickbidmanager",
+            self.api_version,
+            http=mock_authorize.return_value,
+            cache_discovery=False,
+        )
+        assert mock_build.return_value == result
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook._authorize"
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.hooks.display_video.build")
+    def test_get_conn_to_display_video(self, mock_build, mock_authorize):
+        result = self.hook.get_conn_to_display_video()
+        mock_build.assert_called_once_with(
+            "displayvideo",
+            self.api_version,
+            http=mock_authorize.return_value,
+            cache_discovery=False,
+        )
+        assert mock_build.return_value == result
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn"
+    )
+    def test_create_query(self, get_conn_mock):
+        body = {"body": "test"}
+
+        return_value = "TEST"
+        
get_conn_mock.return_value.queries.return_value.create.return_value.execute.return_value
 = (
+            return_value
+        )
+
+        result = self.hook.create_query(query=body)
+
+        
get_conn_mock.return_value.queries.return_value.create.assert_called_once_with(body=body)
+
+        assert return_value == result
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn"
+    )
+    def test_delete_query(self, get_conn_mock):
+        query_id = "QUERY_ID"
+
+        return_value = "TEST"
+        
get_conn_mock.return_value.queries.return_value.delete.return_value.execute.return_value
 = (
+            return_value
+        )
+
+        self.hook.delete_query(query_id=query_id)
+
+        
get_conn_mock.return_value.queries.return_value.delete.assert_called_once_with(queryId=query_id)
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn"
+    )
+    def test_get_query(self, get_conn_mock):
+        query_id = "QUERY_ID"
+
+        return_value = "TEST"
+        
get_conn_mock.return_value.queries.return_value.get.return_value.execute.return_value
 = return_value
+
+        result = self.hook.get_query(query_id=query_id)
+
+        
get_conn_mock.return_value.queries.return_value.get.assert_called_once_with(queryId=query_id)
+
+        assert return_value == result
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn"
+    )
+    def test_list_queries(self, get_conn_mock):
+        queries = ["test"]
+        return_value = {"queries": queries}
+        
get_conn_mock.return_value.queries.return_value.list.return_value.execute.return_value
 = return_value
+
+        result = self.hook.list_queries()
+
+        
get_conn_mock.return_value.queries.return_value.list.assert_called_once_with()
+
+        assert queries == result
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn"
+    )
+    def test_run_query(self, get_conn_mock):
+        query_id = "QUERY_ID"
+        params = {"params": "test"}
+
+        self.hook.run_query(query_id=query_id, params=params)
+
+        
get_conn_mock.return_value.queries.return_value.run.assert_called_once_with(
+            queryId=query_id, body=params
+        )
+
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.hooks."
+        "display_video.GoogleDisplayVideo360Hook.get_conn"
+    )
+    def test_get_report(self, get_conn_mock):
+        query_id = "QUERY_ID"
+        report_id = "REPORT_ID"
+
+        return_value = "TEST"
+        
get_conn_mock.return_value.queries.return_value.reports.return_value.get.return_value.execute.return_value
 = (  # noqa
+            return_value
+        )
+
+        self.hook.get_report(query_id=query_id, report_id=report_id)
+
+        
get_conn_mock.return_value.queries.return_value.reports.return_value.get.assert_called_once_with(
+            queryId=query_id, reportId=report_id
+        )
diff --git 
a/tests/providers/google/marketing_platform/operators/test_display_video.py 
b/tests/providers/google/marketing_platform/operators/test_display_video.py
index bf9552485c..d55a9a729a 100644
--- a/tests/providers/google/marketing_platform/operators/test_display_video.py
+++ b/tests/providers/google/marketing_platform/operators/test_display_video.py
@@ -25,11 +25,14 @@ import pytest
 
 from airflow.models import DAG, TaskInstance as TI
 from airflow.providers.google.marketing_platform.operators.display_video 
import (
+    GoogleDisplayVideo360CreateQueryOperator,
     GoogleDisplayVideo360CreateReportOperator,
     GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
     GoogleDisplayVideo360DeleteReportOperator,
     GoogleDisplayVideo360DownloadLineItemsOperator,
     GoogleDisplayVideo360DownloadReportOperator,
+    GoogleDisplayVideo360DownloadReportV2Operator,
+    GoogleDisplayVideo360RunQueryOperator,
     GoogleDisplayVideo360RunReportOperator,
     GoogleDisplayVideo360SDFtoGCSOperator,
     GoogleDisplayVideo360UploadLineItemsOperator,
@@ -37,7 +40,7 @@ from 
airflow.providers.google.marketing_platform.operators.display_video import
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 
-API_VERSION = "api_version"
+API_VERSION = "v2"
 GCP_CONN_ID = "google_cloud_default"
 DELEGATE_TO: str | None = None
 IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
@@ -235,6 +238,132 @@ class TestGoogleDisplayVideo360DownloadReportOperator:
         )
 
 
+class TestGoogleDisplayVideo360DownloadReportV2Operator:
+    def setup_method(self):
+        with create_session() as session:
+            session.query(TI).delete()
+
+    def teardown_method(self):
+        with create_session() as session:
+            session.query(TI).delete()
+
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators."
+        "display_video.GoogleDisplayVideo360DownloadReportV2Operator.xcom_push"
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+    )
+    def test_execute(
+        self,
+        mock_hook,
+        mock_gcs_hook,
+        mock_xcom,
+        mock_temp,
+        mock_request,
+        mock_shutil,
+    ):
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name 
= FILENAME
+        mock_hook.return_value.get_report.return_value = {
+            "metadata": {
+                "status": {"state": "DONE"},
+                "googleCloudStoragePath": "TEST",
+            }
+        }
+        op = GoogleDisplayVideo360DownloadReportV2Operator(
+            query_id=QUERY_ID,
+            report_id=REPORT_ID,
+            bucket_name=BUCKET_NAME,
+            report_name=REPORT_NAME,
+            task_id="test_task",
+        )
+        op.execute(context=None)
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=None,
+            api_version="v2",
+            impersonation_chain=None,
+        )
+        
mock_hook.return_value.get_report.assert_called_once_with(report_id=REPORT_ID, 
query_id=QUERY_ID)
+
+        mock_gcs_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=None,
+            impersonation_chain=None,
+        )
+        mock_gcs_hook.return_value.upload.assert_called_once_with(
+            bucket_name=BUCKET_NAME,
+            filename=FILENAME,
+            gzip=True,
+            mime_type="text/csv",
+            object_name=REPORT_NAME + ".gz",
+        )
+        mock_xcom.assert_called_once_with(None, key="report_name", 
value=REPORT_NAME + ".gz")
+
+    @pytest.mark.parametrize(
+        "test_bucket_name",
+        [BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{ 
ti.xcom_pull(task_ids='f') }}"],
+    )
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
+    
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+    )
+    def test_set_bucket_name(
+        self,
+        mock_hook,
+        mock_gcs_hook,
+        mock_temp,
+        mock_request,
+        mock_shutil,
+        test_bucket_name,
+    ):
+        mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name 
= FILENAME
+        mock_hook.return_value.get_report.return_value = {
+            "metadata": {"status": {"state": "DONE"}, 
"googleCloudStoragePath": "TEST"}
+        }
+
+        dag = DAG(
+            dag_id="test_set_bucket_name",
+            start_date=DEFAULT_DATE,
+            schedule=None,
+            catchup=False,
+        )
+
+        if BUCKET_NAME not in test_bucket_name:
+
+            @dag.task
+            def f():
+                return BUCKET_NAME
+
+            taskflow_op = f()
+            taskflow_op.operator.run(start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE)
+
+        op = GoogleDisplayVideo360DownloadReportV2Operator(
+            query_id=QUERY_ID,
+            report_id=REPORT_ID,
+            bucket_name=test_bucket_name if test_bucket_name != "XComArg" else 
taskflow_op,
+            report_name=REPORT_NAME,
+            task_id="test_task",
+            dag=dag,
+        )
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        mock_gcs_hook.return_value.upload.assert_called_once_with(
+            bucket_name=BUCKET_NAME,
+            filename=FILENAME,
+            gzip=True,
+            mime_type="text/csv",
+            object_name=REPORT_NAME + ".gz",
+        )
+
+
 class TestGoogleDisplayVideo360RunReportOperator:
     @mock.patch(
         
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
@@ -257,6 +386,41 @@ class TestGoogleDisplayVideo360RunReportOperator:
         
hook_mock.return_value.run_query.assert_called_once_with(query_id=REPORT_ID, 
params=parameters)
 
 
+class TestGoogleDisplayVideo360RunQueryOperator:
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators."
+        "display_video.GoogleDisplayVideo360RunQueryOperator.xcom_push"
+    )
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+    )
+    def test_execute(self, hook_mock, mock_xcom):
+        parameters = {"param": "test"}
+        hook_mock.return_value.run_query.return_value = {
+            "key": {
+                "queryId": QUERY_ID,
+                "reportId": REPORT_ID,
+            }
+        }
+        op = GoogleDisplayVideo360RunQueryOperator(
+            query_id=QUERY_ID,
+            parameters=parameters,
+            api_version=API_VERSION,
+            task_id="test_task",
+        )
+        op.execute(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=None,
+            api_version=API_VERSION,
+            impersonation_chain=None,
+        )
+
+        mock_xcom.assert_any_call(None, key="query_id", value=QUERY_ID)
+        mock_xcom.assert_any_call(None, key="report_id", value=REPORT_ID)
+        
hook_mock.return_value.run_query.assert_called_once_with(query_id=QUERY_ID, 
params=parameters)
+
+
 class TestGoogleDisplayVideo360DownloadLineItemsOperator:
     @mock.patch(
         
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
@@ -456,3 +620,37 @@ class 
TestGoogleDisplayVideo360CreateSDFDownloadTaskOperator:
             body_request=body_request
         )
         xcom_mock.assert_called_once_with(None, key="name", value=test_name)
+
+
+class TestGoogleDisplayVideo360CreateQueryOperator:
+    @mock.patch(
+        "airflow.providers.google.marketing_platform.operators."
+        "display_video.GoogleDisplayVideo360CreateQueryOperator.xcom_push"
+    )
+    @mock.patch(
+        
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+    )
+    def test_execute(self, hook_mock, xcom_mock):
+        body = {"body": "test"}
+        hook_mock.return_value.create_query.return_value = {"queryId": 
QUERY_ID}
+        op = GoogleDisplayVideo360CreateQueryOperator(body=body, 
task_id="test_task")
+        op.execute(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=None,
+            api_version="v2",
+            impersonation_chain=None,
+        )
+        hook_mock.return_value.create_query.assert_called_once_with(query=body)
+        xcom_mock.assert_called_once_with(None, key="query_id", value=QUERY_ID)
+
+    def test_prepare_template(self):
+        body = {"key": "value"}
+        with NamedTemporaryFile("w+", suffix=".json") as f:
+            f.write(json.dumps(body))
+            f.flush()
+            op = GoogleDisplayVideo360CreateQueryOperator(body=body, 
task_id="test_task")
+            op.prepare_template()
+
+        assert isinstance(op.body, dict)
+        assert op.body == body
diff --git 
a/tests/providers/google/marketing_platform/sensors/test_display_video.py 
b/tests/providers/google/marketing_platform/sensors/test_display_video.py
index fb7a7c9091..193ba18e6b 100644
--- a/tests/providers/google/marketing_platform/sensors/test_display_video.py
+++ b/tests/providers/google/marketing_platform/sensors/test_display_video.py
@@ -22,6 +22,7 @@ from unittest import mock
 from airflow.providers.google.marketing_platform.sensors.display_video import (
     GoogleDisplayVideo360GetSDFDownloadOperationSensor,
     GoogleDisplayVideo360ReportSensor,
+    GoogleDisplayVideo360RunQuerySensor,
 )
 
 API_VERSION = "api_version"
@@ -46,6 +47,23 @@ class TestGoogleDisplayVideo360ReportSensor:
         
hook_mock.return_value.get_query.assert_called_once_with(query_id=report_id)
 
 
+class TestGoogleDisplayVideo360RunQuerySensor:
+    
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360Hook")
+    
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.BaseSensorOperator")
+    def test_poke(self, mock_base_op, hook_mock):
+        query_id = "QUERY_ID"
+        report_id = "REPORT_ID"
+        op = GoogleDisplayVideo360RunQuerySensor(query_id=query_id, 
report_id=report_id, task_id="test_task")
+        op.poke(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=None,
+            api_version="v2",
+            impersonation_chain=None,
+        )
+        
hook_mock.return_value.get_report.assert_called_once_with(query_id=query_id, 
report_id=report_id)
+
+
 class TestGoogleDisplayVideo360Sensor:
     
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360Hook")
     
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.BaseSensorOperator")

Reply via email to