This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 58aefb2 Added SDFtoGCSOperator (#8740) 58aefb2 is described below commit 58aefb23b1d456bbb24876a4e3ff14f25d6274b0 Author: Michał Słowikowski <michalslowikowsk...@gmail.com> AuthorDate: Fri May 8 10:20:44 2020 +0200 Added SDFtoGCSOperator (#8740) Co-authored-by: michalslowikowski00 <michal.slowikow...@polidea.com> --- .../providers/google/common/hooks/base_google.py | 22 ++- .../example_dags/example_display_video.py | 74 ++++++++-- .../marketing_platform/hooks/display_video.py | 67 ++++++++- .../marketing_platform/operators/display_video.py | 163 ++++++++++++++++++++- .../marketing_platform/sensors/display_video.py | 59 +++++++- docs/howto/operator/gcp/display_video.rst | 55 +++++++ .../marketing_platform/hooks/test_display_video.py | 151 ++++++++++++++++++- .../operators/test_display_video.py | 141 +++++++++++++++--- .../operators/test_display_video_system.py | 11 +- .../sensors/test_display_video.py | 25 +++- 10 files changed, 718 insertions(+), 50 deletions(-) diff --git a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py index a92d349..4d3418a 100644 --- a/airflow/providers/google/common/hooks/base_google.py +++ b/airflow/providers/google/common/hooks/base_google.py @@ -39,7 +39,7 @@ from google.api_core.gapic_v1.client_info import ClientInfo from google.auth import _cloud_sdk from google.auth.environment_vars import CREDENTIALS from googleapiclient.errors import HttpError -from googleapiclient.http import set_user_agent +from googleapiclient.http import MediaIoBaseDownload, set_user_agent from airflow import version from airflow.exceptions import AirflowException @@ -456,3 +456,23 @@ class GoogleBaseHook(BaseHook): creds_content["refresh_token"], ]) yield + + @staticmethod + def download_content_from_request(file_handle, request, chunk_size): + """ + Download media resources. 
+ Note that the Python file object is compatible with io.Base and can be used with this class also. + + :param file_handle: io.Base or file object. The stream in which to write the downloaded + bytes. + :type file_handle: io.Base or file object + :param request: googleapiclient.http.HttpRequest, the media request to perform in chunks. + :type request: Dict + :param chunk_size: int, File will be downloaded in chunks of this many bytes. + :type chunk_size: int + """ + downloader = MediaIoBaseDownload(file_handle, request, chunksize=chunk_size) + done = False + while done is False: + _, done = downloader.next_chunk() + file_handle.flush() diff --git a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py index 6373972..a46cf2e 100644 --- a/airflow/providers/google/marketing_platform/example_dags/example_display_video.py +++ b/airflow/providers/google/marketing_platform/example_dags/example_display_video.py @@ -19,15 +19,18 @@ Example Airflow DAG that shows how to use DisplayVideo. 
""" import os +from typing import Dict from airflow import models +from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator from airflow.providers.google.marketing_platform.operators.display_video import ( - GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator, - GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator, - GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator, + GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator, + GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator, + GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator, + GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator, ) from airflow.providers.google.marketing_platform.sensors.display_video import ( - GoogleDisplayVideo360ReportSensor, + GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor, ) from airflow.utils import dates @@ -35,6 +38,15 @@ from airflow.utils import dates BUCKET = os.environ.get("GMP_DISPLAY_VIDEO_BUCKET", "gs://test-display-video-bucket") ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID", 1234567) OBJECT_NAME = os.environ.get("GMP_OBJECT_NAME", "files/report.csv") +PATH_TO_UPLOAD_FILE = os.environ.get( + "GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt" +) +PATH_TO_SAVED_FILE = os.environ.get( + "GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt" +) +BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1] +SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1") +BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test") REPORT = { "kind": "doubleclickbidmanager#query", @@ -55,14 +67,16 @@ REPORT = { } PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"} + +BODY_REQUEST: Dict = { + "version": SDF_VERSION, + 
"advertiserId": ADVERTISER_ID, + "inventorySourceFilter": {"inventorySourceIds": []}, +} # [END howto_display_video_env_variables] # download_line_items variables -REQUEST_BODY = { - "filterType": ADVERTISER_ID, - "format": "CSV", - "fileSpec": "EWF" -} +REQUEST_BODY = {"filterType": ADVERTISER_ID, "format": "CSV", "fileSpec": "EWF"} default_args = {"start_date": dates.days_ago(1)} @@ -119,7 +133,47 @@ with models.DAG( upload_line_items = GoogleDisplayVideo360UploadLineItemsOperator( task_id="upload_line_items", bucket_name=BUCKET, - object_name=OBJECT_NAME, + object_name=BUCKET_FILE_LOCATION, ) # [END howto_google_display_video_upload_line_items_operator] + + # [START howto_google_display_video_create_sdf_download_task_operator] + create_sdf_download_task = GoogleDisplayVideo360CreateSDFDownloadTaskOperator( + task_id="create_sdf_download_task", body_request=BODY_REQUEST + ) + operation_name = '{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}' + # [END howto_google_display_video_create_sdf_download_task_operator] + + # [START howto_google_display_video_wait_for_operation_sensor] + wait_for_operation = GoogleDisplayVideo360GetSDFDownloadOperationSensor( + task_id="wait_for_operation", operation_name=operation_name, + ) + # [END howto_google_display_video_wait_for_operation_sensor] + + # [START howto_google_display_video_save_sdf_in_gcs_operator] + save_sdf_in_gcs = GoogleDisplayVideo360SDFtoGCSOperator( + task_id="save_sdf_in_gcs", + operation_name=operation_name, + bucket_name=BUCKET, + object_name=BUCKET_FILE_LOCATION, + gzip=False, + ) + # [END howto_google_display_video_save_sdf_in_gcs_operator] + + # [START howto_google_display_video_gcs_to_big_query_operator] + upload_sdf_to_big_query = GCSToBigQueryOperator( + task_id="upload_sdf_to_big_query", + bucket=BUCKET, + source_objects=['{{ task_instance.xcom_pull("upload_sdf_to_bigquery")}}'], + destination_project_dataset_table=f"{BQ_DATA_SET}.gcs_to_bq_table", + schema_fields=[ + {"name": "name", 
"type": "STRING", "mode": "NULLABLE"}, + {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"}, + ], + write_disposition="WRITE_TRUNCATE", + dag=dag, + ) + # [END howto_google_display_video_gcs_to_big_query_operator] + create_report >> run_report >> wait_for_report >> get_report >> delete_report + create_sdf_download_task >> wait_for_operation >> save_sdf_in_gcs >> upload_sdf_to_big_query diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py b/airflow/providers/google/marketing_platform/hooks/display_video.py index e7bb797..9985dd0 100644 --- a/airflow/providers/google/marketing_platform/hooks/display_video.py +++ b/airflow/providers/google/marketing_platform/hooks/display_video.py @@ -56,6 +56,20 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook): ) return self._conn + def get_conn_to_display_video(self) -> Resource: + """ + Retrieves connection to DisplayVideo. + """ + if not self._conn: + http_authorized = self._authorize() + self._conn = build( + "displayvideo", + self.api_version, + http=http_authorized, + cache_discovery=False, + ) + return self._conn + def create_query(self, query: Dict[str, Any]) -> Dict: """ Creates a query. @@ -111,7 +125,7 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook): .listqueries() .execute(num_retries=self.num_retries) ) - return response.get('queries', []) + return response.get("queries", []) def run_query(self, query_id: str, params: Dict[str, Any]) -> None: """ @@ -170,3 +184,54 @@ class GoogleDisplayVideo360Hook(GoogleBaseHook): .execute(num_retries=self.num_retries) ) return response["lineItems"] + + def create_sdf_download_operation(self, body_request: Dict[str, Any]) -> Dict[str, Any]: + """ + Creates an SDF Download Task and Returns an Operation. + + :param body_request: Body request. 
+ :type body_request: Dict[str, Any] + + More information about body request can be found here: + https://developers.google.com/display-video/api/reference/rest/v1/sdfdownloadtasks/create + """ + + result = ( + self.get_conn_to_display_video() # pylint: disable=no-member + .sdfdownloadtasks() + .create(body=body_request) + .execute(num_retries=self.num_retries) + ) + return result + + def get_sdf_download_operation(self, operation_name: str): + """ + Gets the latest state of an asynchronous SDF download task operation. + + :param operation_name: The name of the operation resource. + :type operation_name: str + """ + + result = ( + self.get_conn_to_display_video() # pylint: disable=no-member + .sdfdownloadtasks() + .operation() + .get(name=operation_name) + .execute(num_retries=self.num_retries) + ) + return result + + def download_media(self, resource_name: str): + """ + Downloads media. + + :param resource_name: The resource name of the media that is being downloaded. + :type resource_name: str + """ + + request = ( + self.get_conn_to_display_video() # pylint: disable=no-member + .media() + .download_media(resource_name=resource_name) + ) + return request diff --git a/airflow/providers/google/marketing_platform/operators/display_video.py b/airflow/providers/google/marketing_platform/operators/display_video.py index f4c70c5..993f93d 100644 --- a/airflow/providers/google/marketing_platform/operators/display_video.py +++ b/airflow/providers/google/marketing_platform/operators/display_video.py @@ -67,7 +67,7 @@ class GoogleDisplayVideo360CreateReportOperator(BaseOperator): gcp_conn_id: str = "google_cloud_default", delegate_to: Optional[str] = None, *args, - **kwargs + **kwargs, ) -> None: super().__init__(*args, **kwargs) self.body = body @@ -125,7 +125,7 @@ class GoogleDisplayVideo360DeleteReportOperator(BaseOperator): gcp_conn_id: str = "google_cloud_default", delegate_to: Optional[str] = None, *args, - **kwargs + **kwargs, ) -> None: super().__init__(*args, **kwargs) self.report_id = 
report_id @@ -209,7 +209,7 @@ class GoogleDisplayVideo360DownloadReportOperator(BaseOperator): gcp_conn_id: str = "google_cloud_default", delegate_to: Optional[str] = None, *args, - **kwargs + **kwargs, ) -> None: super().__init__(*args, **kwargs) self.report_id = report_id @@ -312,7 +312,7 @@ class GoogleDisplayVideo360RunReportOperator(BaseOperator): gcp_conn_id: str = "google_cloud_default", delegate_to: Optional[str] = None, *args, - **kwargs + **kwargs, ) -> None: super().__init__(*args, **kwargs) self.report_id = report_id @@ -440,7 +440,7 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator): gcp_conn_id: str = "google_cloud_default", delegate_to: Optional[str] = None, *args, - **kwargs + **kwargs, ) -> None: super().__init__(*args, **kwargs) self.bucket_name = bucket_name @@ -450,9 +450,7 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator): self.delegate_to = delegate_to def execute(self, context: Dict): - gcs_hook = GCSHook( - gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to - ) + gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) hook = GoogleDisplayVideo360Hook( gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to, @@ -470,3 +468,152 @@ class GoogleDisplayVideo360UploadLineItemsOperator(BaseOperator): ) f.flush() hook.upload_line_items(line_items=line_items) + + +class GoogleDisplayVideo360CreateSDFDownloadTaskOperator(BaseOperator): + """ + Creates SDF operation task. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleDisplayVideo360CreateSDFDownloadTaskOperator` + + .. seealso:: + Check also the official API docs: + `https://developers.google.com/display-video/api/reference/rest` + + :param version: The SDF version of the downloaded file.. + :type version: str + :param partner_id: The ID of the partner to download SDF for. 
+ :type partner_id: str + :param advertiser_id: The ID of the advertiser to download SDF for. + :type advertiser_id: str + :param parent_entity_filter: Filters on selected file types. + :type parent_entity_filter: Dict[str, Any] + :param id_filter: Filters on entities by their entity IDs. + :type id_filter: Dict[str, Any] + :param inventory_source_filter: Filters on Inventory Sources by their IDs. + :type inventory_source_filter: Dict[str, Any] + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. For this to work, the service account making the + request must have domain-wide delegation enabled. + :type delegate_to: str + """ + + template_fields = ("body_request", ) + + @apply_defaults + def __init__( + self, + body_request: Dict[str, Any], + api_version: str = "v1", + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.body_request = body_request + self.api_version = api_version + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + + def execute(self, context: Dict): + hook = GoogleDisplayVideo360Hook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version=self.api_version, + ) + + self.log.info("Creating operation for SDF download task...") + operation = hook.create_sdf_download_operation( + body_request=self.body_request + ) + + return operation + + +class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator): + """ + Download SDF media and save it in the Google Cloud Storage. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleDisplayVideo360SDFtoGCSOperator` + + .. seealso:: + Check also the official API docs: + `https://developers.google.com/display-video/api/reference/rest` + + :param version: The SDF version of the downloaded file.. 
+ :type version: str + :param partner_id: The ID of the partner to download SDF for. + :type partner_id: str + :param advertiser_id: The ID of the advertiser to download SDF for. + :type advertiser_id: str + :param parent_entity_filter: Filters on selected file types. + :type parent_entity_filter: Dict[str, Any] + :param id_filter: Filters on entities by their entity IDs. + :type id_filter: Dict[str, Any] + :param inventory_source_filter: Filters on Inventory Sources by their IDs. + :type inventory_source_filter: Dict[str, Any] + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. For this to work, the service account making the + request must have domain-wide delegation enabled. + :type delegate_to: str + """ + + template_fields = ("operation_name", "bucket_name", "object_name", "body_request") + + @apply_defaults + def __init__( + self, + operation_name: str, + bucket_name: str, + object_name: str, + gzip: bool = False, + api_version: str = "v1", + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.operation_name = operation_name + self.bucket_name = bucket_name + self.object_name = object_name + self.gzip = gzip + self.api_version = api_version + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + + def execute(self, context: Dict): + hook = GoogleDisplayVideo360Hook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version=self.api_version, + ) + gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) + + self.log.info("Retrieving operation...") + operation = hook.get_sdf_download_operation(operation_name=self.operation_name) + + self.log.info("Creating file for upload...") + media = hook.download_media(resource_name=operation) + + self.log.info("Sending file to the Google Cloud Storage...") + 
with tempfile.NamedTemporaryFile() as temp_file: + hook.download_content_from_request( + temp_file, media, chunk_size=1024 * 1024 + ) + temp_file.flush() + gcs_hook.upload( + bucket_name=self.bucket_name, + object_name=self.object_name, + filename=temp_file.name, + gzip=self.gzip, + ) + + return f"{self.bucket_name}/{self.object_name}" diff --git a/airflow/providers/google/marketing_platform/sensors/display_video.py b/airflow/providers/google/marketing_platform/sensors/display_video.py index 4aafff3..7306c69 100644 --- a/airflow/providers/google/marketing_platform/sensors/display_video.py +++ b/airflow/providers/google/marketing_platform/sensors/display_video.py @@ -20,6 +20,7 @@ Sensor for detecting the completion of DV360 reports. """ from typing import Dict, Optional +from airflow import AirflowException from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook from airflow.sensors.base_sensor_operator import BaseSensorOperator @@ -38,7 +39,7 @@ class GoogleDisplayVideo360ReportSensor(BaseSensorOperator): :type api_version: str :param gcp_conn_id: The connection ID to use when fetching connection info. :type gcp_conn_id: str - :param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the + :param delegate_to: The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: str """ @@ -72,3 +73,59 @@ class GoogleDisplayVideo360ReportSensor(BaseSensorOperator): if response and not response.get("metadata", {}).get("running"): return True return False + + +class GoogleDisplayVideo360GetSDFDownloadOperationSensor(BaseSensorOperator): + """ + Sensor for detecting the completion of SDF operation. + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:GoogleDisplayVideo360GetSDFDownloadOperationSensor` + + :param operation_name: The name of the operation resource. + :type operation_name: str + :param api_version: The version of the api that will be requested for example 'v1'. + :type api_version: str + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. For this to work, the service account making the + request must have domain-wide delegation enabled. + :type delegate_to: str + + """ + + template_fields = ("operation_name", ) + + def __init__( + self, + operation_name: str, + api_version: str = "v1", + gcp_conn_id: str = "google_cloud_default", + delegate_to: Optional[str] = None, + mode: str = "reschedule", + poke_interval: int = 60 * 5, + *args, + **kwargs + ): + super().__init__(*args, **kwargs) + self.mode = mode + self.poke_interval = poke_interval + self.operation_name = operation_name + self.api_version = api_version + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + + def poke(self, context: Dict) -> bool: + hook = GoogleDisplayVideo360Hook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version=self.api_version, + ) + operation = hook.get_sdf_download_operation(operation_name=self.operation_name) + + if "error" in operation: + raise AirflowException(f'The operation finished in error with {operation["error"]}') + if operation and operation.get("done"): + return True + return False diff --git a/docs/howto/operator/gcp/display_video.rst b/docs/howto/operator/gcp/display_video.rst index 6890f81..04f4814 100644 --- a/docs/howto/operator/gcp/display_video.rst +++ b/docs/howto/operator/gcp/display_video.rst @@ -168,3 +168,58 @@ To run Display&Video 360 uploading line items use Use :ref:`Jinja templating <jinja-templating>` with 
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360UploadLineItemsOperator` parameters which allow you to dynamically determine values. + +.. _howto/operator:GoogleDisplayVideo360CreateSDFDownloadTaskOperator: + +Create SDF download task +^^^^^^^^^^^^^^^^^^^^^^^^ + +To create an SDF download task use +:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateSDFDownloadTaskOperator`. + +.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py + :language: python + :dedent: 4 + :start-after: [START howto_google_display_video_create_sdf_download_task_operator] + :end-before: [END howto_google_display_video_create_sdf_download_task_operator] + +Use :ref:`Jinja templating <jinja-templating>` with +:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateSDFDownloadTaskOperator` +parameters which allow you to dynamically determine values. + + +.. _howto/operator:GoogleDisplayVideo360SDFtoGCSOperator: + +Save SDF files in the Google Cloud Storage +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To download SDF files and save them in Google Cloud Storage use +:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360SDFtoGCSOperator`. + +.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py + :language: python + :dedent: 4 + :start-after: [START howto_google_display_video_save_sdf_in_gcs_operator] + :end-before: [END howto_google_display_video_save_sdf_in_gcs_operator] + +Use :ref:`Jinja templating <jinja-templating>` with +:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360SDFtoGCSOperator` +parameters which allow you to dynamically determine values. + +.. 
_howto/operator:GoogleDisplayVideo360GetSDFDownloadOperationSensor: + +Waiting for SDF operation +^^^^^^^^^^^^^^^^^^^^^^^^^ + +To wait for the SDF operation to complete use +:class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360GetSDFDownloadOperationSensor`. + +.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py + :language: python + :dedent: 4 + :start-after: [START howto_google_display_video_wait_for_operation_sensor] + :end-before: [END howto_google_display_video_wait_for_operation_sensor] + +Use :ref:`Jinja templating <jinja-templating>` with +:template-fields:`airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360GetSDFDownloadOperationSensor` +parameters which allow you to dynamically determine values. diff --git a/tests/providers/google/marketing_platform/hooks/test_display_video.py b/tests/providers/google/marketing_platform/hooks/test_display_video.py index e4bba9d..f61f118 100644 --- a/tests/providers/google/marketing_platform/hooks/test_display_video.py +++ b/tests/providers/google/marketing_platform/hooks/test_display_video.py @@ -36,8 +36,9 @@ class TestGoogleDisplayVideo360Hook(TestCase): "airflow.providers.google.marketing_platform.hooks." "display_video.GoogleDisplayVideo360Hook._authorize" ) - @mock.patch("airflow.providers.google.marketing_platform.hooks." - "display_video.build") + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." "display_video.build" + ) def test_gen_conn(self, mock_build, mock_authorize): result = self.hook.get_conn() mock_build.assert_called_once_with( @@ -50,6 +51,23 @@ class TestGoogleDisplayVideo360Hook(TestCase): @mock.patch( "airflow.providers.google.marketing_platform.hooks." + "display_video.GoogleDisplayVideo360Hook._authorize" + ) + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." 
"display_video.build" + ) + def test_get_conn_to_display_video(self, mock_build, mock_authorize): + result = self.hook.get_conn_to_display_video() + mock_build.assert_called_once_with( + "displayvideo", + API_VERSION, + http=mock_authorize.return_value, + cache_discovery=False, + ) + self.assertEqual(mock_build.return_value, result) + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." "display_video.GoogleDisplayVideo360Hook.get_conn" ) def test_create_query(self, get_conn_mock): @@ -239,3 +257,132 @@ class TestGoogleDisplayVideo360Hook(TestCase): result = self.hook.upload_line_items(line_items) self.assertEqual(return_value, result) + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." + "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video" + ) + def test_create_sdf_download_tasks_called_with_params( + self, get_conn_to_display_video + ): + body_request = { + "version": "version", + "partnerId": "partner_id", + "advertiserId": "advertiser_id", + "parentEntityFilter": "parent_entity_filter", + "idFilter": "id_filter", + "inventorySourceFilter": "inventory_source_filter", + } + + self.hook.create_sdf_download_operation(body_request=body_request) + + get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once_with( + body=body_request + ) + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." 
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video" + ) + def test_create_sdf_download_tasks_called_once(self, get_conn_to_display_video): + body_request = { + "version": "version", + "partnerId": "partner_id", + "advertiserId": "advertiser_id", + "parentEntityFilter": "parent_entity_filter", + "idFilter": "id_filter", + "inventorySourceFilter": "inventory_source_filter", + } + + self.hook.create_sdf_download_operation(body_request=body_request) + + get_conn_to_display_video.return_value.sdfdownloadtasks.return_value.create.assert_called_once() + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." + "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video" + ) + def test_create_sdf_download_tasks_return_equal_values( + self, get_conn_to_display_video + ): + response = ["name"] + body_request = { + "version": "version", + "partnerId": "partner_id", + "advertiserId": "advertiser_id", + "parentEntityFilter": "parent_entity_filter", + "idFilter": "id_filter", + "inventorySourceFilter": "inventory_source_filter", + } + + get_conn_to_display_video.return_value.\ + sdfdownloadtasks.return_value.\ + create.return_value\ + .execute.return_value = response + + result = self.hook.create_sdf_download_operation(body_request=body_request) + self.assertEqual(response, result) + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." + "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video" + ) + def test_get_sdf_download_tasks_called_with_params(self, get_conn_to_display_video): + operation_name = "operation_name" + self.hook.get_sdf_download_operation(operation_name=operation_name) + get_conn_to_display_video.return_value.\ + sdfdownloadtasks.return_value.\ + operation.return_value.\ + get.assert_called_once_with(name=operation_name) + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." 
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video" + ) + def test_get_sdf_download_tasks_called_once(self, get_conn_to_display_video): + operation_name = "name" + self.hook.get_sdf_download_operation(operation_name=operation_name) + get_conn_to_display_video.return_value.\ + sdfdownloadtasks.return_value.\ + operation.return_value.\ + get.assert_called_once() + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." + "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video" + ) + def get_sdf_download_tasks_return_equal_values(self, get_conn_to_display_video): + operation_name = "operation" + response = "reposonse" + + get_conn_to_display_video.return_value.\ + sdfdownloadtasks.return_value.\ + operation.return_value.get = response + + result = self.hook.get_sdf_download_operation(operation_name=operation_name) + + self.assertEqual(operation_name, result) + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." + "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video" + ) + def test_download_media_called_once(self, get_conn_to_display_video): + resource_name = "resource_name" + + self.hook.download_media(resource_name=resource_name) + get_conn_to_display_video.return_value.\ + media.return_value.\ + download_media.assert_called_once() + + @mock.patch( + "airflow.providers.google.marketing_platform.hooks." 
+ "display_video.GoogleDisplayVideo360Hook.get_conn_to_display_video" + ) + def test_download_media_called_once_with_params(self, get_conn_to_display_video): + resource_name = "resource_name" + + self.hook.download_media(resource_name=resource_name) + get_conn_to_display_video.return_value.\ + media.return_value.\ + download_media.assert_called_once_with(resource_name=resource_name) diff --git a/tests/providers/google/marketing_platform/operators/test_display_video.py b/tests/providers/google/marketing_platform/operators/test_display_video.py index 45fe897..dcff1cb 100644 --- a/tests/providers/google/marketing_platform/operators/test_display_video.py +++ b/tests/providers/google/marketing_platform/operators/test_display_video.py @@ -19,9 +19,10 @@ from unittest import TestCase, mock from airflow.providers.google.marketing_platform.operators.display_video import ( - GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator, - GoogleDisplayVideo360DownloadLineItemsOperator, GoogleDisplayVideo360DownloadReportOperator, - GoogleDisplayVideo360RunReportOperator, GoogleDisplayVideo360UploadLineItemsOperator, + GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360CreateSDFDownloadTaskOperator, + GoogleDisplayVideo360DeleteReportOperator, GoogleDisplayVideo360DownloadLineItemsOperator, + GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator, + GoogleDisplayVideo360SDFtoGCSOperator, GoogleDisplayVideo360UploadLineItemsOperator, ) API_VERSION = "api_version" @@ -80,8 +81,7 @@ class TestGoogleDisplayVideo360DeleteReportOperator(TestCase): class TestGoogleDisplayVideo360GetReportOperator(TestCase): @mock.patch( - "airflow.providers.google.marketing_platform.operators." - "display_video.shutil" + "airflow.providers.google.marketing_platform.operators." "display_video.shutil" ) @mock.patch( "airflow.providers.google.marketing_platform.operators." 
@@ -96,8 +96,7 @@ class TestGoogleDisplayVideo360GetReportOperator(TestCase): "display_video.GoogleDisplayVideo360DownloadReportOperator.xcom_push" ) @mock.patch( - "airflow.providers.google.marketing_platform.operators." - "display_video.GCSHook" + "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook" ) @mock.patch( "airflow.providers.google.marketing_platform.operators." @@ -189,8 +188,7 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase): "display_video.GoogleDisplayVideo360Hook" ) @mock.patch( - "airflow.providers.google.marketing_platform.operators." - "display_video.GCSHook" + "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook" ) @mock.patch( "airflow.providers.google.marketing_platform.operators." @@ -201,7 +199,7 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase): "filterType": "filter_type", "filterIds": [], "format": "format", - "fileSpec": "file_spec" + "fileSpec": "file_spec", } bucket_name = "bucket_name" object_name = "object_name" @@ -227,12 +225,11 @@ class TestGoogleDisplayVideo360DownloadLineItemsOperator(TestCase): object_name=object_name, filename=filename, gzip=gzip, - mime_type='text/csv', + mime_type="text/csv", ) gcs_hook_mock.assert_called_once_with( - gcp_conn_id=GCP_CONN_ID, - delegate_to=DELEGATE_TO, + gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO, ) hook_mock.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO @@ -252,8 +249,7 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase): "display_video.GoogleDisplayVideo360Hook" ) @mock.patch( - "airflow.providers.google.marketing_platform.operators." - "display_video.GCSHook" + "airflow.providers.google.marketing_platform.operators." 
"display_video.GCSHook" ) def test_execute(self, gcs_hook_mock, hook_mock, mock_tempfile): filename = "filename" @@ -261,7 +257,9 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase): bucket_name = "bucket_name" line_items = "holy_hand_grenade" gcs_hook_mock.return_value.download.return_value = line_items - mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = filename + mock_tempfile.NamedTemporaryFile.return_value.__enter__.return_value.name = ( + filename + ) op = GoogleDisplayVideo360UploadLineItemsOperator( bucket_name=bucket_name, @@ -272,20 +270,117 @@ class TestGoogleDisplayVideo360UploadLineItemsOperator(TestCase): ) op.execute(context=None) hook_mock.assert_called_once_with( - gcp_conn_id=GCP_CONN_ID, - api_version=API_VERSION, - delegate_to=DELEGATE_TO + gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO ) gcs_hook_mock.assert_called_once_with( - gcp_conn_id=GCP_CONN_ID, - delegate_to=DELEGATE_TO, + gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO, ) gcs_hook_mock.return_value.download.assert_called_once_with( + bucket_name=bucket_name, object_name=object_name, filename=filename, + ) + hook_mock.return_value.upload_line_items.assert_called_once() + hook_mock.return_value.upload_line_items.assert_called_once_with( + line_items=line_items + ) + + +class TestGoogleDisplayVideo360SDFtoGCSOperator(TestCase): + @mock.patch( + "airflow.providers.google.marketing_platform.operators." + "display_video.GoogleDisplayVideo360Hook" + ) + @mock.patch( + "airflow.providers.google.marketing_platform.operators." "display_video.GCSHook" + ) + @mock.patch( + "airflow.providers.google.marketing_platform.operators." 
+ "display_video.tempfile" + ) + def test_execute(self, mock_temp, gcs_mock_hook, mock_hook): + operation_name = "operation_name" + operation = {"key": "value"} + bucket_name = "bucket_name" + object_name = "object_name" + filename = "filename" + gzip = False + + # mock_hook.return_value.create_sdf_download_operation.return_value = response_name + mock_hook.return_value.get_sdf_download_operation.return_value = operation + mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename + + op = GoogleDisplayVideo360SDFtoGCSOperator( + operation_name=operation_name, + bucket_name=bucket_name, + object_name=object_name, + gzip=gzip, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + task_id="test_task", + ) + + op.execute(context=None) + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO + ) + + mock_hook.return_value.get_sdf_download_operation.assert_called_once() + mock_hook.return_value.get_sdf_download_operation.assert_called_once_with( + operation_name=operation_name + ) + + mock_hook.return_value.download_media.assert_called_once() + mock_hook.return_value.download_media.assert_called_once_with( + resource_name=mock_hook.return_value.get_sdf_download_operation.return_value + ) + + mock_hook.return_value.download_content_from_request.assert_called_once() + mock_hook.return_value.download_content_from_request.assert_called_once_with( + mock_temp.NamedTemporaryFile.return_value.__enter__.return_value, + mock_hook.return_value.download_media(), + chunk_size=1024 * 1024, + ) + + gcs_mock_hook.assert_called_once() + gcs_mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, delegate_to=DELEGATE_TO + ) + + gcs_mock_hook.return_value.upload.assert_called_once() + gcs_mock_hook.return_value.upload.assert_called_once_with( bucket_name=bucket_name, object_name=object_name, filename=filename, + gzip=gzip, + ) + + +class TestGoogleDisplayVideo360CreateSDFDownloadTaskOperator(TestCase): 
+ @mock.patch( + "airflow.providers.google.marketing_platform.operators." + "display_video.GoogleDisplayVideo360Hook" + ) + def test_execute(self, mock_hook): + body_request = { + "version": "1", + "id": "id", + "filter": {"id": []}, + } + + op = GoogleDisplayVideo360CreateSDFDownloadTaskOperator( + body_request=body_request, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + task_id="test_task", + ) + + op.execute(context=None) + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, api_version=API_VERSION, delegate_to=DELEGATE_TO + ) + + mock_hook.return_value.create_sdf_download_operation.assert_called_once() + mock_hook.return_value.create_sdf_download_operation.assert_called_once_with( + body_request=body_request ) - hook_mock.return_value.upload_line_items.assert_called_once() - hook_mock.return_value.upload_line_items.assert_called_once_with(line_items=line_items) diff --git a/tests/providers/google/marketing_platform/operators/test_display_video_system.py b/tests/providers/google/marketing_platform/operators/test_display_video_system.py index 8cc441d..7d4358d 100644 --- a/tests/providers/google/marketing_platform/operators/test_display_video_system.py +++ b/tests/providers/google/marketing_platform/operators/test_display_video_system.py @@ -16,14 +16,16 @@ # under the License. 
import pytest +from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook from airflow.providers.google.marketing_platform.example_dags.example_display_video import BUCKET -from tests.providers.google.cloud.utils.gcp_authenticator import GMP_KEY +from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY, GMP_KEY from tests.test_utils.gcp_system_helpers import MARKETING_DAG_FOLDER, GoogleSystemTest, provide_gcp_context # Requires the following scope: SCOPES = [ "https://www.googleapis.com/auth/doubleclickbidmanager", - "https://www.googleapis.com/auth/cloud-platform" + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/display-video" ] @@ -37,7 +39,10 @@ class DisplayVideoSystemTest(GoogleSystemTest): def tearDown(self): self.delete_gcs_bucket(BUCKET) - super().tearDown() + with provide_gcp_context(GCP_BIGQUERY_KEY, scopes=SCOPES): + hook = BigQueryHook() + hook.delete_dataset(dataset_id='airflow_test', delete_contents=True) + super().tearDown() @provide_gcp_context(GMP_KEY, scopes=SCOPES) def test_run_example_dag(self): diff --git a/tests/providers/google/marketing_platform/sensors/test_display_video.py b/tests/providers/google/marketing_platform/sensors/test_display_video.py index 624b35b..2daca30 100644 --- a/tests/providers/google/marketing_platform/sensors/test_display_video.py +++ b/tests/providers/google/marketing_platform/sensors/test_display_video.py @@ -19,7 +19,7 @@ from unittest import TestCase, mock from airflow.providers.google.marketing_platform.sensors.display_video import ( - GoogleDisplayVideo360ReportSensor, + GoogleDisplayVideo360GetSDFDownloadOperationSensor, GoogleDisplayVideo360ReportSensor, ) API_VERSION = "api_version" @@ -45,3 +45,26 @@ class TestGoogleDisplayVideo360ReportSensor(TestCase): gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION ) hook_mock.return_value.get_query.assert_called_once_with(query_id=report_id) + + +class 
TestGoogleDisplayVideo360Sensor(TestCase): + @mock.patch( + "airflow.providers.google.marketing_platform.sensors." + "display_video.GoogleDisplayVideo360Hook" + ) + @mock.patch( + "airflow.providers.google.marketing_platform.sensors." + "display_video.BaseSensorOperator" + ) + def test_poke(self, mock_base_op, hook_mock): + operation_name = "operation_name" + op = GoogleDisplayVideo360GetSDFDownloadOperationSensor( + operation_name=operation_name, api_version=API_VERSION, task_id="test_task", + ) + op.poke(context=None) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION + ) + hook_mock.return_value.get_sdf_download_operation.assert_called_once_with( + operation_name=operation_name + )