bugraoz93 commented on a change in pull request #19377:
URL: https://github.com/apache/airflow/pull/19377#discussion_r753332173
##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -128,11 +145,79 @@ def execute(self, context: dict):
service = FacebookAdsReportingHook(
facebook_conn_id=self.facebook_conn_id,
api_version=self.api_version
)
- rows = service.bulk_facebook_report(params=self.parameters,
fields=self.fields)
+ bulk_report = service.bulk_facebook_report(params=self.parameters,
fields=self.fields)
+ if isinstance(bulk_report, list):
+ converted_rows_with_action = self._generate_rows_with_action(False)
+ converted_rows_with_action = self._prepare_rows_for_upload(
+ rows=bulk_report,
converted_rows_with_action=converted_rows_with_action, account_id=None
+ )
+ elif isinstance(bulk_report, dict):
+ converted_rows_with_action = self._generate_rows_with_action(True)
+ for account_id in bulk_report.keys():
+ rows = bulk_report.get(account_id, [])
+ if rows:
+ converted_rows_with_action = self._prepare_rows_for_upload(
+ rows=rows,
+ converted_rows_with_action=converted_rows_with_action,
+ account_id=account_id,
+ )
+ else:
+ self.log.warning("account_id: %s returned empty report",
str(account_id))
+ else:
+ message = "Facebook Ads Hook returned different type than expected"
+ raise AirflowException(message)
+ total_row_count = self._decide_and_flush(converted_rows_with_action=converted_rows_with_action)
+ self.log.info("Facebook Returned %s data points in total: ",
total_row_count)
+
+ def _generate_rows_with_action(self, type_check: bool):
+ if type_check and self.upload_as_account:
+ return {FlushAction.EXPORT_EVERY_ACCOUNT: []}
+ else:
+ return {FlushAction.EXPORT_ONCE: []}
+
+ def _prepare_rows_for_upload(
+ self,
+ rows: List[AdsInsights],
+ converted_rows_with_action: Dict[FlushAction, list],
+ account_id: Optional[str],
+ ):
converted_rows = [dict(row) for row in rows]
- self.log.info("Facebook Returned %s data points", len(converted_rows))
+ if account_id is not None and self.upload_as_account:
+
converted_rows_with_action[FlushAction.EXPORT_EVERY_ACCOUNT].append(
+ {"account_id": account_id, "converted_rows": converted_rows}
+ )
+ self.log.info(
+ "Facebook Returned %s data points for account_id: %s",
len(converted_rows), account_id
+ )
+ else:
+
converted_rows_with_action[FlushAction.EXPORT_ONCE].extend(converted_rows)
+ self.log.info("Facebook Returned %s data points ",
len(converted_rows))
+ return converted_rows_with_action
+ def _decide_and_flush(self, converted_rows_with_action: Dict[FlushAction,
list]):
+ total_data_count = 0
+ if FlushAction.EXPORT_ONCE in converted_rows_with_action:
+ self._flush_rows(
+
converted_rows=converted_rows_with_action.get(FlushAction.EXPORT_ONCE),
+ object_name=self.object_name,
+ )
+ total_data_count += len(converted_rows_with_action.get(FlushAction.EXPORT_ONCE))
+ elif FlushAction.EXPORT_EVERY_ACCOUNT in converted_rows_with_action:
+ for converted_rows in converted_rows_with_action.get(FlushAction.EXPORT_EVERY_ACCOUNT):
+ self._flush_rows(
+ converted_rows=converted_rows.get("converted_rows"),
+ object_name=self._transform_object_name_with_account_id(
+ account_id=converted_rows.get("account_id")
+ ),
+ )
+ total_data_count += len(converted_rows.get("converted_rows"))
+ else:
+ message = "FlushAction not found in the data"
+ raise AirflowException(message)
Review comment:
I have updated the message and included the data in it, so that we can
check whether the correct keys were returned.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]