This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 55840375594 Return list of GCS URIs from FacebookAdsReportToGcsOperator (#61349)
55840375594 is described below

commit 55840375594c674218d50d8d12c109daa75aecb4
Author: Abhishek Mishra <[email protected]>
AuthorDate: Wed Feb 18 13:07:56 2026 +0530

    Return list of GCS URIs from FacebookAdsReportToGcsOperator (#61349)
---
 .../google/cloud/transfers/facebook_ads_to_gcs.py  | 40 +++++++++++-----
 .../cloud/transfers/test_facebook_ads_to_gcs.py    | 53 ++++++++++++++++++++--
 2 files changed, 78 insertions(+), 15 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
index 6572371db1d..a325f7a3af5 100644
--- a/providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
+++ b/providers/google/src/airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
@@ -122,11 +122,20 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
         self.upload_as_account = upload_as_account
         self.impersonation_chain = impersonation_chain
 
-    def execute(self, context: Context):
+    def execute(self, context: Context) -> list[str]:
+        """
+        Execute the operator.
+
+        Fetches Facebook Ads reports and uploads them to Google Cloud Storage.
+
+        :return: List of GCS URIs where the reports were uploaded.
+            This value is pushed to XCom for downstream tasks.
+        """
         service = FacebookAdsReportingHook(
             facebook_conn_id=self.facebook_conn_id, 
api_version=self.api_version
         )
         bulk_report = service.bulk_facebook_report(params=self.parameters, 
fields=self.fields)
+        files_uploaded: list[str] = []
 
         if isinstance(bulk_report, list):
             converted_rows_with_action = self._generate_rows_with_action(False)
@@ -151,8 +160,10 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
                 f"List or Dict. Actual return type of the Hook: 
{type(bulk_report)}"
             )
             raise AirflowException(message)
-        total_row_count = 
self._decide_and_flush(converted_rows_with_action=converted_rows_with_action)
-        self.log.info("Facebook Returned %s data points in total: ", 
total_row_count)
+        files_uploaded = 
self._decide_and_flush(converted_rows_with_action=converted_rows_with_action)
+        total_row_count = sum(1 for _ in files_uploaded)
+        self.log.info("Facebook Returned data points across %s files", 
total_row_count)
+        return files_uploaded
 
     def _generate_rows_with_action(self, type_check: bool):
         if type_check and self.upload_as_account:
@@ -178,35 +189,37 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
             self.log.info("Facebook Returned %s data points ", 
len(converted_rows))
         return converted_rows_with_action
 
-    def _decide_and_flush(self, converted_rows_with_action: dict[FlushAction, 
list]):
-        total_data_count = 0
+    def _decide_and_flush(self, converted_rows_with_action: dict[FlushAction, 
list]) -> list[str]:
+        files_uploaded: list[str] = []
         once_action = converted_rows_with_action.get(FlushAction.EXPORT_ONCE)
         if once_action is not None:
-            self._flush_rows(
+            gcs_uri = self._flush_rows(
                 converted_rows=once_action,
                 object_name=self.object_name,
             )
-            total_data_count += len(once_action)
+            if gcs_uri:
+                files_uploaded.append(gcs_uri)
         else:
             every_account_action = 
converted_rows_with_action.get(FlushAction.EXPORT_EVERY_ACCOUNT)
             if every_account_action:
                 for converted_rows in every_account_action:
-                    self._flush_rows(
+                    gcs_uri = self._flush_rows(
                         converted_rows=converted_rows.get("converted_rows"),
                         
object_name=self._transform_object_name_with_account_id(
                             account_id=converted_rows.get("account_id")
                         ),
                     )
-                    total_data_count += 
len(converted_rows.get("converted_rows"))
+                    if gcs_uri:
+                        files_uploaded.append(gcs_uri)
             else:
                 message = (
                     "FlushAction not found in the data. Please check the 
FlushAction in "
                     f"the operator. Converted Rows with Action: 
{converted_rows_with_action}"
                 )
                 raise AirflowException(message)
-        return total_data_count
+        return files_uploaded
 
-    def _flush_rows(self, converted_rows: list[Any] | None, object_name: str):
+    def _flush_rows(self, converted_rows: list[Any] | None, object_name: str) 
-> str | None:
         if converted_rows:
             headers = self.fields
             with tempfile.NamedTemporaryFile("w", suffix=".csv") as csvfile:
@@ -224,7 +237,10 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
                     filename=csvfile.name,
                     gzip=self.gzip,
                 )
-                self.log.info("%s uploaded to GCS", csvfile.name)
+                gcs_uri = f"gs://{self.bucket_name}/{object_name}"
+                self.log.info("%s uploaded to GCS at %s", csvfile.name, 
gcs_uri)
+                return gcs_uri
+        return None
 
     def _transform_object_name_with_account_id(self, account_id: str):
         directory_parts = self.object_name.split("/")
diff --git a/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py
index a051a4a4a01..e0d52cdde12 100644
--- a/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py
+++ b/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py
b/providers/google/tests/unit/google/cloud/transfers/test_facebook_ads_to_gcs.py
@@ -73,7 +73,7 @@ class TestFacebookAdsReportToGcsOperator:
             task_id="run_operator",
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute({})
+        result = op.execute({})
         
mock_ads_hook.assert_called_once_with(facebook_conn_id=FACEBOOK_ADS_CONN_ID, 
api_version=None)
         
mock_ads_hook.return_value.bulk_facebook_report.assert_called_once_with(
             params=PARAMETERS, fields=FIELDS
@@ -85,6 +85,11 @@ class TestFacebookAdsReportToGcsOperator:
         mock_gcs_hook.return_value.upload.assert_called_once_with(
             bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH, 
filename=mock.ANY, gzip=False
         )
+        # Assert return value is a list of GCS URIs
+        assert result is not None
+        assert isinstance(result, list)
+        assert len(result) == 1
+        assert result[0] == f"gs://{GCS_BUCKET}/{GCS_OBJ_PATH}"
 
     
@mock.patch("airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.FacebookAdsReportingHook")
     
@mock.patch("airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.GCSHook")
@@ -100,7 +105,7 @@ class TestFacebookAdsReportToGcsOperator:
             task_id="run_operator",
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute({})
+        result = op.execute({})
         
mock_ads_hook.assert_called_once_with(facebook_conn_id=FACEBOOK_ADS_CONN_ID, 
api_version=None)
         
mock_ads_hook.return_value.bulk_facebook_report.assert_called_once_with(
             params=PARAMETERS, fields=FIELDS
@@ -111,5 +116,47 @@ class TestFacebookAdsReportToGcsOperator:
         )
         mock_gcs_hook.return_value.upload.assert_has_calls(
             [mock.call(bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH_1, 
filename=mock.ANY, gzip=False)],
-            [mock.call(bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH_2, 
fidlename=mock.ANY, gzip=False)],
+            [mock.call(bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH_2, 
filename=mock.ANY, gzip=False)],
         )
+        # Assert return value is a list of GCS URIs
+        assert result is not None
+        assert isinstance(result, list)
+        assert len(result) == 2
+        assert result[0] == f"gs://{GCS_BUCKET}/{GCS_OBJ_PATH_1}"
+        assert result[1] == f"gs://{GCS_BUCKET}/{GCS_OBJ_PATH_2}"
+
+    
@mock.patch("airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.FacebookAdsReportingHook")
+    
@mock.patch("airflow.providers.google.cloud.transfers.facebook_ads_to_gcs.GCSHook")
+    def test_execute_with_list_response(self, mock_gcs_hook, mock_ads_hook):
+        """Test when Facebook API returns a list instead of dict."""
+        facebook_list_response = [
+            {
+                "campaign_name": "test",
+                "campaign_id": "123",
+                "ad_id": "456",
+                "clicks": "10",
+                "impressions": "100",
+            }
+        ]
+        mock_ads_hook.return_value.bulk_facebook_report.return_value = 
facebook_list_response
+        op = FacebookAdsReportToGcsOperator(
+            facebook_conn_id=FACEBOOK_ADS_CONN_ID,
+            fields=FIELDS,
+            parameters=PARAMETERS,
+            object_name=GCS_OBJ_PATH,
+            bucket_name=GCS_BUCKET,
+            task_id="run_operator",
+        )
+        result = op.execute({})
+        
mock_ads_hook.assert_called_once_with(facebook_conn_id=FACEBOOK_ADS_CONN_ID, 
api_version=None)
+        
mock_ads_hook.return_value.bulk_facebook_report.assert_called_once_with(
+            params=PARAMETERS, fields=FIELDS
+        )
+        mock_gcs_hook.return_value.upload.assert_called_once_with(
+            bucket_name=GCS_BUCKET, object_name=GCS_OBJ_PATH, 
filename=mock.ANY, gzip=False
+        )
+        # Assert return value is a list of GCS URIs
+        assert result is not None
+        assert isinstance(result, list)
+        assert len(result) == 1
+        assert result[0] == f"gs://{GCS_BUCKET}/{GCS_OBJ_PATH}"

Reply via email to