uranusjr commented on a change in pull request #19377:
URL: https://github.com/apache/airflow/pull/19377#discussion_r749935017
##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -128,11 +145,79 @@ def execute(self, context: dict):
service = FacebookAdsReportingHook(
facebook_conn_id=self.facebook_conn_id,
api_version=self.api_version
)
- rows = service.bulk_facebook_report(params=self.parameters,
fields=self.fields)
+ bulk_report = service.bulk_facebook_report(params=self.parameters,
fields=self.fields)
+ if isinstance(bulk_report, list):
+ converted_rows_with_action = self._generate_rows_with_action(False)
+ converted_rows_with_action = self._prepare_rows_for_upload(
+ rows=bulk_report,
converted_rows_with_action=converted_rows_with_action, account_id=None
+ )
+ elif isinstance(bulk_report, dict):
+ converted_rows_with_action = self._generate_rows_with_action(True)
+ for account_id in bulk_report.keys():
+ rows = bulk_report.get(account_id, [])
+ if rows:
+ converted_rows_with_action = self._prepare_rows_for_upload(
+ rows=rows,
+ converted_rows_with_action=converted_rows_with_action,
+ account_id=account_id,
+ )
+ else:
+ self.log.warning("account_id: %s returned empty report",
str(account_id))
+ else:
+ message = "Facebook Ads Hook returned different type than expected"
+ raise AirflowException(message)
Review comment:
It's probably a good idea to log what was actually returned (e.g. its type and value) to help with debugging.
##########
File path: airflow/providers/google/cloud/transfers/facebook_ads_to_gcs.py
##########
@@ -128,11 +145,79 @@ def execute(self, context: dict):
service = FacebookAdsReportingHook(
facebook_conn_id=self.facebook_conn_id,
api_version=self.api_version
)
- rows = service.bulk_facebook_report(params=self.parameters,
fields=self.fields)
+ bulk_report = service.bulk_facebook_report(params=self.parameters,
fields=self.fields)
+ if isinstance(bulk_report, list):
+ converted_rows_with_action = self._generate_rows_with_action(False)
+ converted_rows_with_action = self._prepare_rows_for_upload(
+ rows=bulk_report,
converted_rows_with_action=converted_rows_with_action, account_id=None
+ )
+ elif isinstance(bulk_report, dict):
+ converted_rows_with_action = self._generate_rows_with_action(True)
+ for account_id in bulk_report.keys():
+ rows = bulk_report.get(account_id, [])
+ if rows:
+ converted_rows_with_action = self._prepare_rows_for_upload(
+ rows=rows,
+ converted_rows_with_action=converted_rows_with_action,
+ account_id=account_id,
+ )
+ else:
+ self.log.warning("account_id: %s returned empty report",
str(account_id))
+ else:
+ message = "Facebook Ads Hook returned different type than expected"
+ raise AirflowException(message)
+ total_row_count =
self._decide_and_flush(converted_rows_with_action=converted_rows_with_action)
+ self.log.info("Facebook Returned %s data points in total: ",
total_row_count)
+
+ def _generate_rows_with_action(self, type_check: bool):
+ if type_check and self.upload_as_account:
+ return {FlushAction.EXPORT_EVERY_ACCOUNT: []}
+ else:
+ return {FlushAction.EXPORT_ONCE: []}
+
+ def _prepare_rows_for_upload(
+ self,
+ rows: List[AdsInsights],
+ converted_rows_with_action: Dict[FlushAction, list],
+ account_id: Optional[str],
+ ):
converted_rows = [dict(row) for row in rows]
- self.log.info("Facebook Returned %s data points", len(converted_rows))
+ if account_id is not None and self.upload_as_account:
+
converted_rows_with_action[FlushAction.EXPORT_EVERY_ACCOUNT].append(
+ {"account_id": account_id, "converted_rows": converted_rows}
+ )
+ self.log.info(
+ "Facebook Returned %s data points for account_id: %s",
len(converted_rows), account_id
+ )
+ else:
+
converted_rows_with_action[FlushAction.EXPORT_ONCE].extend(converted_rows)
+ self.log.info("Facebook Returned %s data points ",
len(converted_rows))
+ return converted_rows_with_action
+ def _decide_and_flush(self, converted_rows_with_action: Dict[FlushAction,
list]):
+ total_data_count = 0
+ if FlushAction.EXPORT_ONCE in converted_rows_with_action:
+ self._flush_rows(
+
converted_rows=converted_rows_with_action.get(FlushAction.EXPORT_ONCE),
+ object_name=self.object_name,
+ )
+ total_data_count +=
len(converted_rows_with_action.get(FlushAction.EXPORT_ONCE))
+ elif FlushAction.EXPORT_EVERY_ACCOUNT in converted_rows_with_action:
+ for converted_rows in
converted_rows_with_action.get(FlushAction.EXPORT_EVERY_ACCOUNT):
+ self._flush_rows(
+ converted_rows=converted_rows.get("converted_rows"),
+ object_name=self._transform_object_name_with_account_id(
+ account_id=converted_rows.get("account_id")
+ ),
+ )
+ total_data_count += len(converted_rows.get("converted_rows"))
+ else:
+ message = "FlushAction not found in the data"
+ raise AirflowException(message)
Review comment:
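Same as above: include what was actually found (e.g. the keys of `converted_rows_with_action`) in the log/error message to help with debugging.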
##########
File path: airflow/providers/facebook/ads/hooks/ads.py
##########
@@ -118,11 +125,32 @@ def bulk_facebook_report(
:param sleep_time: Time to sleep when async call is happening
:type sleep_time: int
- :return: Facebook Ads API response, converted to Facebook Ads Row
objects
- :rtype: List[AdsInsights]
+ :return: Facebook Ads API response,
+ converted to Facebook Ads Row objects regarding given Account ID
type
Review comment:
You probably want to describe in more detail which account ID type produces which kind of result (e.g. a single account ID vs. a list of account IDs). It may also be a good idea to provide a couple of examples above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]