This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 71db47a73d Update DV360 operators to use API v2 (#30326)
71db47a73d is described below
commit 71db47a73d741015d8ffeaa2276635f19d51f8e7
Author: Ćukasz Wyszomirski <[email protected]>
AuthorDate: Sun Apr 9 12:15:42 2023 +0200
Update DV360 operators to use API v2 (#30326)
* Update DV360 operators to use API v2
* Update display_video.rst
* fixup! Update display_video.rst
* fixup! Update display_video.rst
---------
Co-authored-by: Jarek Potiuk <[email protected]>
---
airflow/providers/google/CHANGELOG.rst | 10 +-
.../example_dags/example_display_video.py | 69 ++++-
.../marketing_platform/hooks/display_video.py | 59 ++++-
.../marketing_platform/operators/display_video.py | 294 ++++++++++++++++++++-
.../marketing_platform/sensors/display_video.py | 72 ++++-
airflow/providers/google/provider.yaml | 2 +-
.../operators/marketing_platform/display_video.rst | 68 +++++
docs/spelling_wordlist.txt | 6 +
scripts/in_container/verify_providers.py | 4 +
tests/always/test_project_structure.py | 4 +
.../marketing_platform/hooks/test_display_video.py | 140 +++++++++-
.../operators/test_display_video.py | 200 +++++++++++++-
.../sensors/test_display_video.py | 18 ++
13 files changed, 928 insertions(+), 18 deletions(-)
diff --git a/airflow/providers/google/CHANGELOG.rst
b/airflow/providers/google/CHANGELOG.rst
index 599958ab78..eebf2b2b7d 100644
--- a/airflow/providers/google/CHANGELOG.rst
+++ b/airflow/providers/google/CHANGELOG.rst
@@ -23,8 +23,14 @@
Changelog
---------
-8.12.1
-......
+9.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+Google announced sunset of Bid manager API v1 and v1.1 by April 27, 2023 for
more information
+please check: `docs <https://developers.google.com/bid-manager/v1.1>`_ As a
result default value of api_version in GoogleDisplayVideo360Hook and related
operators updated to v2
This version of provider contains a temporary workaround to issue with ``v11``
version of
google-ads API being discontinued, while the google provider dependencies
preventing installing
diff --git
a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
index dc9325e95e..7e6c3c38ac 100644
---
a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+++
b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py
@@ -29,11 +29,14 @@ from airflow.models.xcom_arg import XComArg
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import
GCSToBigQueryOperator
from airflow.providers.google.marketing_platform.hooks.display_video import
GoogleDisplayVideo360Hook
from airflow.providers.google.marketing_platform.operators.display_video
import (
+ GoogleDisplayVideo360CreateQueryOperator,
GoogleDisplayVideo360CreateReportOperator,
GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadLineItemsOperator,
GoogleDisplayVideo360DownloadReportOperator,
+ GoogleDisplayVideo360DownloadReportV2Operator,
+ GoogleDisplayVideo360RunQueryOperator,
GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360SDFtoGCSOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
@@ -41,6 +44,7 @@ from
airflow.providers.google.marketing_platform.operators.display_video import
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360GetSDFDownloadOperationSensor,
GoogleDisplayVideo360ReportSensor,
+ GoogleDisplayVideo360RunQuerySensor,
)
# [START howto_display_video_env_variables]
@@ -50,7 +54,7 @@ OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME",
"files/report.csv")
PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE",
"test-gcs-example.txt")
PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE",
"test-gcs-example-download.txt")
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
-SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1")
+SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_5")
BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123)
ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem")
@@ -74,7 +78,25 @@ REPORT = {
"schedule": {"frequency": "ONE_TIME"},
}
-PARAMETERS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
+REPORT_V2 = {
+ "metadata": {
+ "title": "Airflow Test Report",
+ "dataRange": {"range": "LAST_7_DAYS"},
+ "format": "CSV",
+ "sendNotification": False,
+ },
+ "params": {
+ "type": "STANDARD",
+ "groupBys": ["FILTER_DATE", "FILTER_PARTNER"],
+ "filters": [{"type": "FILTER_PARTNER", "value": ADVERTISER_ID}],
+ "metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"],
+ },
+ "schedule": {"frequency": "ONE_TIME"},
+}
+
+PARAMETERS = {
+ "dataRange": {"range": "LAST_7_DAYS"},
+}
CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: dict = {
"version": SDF_VERSION,
@@ -209,3 +231,46 @@ with models.DAG(
# Task dependency created via `XComArgs`:
# save_sdf_in_gcs >> upload_sdf_to_big_query
+
+with models.DAG(
+ "example_display_video_v2",
+ start_date=START_DATE,
+ catchup=False,
+) as dag:
+ # [START howto_google_display_video_create_query_operator]
+ create_query_v2 = GoogleDisplayVideo360CreateQueryOperator(body=REPORT_V2,
task_id="create_query")
+
+ query_id = cast(str, XComArg(create_query_v2, key="query_id"))
+ # [END howto_google_display_video_create_query_operator]
+
+ # [START howto_google_display_video_run_query_report_operator]
+ run_query_v2 = GoogleDisplayVideo360RunQueryOperator(
+ query_id=query_id, parameters=PARAMETERS, task_id="run_report"
+ )
+
+ query_id = cast(str, XComArg(run_query_v2, key="query_id"))
+ report_id = cast(str, XComArg(run_query_v2, key="report_id"))
+ # [END howto_google_display_video_run_query_report_operator]
+
+ # [START howto_google_display_video_wait_run_query_sensor]
+ wait_for_query = GoogleDisplayVideo360RunQuerySensor(
+ task_id="wait_for_query",
+ query_id=query_id,
+ report_id=report_id,
+ )
+ # [END howto_google_display_video_wait_run_query_sensor]
+
+ # [START howto_google_display_video_get_report_operator]
+ get_report_v2 = GoogleDisplayVideo360DownloadReportV2Operator(
+ query_id=query_id,
+ report_id=report_id,
+ task_id="get_report",
+ bucket_name=BUCKET,
+ report_name="test1.csv",
+ )
+ # # [END howto_google_display_video_get_report_operator]
+ # # [START howto_google_display_video_delete_query_report_operator]
+ delete_report_v2 =
GoogleDisplayVideo360DeleteReportOperator(report_id=report_id,
task_id="delete_report")
+ # # [END howto_google_display_video_delete_query_report_operator]
+
+ create_query_v2 >> run_query_v2 >> wait_for_query >> get_report_v2 >>
delete_report_v2
diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py
b/airflow/providers/google/marketing_platform/hooks/display_video.py
index 05bba7a8ce..9422426fbe 100644
--- a/airflow/providers/google/marketing_platform/hooks/display_video.py
+++ b/airflow/providers/google/marketing_platform/hooks/display_video.py
@@ -18,6 +18,7 @@
"""This module contains Google DisplayVideo hook."""
from __future__ import annotations
+import warnings
from typing import Any, Sequence
from googleapiclient.discovery import Resource, build
@@ -32,7 +33,7 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
def __init__(
self,
- api_version: str = "v1",
+ api_version: str = "v2",
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
@@ -42,6 +43,11 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
+ if api_version in ["v1", "v1.1"]:
+ warnings.warn(
+ f"API {api_version} is deprecated and shortly will be removed
please use v2",
+ DeprecationWarning,
+ )
self.api_version = api_version
def get_conn(self) -> Resource:
@@ -93,7 +99,10 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
:param query: Query object to be passed to request body.
"""
- response =
self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
+ if self.api_version in ["v1", "v1.1"]:
+ response =
self.get_conn().queries().createquery(body=query).execute(num_retries=self.num_retries)
+ else:
+ response =
self.get_conn().queries().create(body=query).execute(num_retries=self.num_retries)
return response
def delete_query(self, query_id: str) -> None:
@@ -102,7 +111,10 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
:param query_id: Query ID to delete.
"""
-
(self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries))
+ if self.api_version in ["v1", "v1.1"]:
+
self.get_conn().queries().deletequery(queryId=query_id).execute(num_retries=self.num_retries)
+ else:
+
self.get_conn().queries().delete(queryId=query_id).execute(num_retries=self.num_retries)
def get_query(self, query_id: str) -> dict:
"""
@@ -110,25 +122,56 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook):
:param query_id: Query ID to retrieve.
"""
- response =
self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
+ if self.api_version in ["v1", "v1.1"]:
+ response = (
+
self.get_conn().queries().getquery(queryId=query_id).execute(num_retries=self.num_retries)
+ )
+ else:
+ response =
self.get_conn().queries().get(queryId=query_id).execute(num_retries=self.num_retries)
return response
def list_queries(self) -> list[dict]:
"""Retrieves stored queries."""
- response =
self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
+ if self.api_version in ["v1", "v1.1"]:
+ response =
self.get_conn().queries().listqueries().execute(num_retries=self.num_retries)
+ else:
+ response =
self.get_conn().queries().list().execute(num_retries=self.num_retries)
return response.get("queries", [])
- def run_query(self, query_id: str, params: dict[str, Any] | None) -> None:
+ def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict:
"""
Runs a stored query to generate a report.
:param query_id: Query ID to run.
:param params: Parameters for the report.
"""
- (
+ if self.api_version in ["v1", "v1.1"]:
+ return (
+ self.get_conn()
+ .queries()
+ .runquery(queryId=query_id, body=params)
+ .execute(num_retries=self.num_retries)
+ )
+ else:
+ return (
+ self.get_conn()
+ .queries()
+ .run(queryId=query_id, body=params)
+ .execute(num_retries=self.num_retries)
+ )
+
+ def get_report(self, query_id: str, report_id: str) -> dict:
+ """
+ Retrieves a report.
+
+ :param query_id: Query ID for which report was generated.
+ :param report_id: Report ID to retrieve.
+ """
+ return (
self.get_conn()
.queries()
- .runquery(queryId=query_id, body=params)
+ .reports()
+ .get(queryId=query_id, reportId=report_id)
.execute(num_retries=self.num_retries)
)
diff --git
a/airflow/providers/google/marketing_platform/operators/display_video.py
b/airflow/providers/google/marketing_platform/operators/display_video.py
index d33ffb46d7..738d734bad 100644
--- a/airflow/providers/google/marketing_platform/operators/display_video.py
+++ b/airflow/providers/google/marketing_platform/operators/display_video.py
@@ -23,6 +23,7 @@ import json
import shutil
import tempfile
import urllib.request
+import warnings
from typing import TYPE_CHECKING, Any, Sequence
from urllib.parse import urlsplit
@@ -82,6 +83,11 @@ class
GoogleDisplayVideo360CreateReportOperator(BaseOperator):
) -> None:
super().__init__(**kwargs)
self.body = body
+
+ warnings.warn(
+ "This operator is deprecated. Please use
`GoogleDisplayVideo360CreateQueryOperator`",
+ DeprecationWarning,
+ )
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -108,6 +114,79 @@ class
GoogleDisplayVideo360CreateReportOperator(BaseOperator):
return response
+class GoogleDisplayVideo360CreateQueryOperator(BaseOperator):
+ """
+ Creates a query.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ ``GoogleDisplayVideo360CreateQueryOperator``
+
+ .. seealso::
+ Check also the official API docs:
+ `https://developers.google.com/bid-manager/v2/queries/create`
+
+ :param body: Report object passed to the request's body as described here:
+ https://developers.google.com/bid-manager/v2/queries#Query
+ :param api_version: The version of the api that will be requested for
example 'v3'.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
+ domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "body",
+ "impersonation_chain",
+ )
+ template_ext: Sequence[str] = (".json",)
+
+ def __init__(
+ self,
+ *,
+ body: dict[str, Any],
+ api_version: str = "v2",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: str | None = None,
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.body = body
+ self.api_version = api_version
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def prepare_template(self) -> None:
+ # If .json is passed then we have to read the file
+ if isinstance(self.body, str) and self.body.endswith(".json"):
+ with open(self.body) as file:
+ self.body = json.load(file)
+
+ def execute(self, context: Context) -> dict:
+ hook = GoogleDisplayVideo360Hook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Creating Display & Video 360 query.")
+ response = hook.create_query(query=self.body)
+ query_id = response["queryId"]
+ self.xcom_push(context, key="query_id", value=query_id)
+ self.log.info("Created query with ID: %s", query_id)
+ return response
+
+
class GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
"""
Deletes a stored query as well as the associated stored reports.
@@ -118,7 +197,7 @@ class
GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
.. seealso::
Check also the official API docs:
- `https://developers.google.com/bid-manager/v1/queries/deletequery`
+ `https://developers.google.com/bid-manager/v2/queries/delete`
:param report_id: Report ID to delete.
:param report_name: Name of the report to delete.
@@ -147,7 +226,7 @@ class
GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
*,
report_id: str | None = None,
report_name: str | None = None,
- api_version: str = "v1",
+ api_version: str = "v2",
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
@@ -156,6 +235,11 @@ class
GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
super().__init__(**kwargs)
self.report_id = report_id
self.report_name = report_name
+ if api_version in ["v1", "v1.1"]:
+ warnings.warn(
+ f"API {api_version} is deprecated and shortly will be removed
please use v2",
+ DeprecationWarning,
+ )
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -242,6 +326,10 @@ class
GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
**kwargs,
) -> None:
super().__init__(**kwargs)
+ warnings.warn(
+ "This operator is deprecated. Please use
`GoogleDisplayVideo360DownloadReportV2Operator`",
+ DeprecationWarning,
+ )
self.report_id = report_id
self.chunk_size = chunk_size
self.gzip = gzip
@@ -310,6 +398,131 @@ class
GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
self.xcom_push(context, key="report_name", value=report_name)
+class GoogleDisplayVideo360DownloadReportV2Operator(BaseOperator):
+ """
+ Retrieves a stored query.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleDisplayVideo360DownloadReportV2Operator`
+
+ .. seealso::
+ Check also the official API docs:
+ `https://developers.google.com/bid-manager/v2/queries/get`
+
+ :param report_id: Report ID to retrieve.
+ :param bucket_name: The bucket to upload to.
+ :param report_name: The report name to set when uploading the local file.
+ :param chunk_size: File will be downloaded in chunks of this many bytes.
+ :param gzip: Option to compress local file or file data for upload
+ :param api_version: The version of the api that will be requested for
example 'v3'.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
+ domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "query_id",
+ "report_id",
+ "bucket_name",
+ "report_name",
+ "impersonation_chain",
+ )
+
+ def __init__(
+ self,
+ *,
+ query_id: str,
+ report_id: str,
+ bucket_name: str,
+ report_name: str | None = None,
+ gzip: bool = True,
+ chunk_size: int = 10 * 1024 * 1024,
+ api_version: str = "v2",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: str | None = None,
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.query_id = query_id
+ self.report_id = report_id
+ self.chunk_size = chunk_size
+ self.gzip = gzip
+ self.bucket_name = bucket_name
+ self.report_name = report_name
+ self.api_version = api_version
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def _resolve_file_name(self, name: str) -> str:
+ new_name = name if name.endswith(".csv") else f"{name}.csv"
+ new_name = f"{new_name}.gz" if self.gzip else new_name
+ return new_name
+
+ @staticmethod
+ def _set_bucket_name(name: str) -> str:
+ bucket = name if not name.startswith("gs://") else name[5:]
+ return bucket.strip("/")
+
+ def execute(self, context: Context):
+ hook = GoogleDisplayVideo360Hook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ gcs_hook = GCSHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ resource = hook.get_report(query_id=self.query_id,
report_id=self.report_id)
+ status = resource.get("metadata", {}).get("status", {}).get("state")
+ if resource and status not in ["DONE", "FAILED"]:
+ raise AirflowException(f"Report {self.report_id} for query
{self.query_id} is still running")
+
+ # If no custom report_name provided, use DV360 name
+ file_url = resource["metadata"]["googleCloudStoragePath"]
+ report_name = self.report_name or
urlsplit(file_url).path.split("/")[-1]
+ report_name = self._resolve_file_name(report_name)
+
+ # Download the report
+ self.log.info("Starting downloading report %s", self.report_id)
+ with tempfile.NamedTemporaryFile(delete=False) as temp_file:
+ with urllib.request.urlopen(file_url) as response:
+ shutil.copyfileobj(response, temp_file, length=self.chunk_size)
+
+ temp_file.flush()
+ # Upload the local file to bucket
+ bucket_name = self._set_bucket_name(self.bucket_name)
+ gcs_hook.upload(
+ bucket_name=bucket_name,
+ object_name=report_name,
+ gzip=self.gzip,
+ filename=temp_file.name,
+ mime_type="text/csv",
+ )
+ self.log.info(
+ "Report %s was saved in bucket %s as %s.",
+ self.report_id,
+ self.bucket_name,
+ report_name,
+ )
+ self.xcom_push(context, key="report_name", value=report_name)
+
+
class GoogleDisplayVideo360RunReportOperator(BaseOperator):
"""
Runs a stored query to generate a report.
@@ -359,6 +572,10 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
) -> None:
super().__init__(**kwargs)
self.report_id = report_id
+ warnings.warn(
+ "This operator is deprecated. Please use
`GoogleDisplayVideo360RunQueryOperator`",
+ DeprecationWarning,
+ )
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
@@ -380,6 +597,79 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator):
hook.run_query(query_id=self.report_id, params=self.parameters)
+class GoogleDisplayVideo360RunQueryOperator(BaseOperator):
+ """
+ Runs a stored query to generate a report.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleDisplayVideo360RunQueryOperator`
+
+ .. seealso::
+ Check also the official API docs:
+ `https://developers.google.com/bid-manager/v2/queries/run`
+
+ :param report_id: Report ID to run.
+ :param parameters: Parameters for running a report as described here:
+ https://developers.google.com/bid-manager/v2/queries/run
+ :param api_version: The version of the api that will be requested for
example 'v3'.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
+ domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "query_id",
+ "parameters",
+ "impersonation_chain",
+ )
+
+ def __init__(
+ self,
+ *,
+ query_id: str,
+ parameters: dict[str, Any] | None = None,
+ api_version: str = "v2",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: str | None = None,
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.query_id = query_id
+ self.api_version = api_version
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.parameters = parameters
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context) -> dict:
+ hook = GoogleDisplayVideo360Hook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info(
+ "Running query %s with the following parameters:\n %s",
+ self.query_id,
+ self.parameters,
+ )
+ response = hook.run_query(query_id=self.query_id,
params=self.parameters)
+ self.xcom_push(context, key="query_id",
value=response["key"]["queryId"])
+ self.xcom_push(context, key="report_id",
value=response["key"]["reportId"])
+ return response
+
+
class GoogleDisplayVideo360DownloadLineItemsOperator(BaseOperator):
"""
Retrieves line items in CSV format.
diff --git
a/airflow/providers/google/marketing_platform/sensors/display_video.py
b/airflow/providers/google/marketing_platform/sensors/display_video.py
index 5605e3cbd4..d4d10ba955 100644
--- a/airflow/providers/google/marketing_platform/sensors/display_video.py
+++ b/airflow/providers/google/marketing_platform/sensors/display_video.py
@@ -17,6 +17,7 @@
"""Sensor for detecting the completion of DV360 reports."""
from __future__ import annotations
+import warnings
from typing import TYPE_CHECKING, Sequence
from airflow import AirflowException
@@ -67,7 +68,10 @@ class GoogleDisplayVideo360ReportSensor(BaseSensorOperator):
**kwargs,
) -> None:
super().__init__(**kwargs)
-
+ warnings.warn(
+ "This operator is deprecated. Please use
`GoogleDisplayVideo360RunQuerySensor`",
+ DeprecationWarning,
+ )
self.report_id = report_id
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
@@ -153,3 +157,69 @@ class
GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator):
if operation and operation.get("done"):
return True
return False
+
+
+class GoogleDisplayVideo360RunQuerySensor(BaseSensorOperator):
+ """
+ Sensor for detecting the completion of DV360 reports for API v2.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GoogleDisplayVideo360RunQuerySensor`
+
+ :param query_id: Query ID for which report was generated
+ :param report_id: Report ID for which you want to wait
+ :param api_version: The version of the api that will be requested for
example 'v3'.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
+ domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "query_id",
+ "report_id",
+ "impersonation_chain",
+ )
+
+ def __init__(
+ self,
+ *,
+ query_id: str,
+ report_id: str,
+ api_version: str = "v2",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: str | None = None,
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.query_id = query_id
+ self.report_id = report_id
+ self.api_version = api_version
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def poke(self, context: Context) -> bool:
+ hook = GoogleDisplayVideo360Hook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ response = hook.get_report(query_id=self.query_id,
report_id=self.report_id)
+ status = response.get("metadata", {}).get("status", {}).get("state")
+ self.log.info(f"STATUS OF THE REPORT {self.report_id} FOR QUERY
{self.query_id}: {status}")
+ if response and status in ["DONE", "FAILED"]:
+ return True
+ return False
diff --git a/airflow/providers/google/provider.yaml
b/airflow/providers/google/provider.yaml
index 283daf2055..6b3006c8c0 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -30,7 +30,7 @@ description: |
suspended: false
versions:
- - 8.12.1
+ - 9.0.0
- 8.12.0
- 8.11.0
- 8.10.0
diff --git
a/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst
b/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst
index 60cd3db322..c0e225f86e 100644
---
a/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst
+++
b/docs/apache-airflow-providers-google/operators/marketing_platform/display_video.rst
@@ -30,6 +30,8 @@ Prerequisite Tasks
Creating a report
^^^^^^^^^^^^^^^^^
+This Operator is Deprecated and will be removed soon. Please look at
``GoogleDisplayVideo360CreateQueryOperator``.
+
To create Display&Video 360 report use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateReportOperator`.
@@ -47,6 +49,45 @@ The result is saved to :ref:`XCom <concepts:xcom>`, which
allows the result to b
.. _howto/operator:GoogleDisplayVideo360DeleteReportOperator:
+
+Creating a Query
+^^^^^^^^^^^^^^^^
+
+To create Display&Video 360 query use
+:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateQueryOperator`.
+
+.. exampleinclude::
/../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_google_display_video_create_query_operator]
+ :end-before: [END howto_google_display_video_create_query_operator]
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateQueryOperator`
+parameters which allow you to dynamically determine values. You can provide
body definition using ``
+.json`` file as this operator supports this template extension.
+The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to
be used by other operators.
+
+.. _howto/operator:GoogleDisplayVideo360RunQueryOperator:
+
+Run Query
+^^^^^^^^^
+
+:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunQueryOperator`.
+
+.. exampleinclude::
/../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_google_display_video_run_query_report_operator]
+ :end-before: [END howto_google_display_video_run_query_report_operator]
+
+You can use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunQueryOperator`
+parameters which allow you to dynamically determine values.
+The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to
be used by other operators.
+
+.. _howto/operator:GoogleDisplayVideo360RunQuerySensor:
+
Deleting a report
^^^^^^^^^^^^^^^^^
@@ -68,6 +109,9 @@ parameters which allow you to dynamically determine values.
Waiting for report
^^^^^^^^^^^^^^^^^^
+This Operator is Deprecated and will be removed soon. Please look:
+.. _howto/operators:GoogleDisplayVideo360RunQuerySensor
+
To wait for the report use
:class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360ReportSensor`.
@@ -83,9 +127,30 @@ parameters which allow you to dynamically determine values.
.. _howto/operator:GoogleDisplayVideo360DownloadReportOperator:
+Waiting for query
+^^^^^^^^^^^^^^^^^
+
+To wait for the report use
+:class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360RunQuerySensor`.
+
+.. exampleinclude::
/../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_google_display_video_wait_run_query_sensor]
+ :end-before: [END howto_google_display_video_wait_run_query_sensor]
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360RunQuerySensor`
+parameters which allow you to dynamically determine values.
+
+.. _howto/operator:GoogleDisplayVideo360DownloadReportV2Operator:
+
Downloading a report
^^^^^^^^^^^^^^^^^^^^
+This Operator is Deprecated and will be removed soon. Please look:
+.. _howto/operators:GoogleDisplayVideo360DownloadReportV2Operator
+
To download a report to GCS bucket use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DownloadReportOperator`.
@@ -105,6 +170,9 @@ parameters which allow you to dynamically determine values.
Running a report
^^^^^^^^^^^^^^^^
+This Operator is Deprecated and will be removed soon. Please look:
+.. _howto/operators:GoogleDisplayVideo360RunQueryOperator
+
To run Display&Video 360 report use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunReportOperator`.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index d603114eea..d7074f4cfd 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -300,6 +300,7 @@ cpu
cpus
crd
createDisposition
+CreateQueryOperator
creationTimestamp
credssp
Cron
@@ -443,6 +444,7 @@ discoverability
discoverable
diskTypes
displayName
+DisplayVideo
distcp
distro
distros
@@ -469,6 +471,7 @@ dogstatsd
donot
Dont
DOS'ing
+DownloadReportV
downscaling
Drillbit
Drivy
@@ -638,6 +641,7 @@ golang
googleads
GoogleAdsRow
googleapiclient
+GoogleDisplayVideo
gpu
gpus
greenlet
@@ -1252,6 +1256,8 @@ rtype
ru
runAsUser
runnable
+RunQueryOperator
+RunQuerySensor
runspace
RunSubmitTaskSettings
runtime
diff --git a/scripts/in_container/verify_providers.py
b/scripts/in_container/verify_providers.py
index dd09e2f7c2..582cd56d2c 100755
--- a/scripts/in_container/verify_providers.py
+++ b/scripts/in_container/verify_providers.py
@@ -152,6 +152,10 @@ KNOWN_COMMON_DEPRECATED_MESSAGES: set[str] = {
"This module is deprecated. Please use
`airflow.providers.cncf.kubernetes.operators.pod` instead.",
"urllib3 (1.26.6) or chardet (5.1.0)/charset_normalizer (2.0.12) doesn't
match a supported version!",
"urllib3 (1.26.9) or chardet (5.1.0)/charset_normalizer (2.0.12) doesn't
match a supported version!",
+ "This operator is deprecated. Please use
`GoogleDisplayVideo360CreateQueryOperator`",
+ "This operator is deprecated. Please use
`GoogleDisplayVideo360RunQueryOperator`",
+ "This operator is deprecated. Please use
`GoogleDisplayVideo360RunQuerySensor`",
+ "This operator is deprecated. Please use
`GoogleDisplayVideo360DownloadReportV2Operator`",
}
# The set of warning messages generated by direct importing of some deprecated
modules. We should only
diff --git a/tests/always/test_project_structure.py
b/tests/always/test_project_structure.py
index d0aa829f38..ec26f526a8 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -266,6 +266,10 @@ class
TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
"airflow.providers.google.cloud.operators.bigquery.BigQueryPatchDatasetOperator",
"airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator",
"airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator",
+
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360CreateQueryOperator",
+
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360RunQueryOperator",
+
"airflow.providers.google.marketing_platform.operators.GoogleDisplayVideo360DownloadReportV2Operator",
+
"airflow.providers.google.marketing_platform.sensors.GoogleDisplayVideo360RunQuerySensor",
}
BASE_CLASSES = {
diff --git
a/tests/providers/google/marketing_platform/hooks/test_display_video.py
b/tests/providers/google/marketing_platform/hooks/test_display_video.py
index f179dd436f..55a3c91308 100644
--- a/tests/providers/google/marketing_platform/hooks/test_display_video.py
+++ b/tests/providers/google/marketing_platform/hooks/test_display_video.py
@@ -32,7 +32,7 @@ class TestGoogleDisplayVideo360Hook:
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
new=mock_base_gcp_hook_default_project_id,
):
- self.hook = GoogleDisplayVideo360Hook(gcp_conn_id=GCP_CONN_ID)
+ self.hook = GoogleDisplayVideo360Hook(api_version=API_VERSION,
gcp_conn_id=GCP_CONN_ID)
@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
@@ -371,3 +371,141 @@ class TestGoogleDisplayVideo360Hook:
get_conn_to_display_video.return_value.media.return_value.download_media.assert_called_once_with(
resourceName=resource_name
)
+
+
+class TestGoogleDisplayVideo360v2Hook:
+ def setup_method(self):
+ with mock.patch(
+
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.__init__",
+ new=mock_base_gcp_hook_default_project_id,
+ ):
+ self.api_version = "v2"
+ self.hook =
GoogleDisplayVideo360Hook(api_version=self.api_version, gcp_conn_id=GCP_CONN_ID)
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook._authorize"
+ )
+
@mock.patch("airflow.providers.google.marketing_platform.hooks.display_video.build")
+ def test_gen_conn(self, mock_build, mock_authorize):
+ result = self.hook.get_conn()
+ mock_build.assert_called_once_with(
+ "doubleclickbidmanager",
+ self.api_version,
+ http=mock_authorize.return_value,
+ cache_discovery=False,
+ )
+ assert mock_build.return_value == result
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook._authorize"
+ )
+
@mock.patch("airflow.providers.google.marketing_platform.hooks.display_video.build")
+ def test_get_conn_to_display_video(self, mock_build, mock_authorize):
+ result = self.hook.get_conn_to_display_video()
+ mock_build.assert_called_once_with(
+ "displayvideo",
+ self.api_version,
+ http=mock_authorize.return_value,
+ cache_discovery=False,
+ )
+ assert mock_build.return_value == result
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn"
+ )
+ def test_create_query(self, get_conn_mock):
+ body = {"body": "test"}
+
+ return_value = "TEST"
+
get_conn_mock.return_value.queries.return_value.create.return_value.execute.return_value
= (
+ return_value
+ )
+
+ result = self.hook.create_query(query=body)
+
+
get_conn_mock.return_value.queries.return_value.create.assert_called_once_with(body=body)
+
+ assert return_value == result
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn"
+ )
+ def test_delete_query(self, get_conn_mock):
+ query_id = "QUERY_ID"
+
+ return_value = "TEST"
+
get_conn_mock.return_value.queries.return_value.delete.return_value.execute.return_value
= (
+ return_value
+ )
+
+ self.hook.delete_query(query_id=query_id)
+
+
get_conn_mock.return_value.queries.return_value.delete.assert_called_once_with(queryId=query_id)
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn"
+ )
+ def test_get_query(self, get_conn_mock):
+ query_id = "QUERY_ID"
+
+ return_value = "TEST"
+
get_conn_mock.return_value.queries.return_value.get.return_value.execute.return_value
= return_value
+
+ result = self.hook.get_query(query_id=query_id)
+
+
get_conn_mock.return_value.queries.return_value.get.assert_called_once_with(queryId=query_id)
+
+ assert return_value == result
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn"
+ )
+ def test_list_queries(self, get_conn_mock):
+ queries = ["test"]
+ return_value = {"queries": queries}
+
get_conn_mock.return_value.queries.return_value.list.return_value.execute.return_value
= return_value
+
+ result = self.hook.list_queries()
+
+
get_conn_mock.return_value.queries.return_value.list.assert_called_once_with()
+
+ assert queries == result
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn"
+ )
+ def test_run_query(self, get_conn_mock):
+ query_id = "QUERY_ID"
+ params = {"params": "test"}
+
+ self.hook.run_query(query_id=query_id, params=params)
+
+
get_conn_mock.return_value.queries.return_value.run.assert_called_once_with(
+ queryId=query_id, body=params
+ )
+
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.hooks."
+ "display_video.GoogleDisplayVideo360Hook.get_conn"
+ )
+ def test_get_report(self, get_conn_mock):
+ query_id = "QUERY_ID"
+ report_id = "REPORT_ID"
+
+ return_value = "TEST"
+
get_conn_mock.return_value.queries.return_value.reports.return_value.get.return_value.execute.return_value
= ( # noqa
+ return_value
+ )
+
+ self.hook.get_report(query_id=query_id, report_id=report_id)
+
+
get_conn_mock.return_value.queries.return_value.reports.return_value.get.assert_called_once_with(
+ queryId=query_id, reportId=report_id
+ )
diff --git
a/tests/providers/google/marketing_platform/operators/test_display_video.py
b/tests/providers/google/marketing_platform/operators/test_display_video.py
index bf9552485c..d55a9a729a 100644
--- a/tests/providers/google/marketing_platform/operators/test_display_video.py
+++ b/tests/providers/google/marketing_platform/operators/test_display_video.py
@@ -25,11 +25,14 @@ import pytest
from airflow.models import DAG, TaskInstance as TI
from airflow.providers.google.marketing_platform.operators.display_video
import (
+ GoogleDisplayVideo360CreateQueryOperator,
GoogleDisplayVideo360CreateReportOperator,
GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadLineItemsOperator,
GoogleDisplayVideo360DownloadReportOperator,
+ GoogleDisplayVideo360DownloadReportV2Operator,
+ GoogleDisplayVideo360RunQueryOperator,
GoogleDisplayVideo360RunReportOperator,
GoogleDisplayVideo360SDFtoGCSOperator,
GoogleDisplayVideo360UploadLineItemsOperator,
@@ -37,7 +40,7 @@ from
airflow.providers.google.marketing_platform.operators.display_video import
from airflow.utils import timezone
from airflow.utils.session import create_session
-API_VERSION = "api_version"
+API_VERSION = "v2"
GCP_CONN_ID = "google_cloud_default"
DELEGATE_TO: str | None = None
IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
@@ -235,6 +238,132 @@ class TestGoogleDisplayVideo360DownloadReportOperator:
)
+class TestGoogleDisplayVideo360DownloadReportV2Operator:
+ def setup_method(self):
+ with create_session() as session:
+ session.query(TI).delete()
+
+ def teardown_method(self):
+ with create_session() as session:
+ session.query(TI).delete()
+
+
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
+
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
+
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators."
+ "display_video.GoogleDisplayVideo360DownloadReportV2Operator.xcom_push"
+ )
+
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+ )
+ def test_execute(
+ self,
+ mock_hook,
+ mock_gcs_hook,
+ mock_xcom,
+ mock_temp,
+ mock_request,
+ mock_shutil,
+ ):
+ mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name
= FILENAME
+ mock_hook.return_value.get_report.return_value = {
+ "metadata": {
+ "status": {"state": "DONE"},
+ "googleCloudStoragePath": "TEST",
+ }
+ }
+ op = GoogleDisplayVideo360DownloadReportV2Operator(
+ query_id=QUERY_ID,
+ report_id=REPORT_ID,
+ bucket_name=BUCKET_NAME,
+ report_name=REPORT_NAME,
+ task_id="test_task",
+ )
+ op.execute(context=None)
+ mock_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=None,
+ api_version="v2",
+ impersonation_chain=None,
+ )
+
mock_hook.return_value.get_report.assert_called_once_with(report_id=REPORT_ID,
query_id=QUERY_ID)
+
+ mock_gcs_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=None,
+ impersonation_chain=None,
+ )
+ mock_gcs_hook.return_value.upload.assert_called_once_with(
+ bucket_name=BUCKET_NAME,
+ filename=FILENAME,
+ gzip=True,
+ mime_type="text/csv",
+ object_name=REPORT_NAME + ".gz",
+ )
+ mock_xcom.assert_called_once_with(None, key="report_name",
value=REPORT_NAME + ".gz")
+
+ @pytest.mark.parametrize(
+ "test_bucket_name",
+ [BUCKET_NAME, f"gs://{BUCKET_NAME}", "XComArg", "{{
ti.xcom_pull(task_ids='f') }}"],
+ )
+
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.shutil")
+
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.urllib.request")
+
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.tempfile")
+
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+ )
+ def test_set_bucket_name(
+ self,
+ mock_hook,
+ mock_gcs_hook,
+ mock_temp,
+ mock_request,
+ mock_shutil,
+ test_bucket_name,
+ ):
+ mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name
= FILENAME
+ mock_hook.return_value.get_report.return_value = {
+ "metadata": {"status": {"state": "DONE"},
"googleCloudStoragePath": "TEST"}
+ }
+
+ dag = DAG(
+ dag_id="test_set_bucket_name",
+ start_date=DEFAULT_DATE,
+ schedule=None,
+ catchup=False,
+ )
+
+ if BUCKET_NAME not in test_bucket_name:
+
+ @dag.task
+ def f():
+ return BUCKET_NAME
+
+ taskflow_op = f()
+ taskflow_op.operator.run(start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE)
+
+ op = GoogleDisplayVideo360DownloadReportV2Operator(
+ query_id=QUERY_ID,
+ report_id=REPORT_ID,
+ bucket_name=test_bucket_name if test_bucket_name != "XComArg" else
taskflow_op,
+ report_name=REPORT_NAME,
+ task_id="test_task",
+ dag=dag,
+ )
+ op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ mock_gcs_hook.return_value.upload.assert_called_once_with(
+ bucket_name=BUCKET_NAME,
+ filename=FILENAME,
+ gzip=True,
+ mime_type="text/csv",
+ object_name=REPORT_NAME + ".gz",
+ )
+
+
class TestGoogleDisplayVideo360RunReportOperator:
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
@@ -257,6 +386,41 @@ class TestGoogleDisplayVideo360RunReportOperator:
hook_mock.return_value.run_query.assert_called_once_with(query_id=REPORT_ID,
params=parameters)
+class TestGoogleDisplayVideo360RunQueryOperator:
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators."
+ "display_video.GoogleDisplayVideo360RunQueryOperator.xcom_push"
+ )
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+ )
+ def test_execute(self, hook_mock, mock_xcom):
+ parameters = {"param": "test"}
+ hook_mock.return_value.run_query.return_value = {
+ "key": {
+ "queryId": QUERY_ID,
+ "reportId": REPORT_ID,
+ }
+ }
+ op = GoogleDisplayVideo360RunQueryOperator(
+ query_id=QUERY_ID,
+ parameters=parameters,
+ api_version=API_VERSION,
+ task_id="test_task",
+ )
+ op.execute(context=None)
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=None,
+ api_version=API_VERSION,
+ impersonation_chain=None,
+ )
+
+ mock_xcom.assert_any_call(None, key="query_id", value=QUERY_ID)
+ mock_xcom.assert_any_call(None, key="report_id", value=REPORT_ID)
+
hook_mock.return_value.run_query.assert_called_once_with(query_id=QUERY_ID,
params=parameters)
+
+
class TestGoogleDisplayVideo360DownloadLineItemsOperator:
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
@@ -456,3 +620,37 @@ class
TestGoogleDisplayVideo360CreateSDFDownloadTaskOperator:
body_request=body_request
)
xcom_mock.assert_called_once_with(None, key="name", value=test_name)
+
+
+class TestGoogleDisplayVideo360CreateQueryOperator:
+ @mock.patch(
+ "airflow.providers.google.marketing_platform.operators."
+ "display_video.GoogleDisplayVideo360CreateQueryOperator.xcom_push"
+ )
+ @mock.patch(
+
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
+ )
+ def test_execute(self, hook_mock, xcom_mock):
+ body = {"body": "test"}
+ hook_mock.return_value.create_query.return_value = {"queryId":
QUERY_ID}
+ op = GoogleDisplayVideo360CreateQueryOperator(body=body,
task_id="test_task")
+ op.execute(context=None)
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=None,
+ api_version="v2",
+ impersonation_chain=None,
+ )
+ hook_mock.return_value.create_query.assert_called_once_with(query=body)
+ xcom_mock.assert_called_once_with(None, key="query_id", value=QUERY_ID)
+
+ def test_prepare_template(self):
+ body = {"key": "value"}
+ with NamedTemporaryFile("w+", suffix=".json") as f:
+ f.write(json.dumps(body))
+ f.flush()
+ op = GoogleDisplayVideo360CreateQueryOperator(body=body,
task_id="test_task")
+ op.prepare_template()
+
+ assert isinstance(op.body, dict)
+ assert op.body == body
diff --git
a/tests/providers/google/marketing_platform/sensors/test_display_video.py
b/tests/providers/google/marketing_platform/sensors/test_display_video.py
index fb7a7c9091..193ba18e6b 100644
--- a/tests/providers/google/marketing_platform/sensors/test_display_video.py
+++ b/tests/providers/google/marketing_platform/sensors/test_display_video.py
@@ -22,6 +22,7 @@ from unittest import mock
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360GetSDFDownloadOperationSensor,
GoogleDisplayVideo360ReportSensor,
+ GoogleDisplayVideo360RunQuerySensor,
)
API_VERSION = "api_version"
@@ -46,6 +47,23 @@ class TestGoogleDisplayVideo360ReportSensor:
hook_mock.return_value.get_query.assert_called_once_with(query_id=report_id)
+class TestGoogleDisplayVideo360RunQuerySensor:
+
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360Hook")
+
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.BaseSensorOperator")
+ def test_poke(self, mock_base_op, hook_mock):
+ query_id = "QUERY_ID"
+ report_id = "REPORT_ID"
+ op = GoogleDisplayVideo360RunQuerySensor(query_id=query_id,
report_id=report_id, task_id="test_task")
+ op.poke(context=None)
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=None,
+ api_version="v2",
+ impersonation_chain=None,
+ )
+
hook_mock.return_value.get_report.assert_called_once_with(query_id=query_id,
report_id=report_id)
+
+
class TestGoogleDisplayVideo360Sensor:
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360Hook")
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.BaseSensorOperator")