Ohashiro commented on issue #44618:
URL: https://github.com/apache/airflow/issues/44618#issuecomment-2575249301

   Hi @dabla 
   I quickly tested the approach we discussed: the triggers return to the
`execute_complete` method, which checks the event message. If it detects that
fetching the refresh details failed, it calls a `retry_execution` method that
counts the retries and re-runs the status check (through `get_refresh_status`
in the snippet below, rather than the full `execute` method). The code is not
very clean yet, but here is an overview:
   
   ```python
   def execute(self, context: Context):
       """Refresh the Power BI Dataset."""
       if self.wait_for_termination:
           self.defer(
               trigger=PowerBITrigger(
                   conn_id=self.conn_id,
                   group_id=self.group_id,
                   dataset_id=self.dataset_id,
                   timeout=self.timeout,
                   proxies=self.proxies,
                   api_version=self.api_version,
                   check_interval=self.check_interval,
                   wait_for_termination=self.wait_for_termination,
               ),
               method_name=self.get_refresh_status.__name__,
           )
   
   def get_refresh_status(self, context: Context, event: dict[str, str] | None = None):
       """Push the refresh Id to XCom, then run the trigger to wait for refresh completion."""
   
       if event:
           if (
               event["status"] == "error"
               and "Unable to fetch the details of dataset refresh with Request Id" not in event["message"]
               and "not found" not in event["message"]
           ):
               raise AirflowException(event["message"])
   
           self.xcom_push(context=context, key="powerbi_dataset_refresh_Id", value=event["dataset_refresh_id"])
   
       dataset_refresh_id = self.xcom_pull(context=context, key="powerbi_dataset_refresh_Id")
       if dataset_refresh_id:
           self.defer(
               trigger=PowerBITrigger(
                   conn_id=self.conn_id,
                   group_id=self.group_id,
                   dataset_id=self.dataset_id,
                   dataset_refresh_id=dataset_refresh_id,
                   timeout=self.timeout,
                   proxies=self.proxies,
                   api_version=self.api_version,
                   check_interval=self.check_interval,
                   wait_for_termination=self.wait_for_termination,
               ),
               method_name=self.execute_complete.__name__,
           )
   
   def retry_execution(self, context: Context):
       retries = self.xcom_pull(context=context, key="retries")
       if retries and retries >= self.max_retries:
           raise AirflowException("Max number of retries reached!")
   
       if not retries:
           retries = 0
       self.xcom_push(context=context, key="retries", value=retries+1)
   
       self.get_refresh_status(context)
   
   def execute_complete(self, context: Context, event: dict[str, str]) -> Any:
       """
       Return immediately - callback for when the trigger fires.
   
       Relies on trigger to throw an exception, otherwise it assumes execution was successful.
       """
       if event:
           if event["status"] == "error":
               if (
                   "Unable to fetch the details of dataset refresh with Request Id" in event["message"]
                   or "not found" in event["message"]
               ):
                   self.retry_execution(context)
               else:
                   raise AirflowException(event["message"])
   
           self.xcom_push(context=context, key="powerbi_dataset_refresh_status", value=event["status"])
   
   ```
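
The same message check appears in both `get_refresh_status` and `execute_complete`, so it could be pulled into one place. Here is a minimal sketch of that idea; `RETRYABLE_FRAGMENTS` and `is_retryable_fetch_error` are hypothetical names used only for illustration and are not part of the provider:

```python
from typing import Mapping, Optional

# Assumption: these two substrings are the ones matched in the snippet above.
RETRYABLE_FRAGMENTS = (
    "Unable to fetch the details of dataset refresh with Request Id",
    "not found",
)


def is_retryable_fetch_error(event: Optional[Mapping[str, str]]) -> bool:
    """Return True only for error events whose message matches a retryable fragment."""
    if not event or event.get("status") != "error":
        return False
    message = event.get("message", "")
    return any(fragment in message for fragment in RETRYABLE_FRAGMENTS)
```

With such a helper, `execute_complete` would retry when it returns `True` and raise otherwise, and `get_refresh_status` would raise only for error events where it returns `False`.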
   
   Note: in addition to these changes, we need a different way to handle
refresh cancellation. By default, if the trigger encounters an exception, it
cancels the refresh, which is incompatible with the retry made by the
operator. If we keep this solution, we have to change that behavior.
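
One possible shape for that change, as a rough sketch only: make cancellation opt-out through a flag. Everything here is hypothetical (`FakeHook`, `poll_refresh`, and the `cancel_on_error` parameter are stand-ins, not the real hook or trigger API); it only illustrates skipping the cancellation so the operator can retry the status fetch.

```python
import asyncio


class FakeHook:
    """Stand-in for the Power BI hook; records cancellations instead of calling the API."""

    def __init__(self):
        self.cancelled = []

    async def get_refresh_status(self, refresh_id):
        # Simulate the failure discussed in this thread.
        raise RuntimeError(
            f"Unable to fetch the details of dataset refresh with Request Id: {refresh_id}"
        )

    async def cancel_refresh(self, refresh_id):
        self.cancelled.append(refresh_id)


async def poll_refresh(hook, refresh_id, cancel_on_error=True):
    """Return an event dict; cancel the refresh on failure only when asked to."""
    try:
        status = await hook.get_refresh_status(refresh_id)
        return {"status": status, "dataset_refresh_id": refresh_id}
    except Exception as error:
        if cancel_on_error:
            await hook.cancel_refresh(refresh_id)
        return {"status": "error", "message": str(error), "dataset_refresh_id": refresh_id}
```

Calling `asyncio.run(poll_refresh(FakeHook(), "abc", cancel_on_error=False))` returns an error event while leaving the refresh running, which is what the operator-side retry needs.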
   
   I think this solution can work, but it might add a bit too much complexity
to the operator compared to a simple retry. That said, I do like the separation
between triggering the refresh and fetching its status.
   What's your opinion?

