This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 447b221390 Refactor system test for Campaign Manager 360 (#42346)
447b221390 is described below
commit 447b221390547364c380d9dfa53ca7135a05f2b1
Author: max <[email protected]>
AuthorDate: Thu Sep 19 12:17:01 2024 +0000
Refactor system test for Campaign Manager 360 (#42346)
---
.../marketing_platform/example_campaign_manager.py | 155 ++++++++++++++++++---
1 file changed, 135 insertions(+), 20 deletions(-)
diff --git
a/tests/system/providers/google/marketing_platform/example_campaign_manager.py
b/tests/system/providers/google/marketing_platform/example_campaign_manager.py
index bace148179..f779458e78 100644
---
a/tests/system/providers/google/marketing_platform/example_campaign_manager.py
+++
b/tests/system/providers/google/marketing_platform/example_campaign_manager.py
@@ -17,18 +17,30 @@
# under the License.
"""
Example Airflow DAG that shows how to use CampaignManager.
+
+This system test relies on a service account that is properly configured in Campaign Manager 360.
+Before running this system test locally, make sure your service account can access all the
+secrets that the DAG reads. If it cannot, but you know another service account that can,
+specify that account in the IMPERSONATION_CHAIN environment variable (read into CM360_IMPERSONATION_CHAIN).
"""
from __future__ import annotations
+import json
+import logging
import os
import time
import uuid
from datetime import datetime
from typing import cast
+from google.api_core.exceptions import NotFound
+
+from airflow.decorators import task
+from airflow.models import Connection
from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
+from airflow.providers.google.cloud.hooks.secret_manager import
GoogleCloudSecretManagerHook
from airflow.providers.google.cloud.operators.gcs import
GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.marketing_platform.operators.campaign_manager
import (
GoogleCampaignManagerBatchInsertConversionsOperator,
@@ -41,24 +53,36 @@ from
airflow.providers.google.marketing_platform.operators.campaign_manager impo
from airflow.providers.google.marketing_platform.sensors.campaign_manager
import (
GoogleCampaignManagerReportSensor,
)
+from airflow.settings import Session
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+CM360_IMPERSONATION_CHAIN = os.environ.get("IMPERSONATION_CHAIN", None)
+
+DAG_ID = "campaign_manager"
+
+SECRET_ACCOUNT_ID = "cm360_account_id"
+SECRET_DCLID = "cm360_dclid"
+SECRET_ENCRYPTION_ENTITY_ID = "cm360_encryption_entity_id"
+SECRET_FLOODLIGHT_ACTIVITY_ID = "cm360_floodlight_activity_id"
+SECRET_FLOODLIGHT_CONFIGURATION_ID = "cm360_floodlight_configuration_id"
+SECRET_USER_PROFILE_ID = "cm360_user_profile_id"
-DAG_ID = "example_campaign_manager"
+ACCOUNT_ID = "{{ task_instance.xcom_pull('get_account_id') }}"
+DCLID = "{{ task_instance.xcom_pull('get_dclid') }}"
+ENCRYPTION_ENTITY_ID = "{{ task_instance.xcom_pull('get_encryption_entity_id')
}}"
+FLOODLIGHT_ACTIVITY_ID = "{{
task_instance.xcom_pull('get_floodlight_activity_id') }}"
+FLOODLIGHT_CONFIGURATION_ID = "{{
task_instance.xcom_pull('get_floodlight_configuration_id') }}"
+USER_PROFILE_ID = "{{ task_instance.xcom_pull('get_user_profile_id') }}"
-PROFILE_ID = os.environ.get("MARKETING_PROFILE_ID", "123456789")
-FLOODLIGHT_ACTIVITY_ID = int(os.environ.get("FLOODLIGHT_ACTIVITY_ID", 12345))
-FLOODLIGHT_CONFIGURATION_ID =
int(os.environ.get("FLOODLIGHT_CONFIGURATION_ID", 12345))
-ENCRYPTION_ENTITY_ID = int(os.environ.get("ENCRYPTION_ENTITY_ID", 12345))
-DEVICE_ID = os.environ.get("DEVICE_ID", "12345")
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
REPORT_NAME = f"report_{DAG_ID}_{ENV_ID}"
FILE_NAME = f"file_{DAG_ID}_{ENV_ID}"
-ACCOUNT_ID = f"account_{DAG_ID}_{ENV_ID}"
FORMAT = "CSV"
+CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+
# For more information, please check
# https://developers.google.com/doubleclick-advertisers/rest/v4/reports#type
@@ -85,7 +109,7 @@ CONVERSION = {
"kind": "dfareporting#conversion",
"floodlightActivityId": FLOODLIGHT_ACTIVITY_ID,
"floodlightConfigurationId": FLOODLIGHT_CONFIGURATION_ID,
- "mobileDeviceId": DEVICE_ID,
+ "dclid": DCLID,
"ordinal": "0",
"quantity": 42,
"value": 123.4,
@@ -103,12 +127,23 @@ CONVERSION_UPDATE = {
"kind": "dfareporting#conversion",
"floodlightActivityId": FLOODLIGHT_ACTIVITY_ID,
"floodlightConfigurationId": FLOODLIGHT_CONFIGURATION_ID,
- "mobileDeviceId": DEVICE_ID,
+ "dclid": DCLID,
"ordinal": "0",
"quantity": 42,
"value": 123.4,
}
+
+log = logging.getLogger(__name__)
+
+
+def get_secret(secret_id: str) -> str:
+    """
+    Read a secret through ``GoogleCloudSecretManagerHook`` and return it as text.
+
+    :param secret_id: ID of the secret to read.
+    :return: The decoded secret payload with leading/trailing whitespace stripped.
+    :raises NotFound: If the hook reports that the secret does not exist.
+    """
+    hook = GoogleCloudSecretManagerHook()
+    if hook.secret_exists(secret_id=secret_id):
+        return hook.access_secret(secret_id=secret_id).payload.data.decode().strip()
+    # Exceptions do not interpolate logging-style "%s" arguments, and the second
+    # positional argument of NotFound is ``errors`` rather than a format value,
+    # so the message is formatted before it is passed in.
+    raise NotFound(f"The secret '{secret_id}' not found")
+
+
with DAG(
DAG_ID,
schedule="@once", # Override to match your needs,
@@ -116,20 +151,77 @@ with DAG(
catchup=False,
tags=["example", "campaign"],
) as dag:
+
+    @task
+    def create_connection(connection_id: str) -> None:
+        """
+        Create the ``google_cloud_platform`` connection, replacing any existing one.
+
+        The connection extras carry the OAuth scopes listed below and, when
+        ``CM360_IMPERSONATION_CHAIN`` is set, the impersonation chain.
+
+        :param connection_id: ID of the Airflow connection to (re)create.
+        """
+        connection = Connection(
+            conn_id=connection_id,
+            conn_type="google_cloud_platform",
+        )
+        extras = {
+            "scope": "https://www.googleapis.com/auth/cloud-platform,"
+            "https://www.googleapis.com/auth/ddmconversions,"
+            "https://www.googleapis.com/auth/dfareporting",
+        }
+        if CM360_IMPERSONATION_CHAIN:
+            extras["impersonation_chain"] = CM360_IMPERSONATION_CHAIN
+
+        connection.set_extra(json.dumps(extras))
+
+        session = Session()
+        try:
+            # Drop a stale connection with the same ID first so the insert below
+            # always stores the freshly built one.
+            log.info("Removing connection %s if it exists", connection_id)
+            session.query(Connection).filter(Connection.conn_id == connection_id).delete()
+
+            session.add(connection)
+            session.commit()
+        finally:
+            # Always release the session, even if the delete or commit fails.
+            session.close()
+        # Report the ID that was actually used, not the module-level constant.
+        log.info("Connection %s created", connection_id)
+
+    @task
+    def get_account_id() -> str:
+        """Return the value of the secret named by ``SECRET_ACCOUNT_ID``."""
+        return get_secret(SECRET_ACCOUNT_ID)
+
+    @task
+    def get_dclid() -> str:
+        """Return the value of the secret named by ``SECRET_DCLID``."""
+        return get_secret(SECRET_DCLID)
+
+    @task
+    def get_encryption_entity_id() -> str:
+        """Return the value of the secret named by ``SECRET_ENCRYPTION_ENTITY_ID``."""
+        return get_secret(SECRET_ENCRYPTION_ENTITY_ID)
+
+    @task
+    def get_floodlight_activity_id() -> str:
+        """Return the value of the secret named by ``SECRET_FLOODLIGHT_ACTIVITY_ID``."""
+        return get_secret(SECRET_FLOODLIGHT_ACTIVITY_ID)
+
+    @task
+    def get_floodlight_configuration_id() -> str:
+        """Return the value of the secret named by ``SECRET_FLOODLIGHT_CONFIGURATION_ID``."""
+        return get_secret(SECRET_FLOODLIGHT_CONFIGURATION_ID)
+
+    @task
+    def get_user_profile_id() -> str:
+        """Return the value of the secret named by ``SECRET_USER_PROFILE_ID``."""
+        return get_secret(SECRET_USER_PROFILE_ID)
+
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)
# [START howto_campaign_manager_insert_report_operator]
create_report = GoogleCampaignManagerInsertReportOperator(
- profile_id=PROFILE_ID, report=REPORT, task_id="create_report"
+ profile_id=USER_PROFILE_ID,
+ report=REPORT,
+ task_id="create_report",
+ gcp_conn_id=CONNECTION_ID,
)
report_id = cast(str, XComArg(create_report, key="report_id"))
# [END howto_campaign_manager_insert_report_operator]
# [START howto_campaign_manager_run_report_operator]
run_report = GoogleCampaignManagerRunReportOperator(
- profile_id=PROFILE_ID, report_id=report_id, task_id="run_report"
+ profile_id=USER_PROFILE_ID,
+ report_id=report_id,
+ task_id="run_report",
+ gcp_conn_id=CONNECTION_ID,
)
file_id = cast(str, XComArg(run_report, key="file_id"))
# [END howto_campaign_manager_run_report_operator]
@@ -137,9 +229,10 @@ with DAG(
# [START howto_campaign_manager_wait_for_operation]
wait_for_report = GoogleCampaignManagerReportSensor(
task_id="wait_for_report",
- profile_id=PROFILE_ID,
+ profile_id=USER_PROFILE_ID,
report_id=report_id,
file_id=file_id,
+ gcp_conn_id=CONNECTION_ID,
)
# [END howto_campaign_manager_wait_for_operation]
@@ -147,20 +240,22 @@ with DAG(
report_name = f"reports/report_{str(uuid.uuid1())}"
get_report = GoogleCampaignManagerDownloadReportOperator(
task_id="get_report",
- profile_id=PROFILE_ID,
+ profile_id=USER_PROFILE_ID,
report_id=report_id,
file_id=file_id,
report_name=report_name,
bucket_name=BUCKET_NAME,
+ gcp_conn_id=CONNECTION_ID,
)
# [END howto_campaign_manager_get_report_operator]
# [START howto_campaign_manager_delete_report_operator]
delete_report = GoogleCampaignManagerDeleteReportOperator(
- profile_id=PROFILE_ID,
+ profile_id=USER_PROFILE_ID,
report_name=REPORT_NAME,
task_id="delete_report",
trigger_rule=TriggerRule.ALL_DONE,
+ gcp_conn_id=CONNECTION_ID,
)
# [END howto_campaign_manager_delete_report_operator]
@@ -171,31 +266,50 @@ with DAG(
# [START howto_campaign_manager_insert_conversions]
insert_conversion = GoogleCampaignManagerBatchInsertConversionsOperator(
task_id="insert_conversion",
- profile_id=PROFILE_ID,
+ profile_id=USER_PROFILE_ID,
conversions=[CONVERSION],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
- encryption_entity_id=ENCRYPTION_ENTITY_ID,
+ encryption_entity_id=ENCRYPTION_ENTITY_ID, # type: ignore[arg-type]
+ gcp_conn_id=CONNECTION_ID,
)
# [END howto_campaign_manager_insert_conversions]
# [START howto_campaign_manager_update_conversions]
update_conversion = GoogleCampaignManagerBatchUpdateConversionsOperator(
task_id="update_conversion",
- profile_id=PROFILE_ID,
+ profile_id=USER_PROFILE_ID,
conversions=[CONVERSION_UPDATE],
encryption_source="AD_SERVING",
encryption_entity_type="DCM_ADVERTISER",
- encryption_entity_id=ENCRYPTION_ENTITY_ID,
+ encryption_entity_id=ENCRYPTION_ENTITY_ID, # type: ignore[arg-type]
max_failed_updates=1,
+ gcp_conn_id=CONNECTION_ID,
)
# [END howto_campaign_manager_update_conversions]
+    @task(task_id="delete_connection")
+    def delete_connection(connection_id: str) -> None:
+        """
+        Delete the Airflow connection with the given ID, if one is stored.
+
+        :param connection_id: ID of the connection to remove.
+        """
+        session = Session()
+        try:
+            log.info("Removing connection %s", connection_id)
+            session.query(Connection).filter(Connection.conn_id == connection_id).delete()
+            session.commit()
+        finally:
+            # Always release the session, even if the delete or commit fails.
+            session.close()
+
(
# TEST SETUP
- create_bucket
- >> create_report
+ create_connection(connection_id=CONNECTION_ID)
+ >> [
+ get_account_id(),
+ get_dclid(),
+ get_encryption_entity_id(),
+ get_floodlight_activity_id(),
+ get_floodlight_configuration_id(),
+ get_user_profile_id(),
+ ]
+ >> create_bucket
# TEST BODY
+ >> create_report
>> run_report
>> wait_for_report
>> get_report
@@ -204,6 +318,7 @@ with DAG(
# TEST TEARDOWN
>> delete_report
>> delete_bucket
+ >> delete_connection(connection_id=CONNECTION_ID)
)
from tests.system.utils.watcher import watcher