This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 55840375594 Return list of GCS URIs from FacebookAdsReportToGcsOperator (#61349)
55840375594 is described below
commit 55840375594c674218d50d8d12c109daa75aecb4
Author: Abhishek Mishra <[email protected]>
AuthorDate: Wed Feb 18 13:07:56 2026 +0530
Return list of GCS URIs from FacebookAdsReportToGcsOperator (#61349)
---
.../google/cloud/transfers/facebook_ads_to_gcs.py | 40 +++++++++++-----
.../cloud/transfers/test_facebook_ads_to_gcs.py | 53 ++++++++++++++++++++--
2 files changed, 78 insertions(+), 15 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
index 6572371db1d..a325f7a3af5 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
@@ -122,11 +122,20 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
self.upload_as_account = upload_as_account
self.impersonation_chain = impersonation_chain
- def execute(self, context: Context):
+ def execute(self, context: Context) -> list[str]:
+ """
+ Execute the operator.
+
+ Fetches Facebook Ads reports and uploads them to Google Cloud Storage.
+
+ :return: List of GCS URIs where the reports were uploaded.
+ This value is pushed to XCom for downstream tasks.
+ """
service = FacebookAdsReportingHook(
facebook_conn_id=self.facebook_conn_id,
api_version=self.api_version
)
bulk_report = service.bulk_facebook_report(params=self.parameters,
fields=self.fields)
+ files_uploaded: list[str] = []
if isinstance(bulk_report, list):
converted_rows_with_action = self._generate_rows_with_action(False)
@@ -151,8 +160,10 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
f"List or Dict. Actual return type of the Hook:
{type(bulk_report)}"
)
raise AirflowException(message)
- total_row_count =
self._decide_and_flush(converted_rows_with_action=converted_rows_with_action)
- self.log.info("Facebook Returned %s data points in total: ",
total_row_count)
+ files_uploaded =
self._decide_and_flush(converted_rows_with_action=converted_rows_with_action)
+ total_row_count = sum(1 for _ in files_uploaded)
+ self.log.info("Facebook Returned data points across %s files",
total_row_count)
+ return files_uploaded
def _generate_rows_with_action(self, type_check: bool):
if type_check and self.upload_as_account:
@@ -178,35 +189,37 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
self.log.info("Facebook Returned %s data points ",
len(converted_rows))
return converted_rows_with_action
- def _decide_and_flush(self, converted_rows_with_action: dict[FlushAction,
list]):
- total_data_count = 0
+ def _decide_and_flush(self, converted_rows_with_action: dict[FlushAction,
list]) -> list[str]:
+ files_uploaded: list[str] = []
once_action = converted_rows_with_action.get(FlushAction.EXPORT_ONCE)
if once_action is not None:
- self._flush_rows(
+ gcs_uri = self._flush_rows(
converted_rows=once_action,
object_name=self.object_name,
)
- total_data_count += len(once_action)
+ if gcs_uri:
+ files_uploaded.append(gcs_uri)
else:
every_account_action =
converted_rows_with_action.get(FlushAction.EXPORT_EVERY_ACCOUNT)
if every_account_action:
for converted_rows in every_account_action:
- self._flush_rows(
+ gcs_uri = self._flush_rows(
converted_rows=converted_rows.get("converted_rows"),
object_name=self._transform_object_name_with_account_id(
account_id=converted_rows.get("account_id")
),
)
- total_data_count +=
len(converted_rows.get("converted_rows"))
+ if gcs_uri:
+ files_uploaded.append(gcs_uri)
else:
message = (
"FlushAction not found in the data. Please check the
FlushAction in "
f"the operator. Converted Rows with Action:
{converted_rows_with_action}"
)
raise AirflowException(message)
- return total_data_count
+ return files_uploaded
- def _flush_rows(self, converted_rows: list[Any] | None, object_name: str):
+ def _flush_rows(self, converted_rows: list[Any] | None, object_name: str)
-> str | None:
if converted_rows:
headers = self.fields
with tempfile.NamedTemporaryFile("w", suffix=".csv") as csvfile:
@@ -224,7 +237,10 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
filename=csvfile.name,
gzip=self.gzip,
)
- self.log.info("%s uploaded to GCS", csvfile.name)
+ gcs_uri = f"gs://{self.bucket_name}/{object_name}"
+ self.log.info("%s uploaded to GCS at %s", csvfile.name,
gcs_uri)
+ return gcs_uri
+ return None
def _transform_object_name_with_account_id(self, account_id: str):
directory_parts = self.object_name.split("/")
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py
index a051a4a4a01..e0d52cdde12 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py
@@ -73,7 +73,7 @@ class TestFacebookAdsReportToGcsOperator:
task_id="run_operator",
impersonation_chain=IMPERSONATION_CHAIN,
)
- op.execute({})
+ result = op.execute({})
mock_ads_hook.assert_called_once_with(facebook_conn_id=FACEBOOK_ADS_CONN_ID,
api_version=None)
mock_ads_hook.return_value.bulk_facebook_report.assert_called_once_with(
params=PARAMETERS, fields=FIELDS
@@ -85,6 +85,11 @@ class TestFacebookAdsReportToGcsOperator:
mock_gcs_hook.return_value.upload.assert_called_once_with(
bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH,
filename=mock.ANY, gzip=False
)
+ # Assert return value is a list of GCS URIs
+ assert result is not None
+ assert isinstance(result, list)
+ assert len(result) == 1
+ assert result[0] == f"gs://{GCS_BUCKET}/{GCS_OBJ_PATH}"
@mock.patch("airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.FacebookAdsReportingHook")
@mock.patch("airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.GCSHook")
@@ -100,7 +105,7 @@ class TestFacebookAdsReportToGcsOperator:
task_id="run_operator",
impersonation_chain=IMPERSONATION_CHAIN,
)
- op.execute({})
+ result = op.execute({})
mock_ads_hook.assert_called_once_with(facebook_conn_id=FACEBOOK_ADS_CONN_ID,
api_version=None)
mock_ads_hook.return_value.bulk_facebook_report.assert_called_once_with(
params=PARAMETERS, fields=FIELDS
@@ -111,5 +116,47 @@ class TestFacebookAdsReportToGcsOperator:
)
mock_gcs_hook.return_value.upload.assert_has_calls(
[mock.call(bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH_1,
filename=mock.ANY, gzip=False)],
- [mock.call(bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH_2,
fidlename=mock.ANY, gzip=False)],
+ [mock.call(bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH_2,
filename=mock.ANY, gzip=False)],
)
+ # Assert return value is a list of GCS URIs
+ assert result is not None
+ assert isinstance(result, list)
+ assert len(result) == 2
+ assert result[0] == f"gs://{GCS_BUCKET}/{GCS_OBJ_PATH_1}"
+ assert result[1] == f"gs://{GCS_BUCKET}/{GCS_OBJ_PATH_2}"
+
+
@mock.patch("airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.FacebookAdsReportingHook")
+
@mock.patch("airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.GCSHook")
+ def test_execute_with_list_response(self, mock_gcs_hook, mock_ads_hook):
+ """Test when Facebook API returns a list instead of dict."""
+ facebook_list_response = [
+ {
+ "campaign_name": "test",
+ "campaign_id": "123",
+ "ad_id": "456",
+ "clicks": "10",
+ "impressions": "100",
+ }
+ ]
+ mock_ads_hook.return_value.bulk_facebook_report.return_value =
facebook_list_response
+ op = FacebookAdsReportToGcsOperator(
+ facebook_conn_id=FACEBOOK_ADS_CONN_ID,
+ fields=FIELDS,
+ parameters=PARAMETERS,
+ object_name=GCS_OBJ_PATH,
+ bucket_name=GCS_BUCKET,
+ task_id="run_operator",
+ )
+ result = op.execute({})
+
mock_ads_hook.assert_called_once_with(facebook_conn_id=FACEBOOK_ADS_CONN_ID,
api_version=None)
+
mock_ads_hook.return_value.bulk_facebook_report.assert_called_once_with(
+ params=PARAMETERS, fields=FIELDS
+ )
+ mock_gcs_hook.return_value.upload.assert_called_once_with(
+ bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH,
filename=mock.ANY, gzip=False
+ )
+ # Assert return value is a list of GCS URIs
+ assert result is not None
+ assert isinstance(result, list)
+ assert len(result) == 1
+ assert result[0] == f"gs://{GCS_BUCKET}/{GCS_OBJ_PATH}"