bugraoz93 commented on a change in pull request #19377:
URL: https://github.com/apache/airflow/pull/19377#discussion_r742028280
##########
File path: airflow/providers/facebook/ads/hooks/ads.py
##########
@@ -119,25 +120,30 @@ def bulk_facebook_report(
:type sleep_time: int
:return: Facebook Ads API response, converted to Facebook Ads Row
objects
- :rtype: List[AdsInsights]
+ :rtype: Dict[str, List[AdsInsights]]
"""
+ all_insights = {}
api = self._get_service()
- ad_account = AdAccount(api.get_default_account_id(), api=api)
- _async = ad_account.get_insights(params=params, fields=fields,
is_async=True)
- while True:
- request = _async.api_get()
- async_status = request[AdReportRun.Field.async_status]
- percent = request[AdReportRun.Field.async_percent_completion]
- self.log.info("%s %s completed, async_status: %s", percent, "%",
async_status)
- if async_status == JobStatus.COMPLETED.value:
- self.log.info("Job run completed")
- break
- if async_status in [JobStatus.SKIPPED.value,
JobStatus.FAILED.value]:
- message = f"{async_status}. Please retry."
- raise AirflowException(message)
- time.sleep(sleep_time)
- report_run_id = _async.api_get()["report_run_id"]
- report_object = AdReportRun(report_run_id, api=api)
- insights = report_object.get_insights()
- self.log.info("Extracting data from returned Facebook Ads Iterators")
- return list(insights)
+ for account_id in self.facebook_ads_config["account_id"]:
+ ad_account = AdAccount(account_id, api=api)
+ _async = ad_account.get_insights(params=params, fields=fields,
is_async=True)
+ while True:
+ request = _async.api_get()
+ async_status = request[AdReportRun.Field.async_status]
+ percent = request[AdReportRun.Field.async_percent_completion]
+ self.log.info("%s %s completed, async_status: %s", percent,
"%", async_status)
+ if async_status == JobStatus.COMPLETED.value:
+ self.log.info("Job run completed")
+ break
+ if async_status in [JobStatus.SKIPPED.value,
JobStatus.FAILED.value]:
+ message = f"{async_status}. Please retry."
+ raise AirflowException(message)
+ time.sleep(sleep_time)
+ report_run_id = _async.api_get()["report_run_id"]
+ report_object = AdReportRun(report_run_id, api=api)
+ self.log.info("Extracting data from returned Facebook Ads
Iterators")
+ all_insights[account_id] = list(report_object.get_insights())
+ self.log.info(
+ str(account_id) + " Account Id used to extract data from
Facebook Ads Iterators successfully"
+ )
Review comment:
Hey @mik-laj, I made the changes. I think I was so focused on the
feature itself that I missed this one. Thank you very much.
##########
File path: airflow/providers/facebook/ads/hooks/ads.py
##########
@@ -119,25 +120,30 @@ def bulk_facebook_report(
:type sleep_time: int
:return: Facebook Ads API response, converted to Facebook Ads Row
objects
- :rtype: List[AdsInsights]
+ :rtype: Dict[str, List[AdsInsights]]
Review comment:
Hey @mik-laj, I did this because we are getting the account_id from the
connection extras. As far as I understand from tracing the code, we can get the
connection from a hook but not from an operator. On the other hand, I think
that if we support multiple account_ids at once, we need to separate the
exported data into distinct files per account (optional parameter ->
upload_as_account). To do this, I need to pass the account_id or correlate it
with the list of Ad Insight data.
According to the Facebook documentation, the Ad Insight data holds the
account_id information, so we could get it from there. However, if we take the
account_id from the data, the data transformation part in the operator will
most probably become more complex. This would consume more resources than it
does now.
##########
File path: airflow/providers/facebook/ads/hooks/ads.py
##########
@@ -119,25 +120,30 @@ def bulk_facebook_report(
:type sleep_time: int
:return: Facebook Ads API response, converted to Facebook Ads Row
objects
- :rtype: List[AdsInsights]
+ :rtype: Dict[str, List[AdsInsights]]
"""
+ all_insights = {}
api = self._get_service()
- ad_account = AdAccount(api.get_default_account_id(), api=api)
- _async = ad_account.get_insights(params=params, fields=fields,
is_async=True)
- while True:
- request = _async.api_get()
- async_status = request[AdReportRun.Field.async_status]
- percent = request[AdReportRun.Field.async_percent_completion]
- self.log.info("%s %s completed, async_status: %s", percent, "%",
async_status)
- if async_status == JobStatus.COMPLETED.value:
- self.log.info("Job run completed")
- break
- if async_status in [JobStatus.SKIPPED.value,
JobStatus.FAILED.value]:
- message = f"{async_status}. Please retry."
- raise AirflowException(message)
- time.sleep(sleep_time)
- report_run_id = _async.api_get()["report_run_id"]
- report_object = AdReportRun(report_run_id, api=api)
- insights = report_object.get_insights()
- self.log.info("Extracting data from returned Facebook Ads Iterators")
- return list(insights)
+ for account_id in self.facebook_ads_config["account_id"]:
+ ad_account = AdAccount(account_id, api=api)
+ _async = ad_account.get_insights(params=params, fields=fields,
is_async=True)
+ while True:
+ request = _async.api_get()
+ async_status = request[AdReportRun.Field.async_status]
+ percent = request[AdReportRun.Field.async_percent_completion]
+ self.log.info("%s %s completed, async_status: %s", percent,
"%", async_status)
+ if async_status == JobStatus.COMPLETED.value:
+ self.log.info("Job run completed")
+ break
+ if async_status in [JobStatus.SKIPPED.value,
JobStatus.FAILED.value]:
+ message = f"{async_status}. Please retry."
+ raise AirflowException(message)
+ time.sleep(sleep_time)
+ report_run_id = _async.api_get()["report_run_id"]
+ report_object = AdReportRun(report_run_id, api=api)
+ self.log.info("Extracting data from returned Facebook Ads
Iterators")
+ all_insights[account_id] = list(report_object.get_insights())
+ self.log.info(
+ str(account_id) + " Account Id used to extract data from
Facebook Ads Iterators successfully"
+ )
Review comment:
Hey @mik-laj, I made the change. I think I was so focused on the
feature itself that I missed this one. Thank you very much.
##########
File path: airflow/providers/facebook/ads/hooks/ads.py
##########
@@ -119,25 +120,30 @@ def bulk_facebook_report(
:type sleep_time: int
:return: Facebook Ads API response, converted to Facebook Ads Row
objects
- :rtype: List[AdsInsights]
+ :rtype: Dict[str, List[AdsInsights]]
Review comment:
Hey @mik-laj, I did this because we are getting the account_id from the
connection extras. As far as I understand from tracing the code, we can get the
connection from a hook but not from an operator. On the other hand, I think
that if we support multiple account_ids at once, we need to separate the
exported data into distinct files per account (optional parameter ->
upload_as_account). To do this, I need to pass the account_id or correlate it
with the list of Ad Insight data.
According to the Facebook documentation, the Ad Insight data holds the
account_id information, so we could get it from there. However, if we take the
account_id from the data, the data transformation part in the operator will
most probably become more complex. This would consume more resources than it
does now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]