dabla commented on issue #44618:
URL: https://github.com/apache/airflow/issues/44618#issuecomment-2572497803

   > Hi @Ohashiro, thank you for bringing this up and creating the PR to 
address it. After reviewing the conversation, I see the issue lies in `get 
refresh history` function in hook class, as sometimes it fails to return the 
"dataset refresh histories" leading the dataset refresh to be marked as fail, 
even if it actually succeeds. Please correct me if I’ve misunderstood.
   > 
   > So, I suggest, we should add the retry mechanism to the get refresh 
history function only as below, it will retry to fetch histories, if it still 
fails, then we just throw the exception. That would also mean we don't need any 
extra exception class as you created "PowerBIDatasetRefreshStatusExecption"
   > 
   > ```
   >     @tenacity.retry(
   >         stop=tenacity.stop_after_attempt(3),
   >         wait=tenacity.wait_random_exponential(),
   >         reraise=True,
   >         retry=tenacity.retry_if_exception(should_retry_creation),
   >     )
   >     async def get_refresh_history(
   >         self,
   >         dataset_id: str,
   >         group_id: str,
   >     ) -> list[dict[str, str]]:
   >         """
   >         Retrieve the refresh history of the specified dataset from the given group ID.
   >
   >         :param dataset_id: The dataset ID.
   >         :param group_id: The workspace ID.
   >
   >         :return: Dictionary containing all the refresh histories of the dataset.
   >         """
   >         try:
   >             response = await self.run(
   >                 url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
   >                 path_parameters={
   >                     "group_id": group_id,
   >                     "dataset_id": dataset_id,
   >                 },
   >             )
   >
   >             refresh_histories = response.get("value")
   >             if not refresh_histories:  # Retry if refresh_histories is None or empty
   >                 raise PowerBIDatasetRefreshException(
   >                     "Refresh histories are empty; retrying..."
   >                 )
   >
   >             return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories]
   >
   >         except Exception as error:
   >             raise PowerBIDatasetRefreshException(f"Failed to retrieve refresh history due to error: {error}")
   > ```
   
   I don't think it's good practice to use the tenacity retry mechanism when
   operators (e.g. task instances) already have their own retry mechanism
   managed by Airflow tasks, but I could be wrong. I also went back to the code,
   and I still don't understand why the call would fail again when Airflow
   retries the task instance for that operator. Some time will have passed
   between the first and the second attempt, so I would expect the second call
   to succeed, but apparently it still doesn't.
   
   Maybe it's because of this part:
   
   ```
       async def run(self) -> AsyncIterator[TriggerEvent]:
           """Make async connection to the PowerBI and polls for the dataset refresh status."""
           # This is always called, even if it succeeded during the first attempt
           # and the task is only being retried because the second call below
           # failed. Maybe something has to be done to avoid that when a retry
           # is being executed.
           self.dataset_refresh_id = await self.hook.trigger_dataset_refresh(
               dataset_id=self.dataset_id,
               group_id=self.group_id,
           )
   
           async def fetch_refresh_status_and_error() -> tuple[str, str]:
               """Fetch the current status and error of the dataset refresh."""
               # This is the call that fails, probably because it happens too
               # fast after the one above.
               refresh_details = await self.hook.get_refresh_details_by_refresh_id(
                   dataset_id=self.dataset_id,
                   group_id=self.group_id,
                   refresh_id=self.dataset_refresh_id,
               )
               return refresh_details["status"], refresh_details["error"]
   ```
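
   To make the concern concrete, here is a small self-contained simulation. It
   does not use the provider code: `FakeHook`, `run_once` and
   `simulate_task_retries` are hypothetical stand-ins, and the rule that the
   first lookup of a new refresh always fails is an assumption made only for
   the demo. Under that assumption, every retry triggers a brand-new refresh
   and runs into the same "too early" lookup again:

```python
import asyncio
import itertools


class FakeHook:
    """Hypothetical stand-in for PowerBIHook; not the real provider API."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.triggered = []  # every refresh ID that was triggered
        self._lookups = {}   # refresh ID -> number of detail lookups so far

    async def trigger_dataset_refresh(self) -> str:
        refresh_id = f"refresh-{next(self._ids)}"
        self.triggered.append(refresh_id)
        self._lookups[refresh_id] = 0
        return refresh_id

    async def get_refresh_details_by_refresh_id(self, refresh_id: str) -> dict:
        # Demo assumption: the first lookup of a new refresh is always too early.
        self._lookups[refresh_id] += 1
        if self._lookups[refresh_id] == 1:
            raise RuntimeError(f"{refresh_id} not found yet")
        return {"status": "Completed", "error": ""}


async def run_once(hook: FakeHook) -> str:
    """Mimic the current run(): always trigger, then look up immediately."""
    refresh_id = await hook.trigger_dataset_refresh()
    details = await hook.get_refresh_details_by_refresh_id(refresh_id)
    return details["status"]


async def simulate_task_retries(attempts: int):
    """Call run_once repeatedly, the way task retries would re-run the trigger."""
    hook = FakeHook()
    failures = 0
    for _ in range(attempts):
        try:
            await run_once(hook)
        except RuntimeError:
            failures += 1
    return hook.triggered, failures


triggered, failures = asyncio.run(simulate_task_retries(3))
print(triggered, failures)
```

   Each attempt creates a new refresh ID, so waiting between task retries does
   not help as long as the lookup directly follows the trigger.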
   
   So maybe, @Ohashiro, adding a delay before calling
   get_refresh_details_by_refresh_id would be an easy fix to avoid the issue on
   the second call, even though I don't like it that much and it doesn't
   guarantee that the call will eventually succeed.
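
   A minimal sketch of such a delay, using only `asyncio`. The `FakeHook`
   class, the `trigger_then_fetch` helper and the `initial_delay` parameter
   are hypothetical names for illustration, not existing provider code, and
   the fake service is assumed to expose the refresh details only 0.2 seconds
   after the trigger:

```python
import asyncio


class FakeHook:
    """Hypothetical stand-in for the hook; not the real provider API."""

    def __init__(self, visible_after: float) -> None:
        # Demo assumption: details become queryable some time after the trigger.
        self.visible_after = visible_after
        self._triggered_at = 0.0

    async def trigger_dataset_refresh(self) -> str:
        self._triggered_at = asyncio.get_running_loop().time()
        return "refresh-1"

    async def get_refresh_details_by_refresh_id(self, refresh_id: str) -> dict:
        elapsed = asyncio.get_running_loop().time() - self._triggered_at
        if elapsed < self.visible_after:
            raise RuntimeError(f"{refresh_id} not found yet")
        return {"status": "Completed", "error": ""}


async def trigger_then_fetch(hook: FakeHook, initial_delay: float) -> str:
    refresh_id = await hook.trigger_dataset_refresh()
    # Give the service time to register the refresh before the first lookup.
    await asyncio.sleep(initial_delay)
    details = await hook.get_refresh_details_by_refresh_id(refresh_id)
    return details["status"]


async def demo():
    try:
        no_delay = await trigger_then_fetch(FakeHook(visible_after=0.2), initial_delay=0)
    except RuntimeError as error:
        no_delay = f"failed: {error}"
    with_delay = await trigger_then_fetch(FakeHook(visible_after=0.2), initial_delay=0.4)
    return no_delay, with_delay


no_delay_result, with_delay_result = asyncio.run(demo())
print(no_delay_result)
print(with_delay_result)
```

   The delayed call only succeeds because 0.4 seconds happens to be longer than
   the assumed 0.2 seconds; with a slower service the same fixed delay would
   fail again, which is why I don't see it as a real fix.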
   
   What I suggest is to refactor the PowerBITrigger and
   PowerBIDatasetRefreshOperator. The PowerBITrigger should get an extra
   dataset_refresh_id parameter in its constructor, so that the run method can
   be called with or without a dataset_refresh_id. That way it can handle both
   scenarios and return the corresponding TriggerEvent for the executed flow
   (i.e. either a dataset refresh is being triggered, or we want to get the
   dataset refresh details). The code modifications in PowerBITrigger could
   look like this:
   
   ```
       def __init__(
           self,
           conn_id: str,
           dataset_id: str,
           group_id: str,
           timeout: float = 60 * 60 * 24 * 7,
           proxies: dict | None = None,
           api_version: APIVersion | str | None = None,
           check_interval: int = 60,
           wait_for_termination: bool = True,
           dataset_refresh_id: str | None = None,  # add dataset_refresh_id parameter
       ):
           super().__init__()
           self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
           self.dataset_id = dataset_id
           self.timeout = timeout
           self.group_id = group_id
           self.check_interval = check_interval
           self.wait_for_termination = wait_for_termination
           self.dataset_refresh_id = dataset_refresh_id
           
       def serialize(self):
           """Serialize the trigger instance."""
           return (
               "airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger",
               {
                   "conn_id": self.conn_id,
                   "proxies": self.proxies,
                   "api_version": self.api_version,
                   "dataset_id": self.dataset_id,
                   "group_id": self.group_id,
                   "timeout": self.timeout,
                   "check_interval": self.check_interval,
                   "wait_for_termination": self.wait_for_termination,
                   # IMPORTANT: do not forget to add the parameter to the serialize method too
                   "dataset_refresh_id": self.dataset_refresh_id,
               },
           )
   
       async def run(self) -> AsyncIterator[TriggerEvent]:
           """Make async connection to the PowerBI and polls for the dataset refresh status."""
           # Only trigger a new refresh when no dataset_refresh_id was passed in,
           # so that a retry does not trigger the refresh again.
           if not self.dataset_refresh_id:
               dataset_refresh_id = await self.hook.trigger_dataset_refresh(
                   dataset_id=self.dataset_id,
                   group_id=self.group_id,
               )

               # Just yield a TriggerEvent with the dataset_refresh_id. The operator
               # then uses it to defer to the PowerBITrigger again with that ID, so
               # the trigger knows it only has to get the refresh details. If that
               # step fails, only the refresh-details call has to be retried.
               yield TriggerEvent(
                   {
                       "status": "success",
                       "message": f"The dataset refresh {dataset_refresh_id} has been triggered.",
                       "dataset_refresh_id": dataset_refresh_id,
                   }
               )
               return  # the operator defers again with the ID to fetch the details
   
           async def fetch_refresh_status_and_error() -> tuple[str, str]:
               """Fetch the current status and error of the dataset refresh."""
               refresh_details = await self.hook.get_refresh_details_by_refresh_id(
                   dataset_id=self.dataset_id,
                   group_id=self.group_id,
                   refresh_id=self.dataset_refresh_id,
               )
               return refresh_details["status"], refresh_details["error"]
   ```
   
   Then, in the PowerBIDatasetRefreshOperator, both TriggerEvents should be
   handled accordingly, which means the PowerBITrigger will be called (i.e.
   deferred) twice: once to trigger the dataset refresh and once to poll
   get_refresh_details_by_refresh_id. On the first attempt, if the trigger
   succeeds, the dataset_refresh_id should be persisted as an XCom within the
   operator, so that when the second call fails, the operator can directly
   retry the second trigger call.
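
   The two-phase flow can be sketched without Airflow as follows. All names
   are hypothetical stand-ins: `FakeHook` fails the first details lookup,
   `trigger_run` mimics the two-mode run method, and a plain dict plays the
   role of the XCom. The sketch assumes the persisted ID is still available
   when the task is retried, which has to be checked against how Airflow
   actually handles XComs on retries:

```python
import asyncio
from typing import AsyncIterator, Optional


class FakeHook:
    """Hypothetical stand-in for PowerBIHook; the first status lookup fails."""

    def __init__(self) -> None:
        self.trigger_calls = 0
        self.lookups = 0

    async def trigger_dataset_refresh(self) -> str:
        self.trigger_calls += 1
        return f"refresh-{self.trigger_calls}"

    async def get_refresh_details_by_refresh_id(self, refresh_id: str) -> dict:
        self.lookups += 1
        if self.lookups == 1:
            raise RuntimeError(f"{refresh_id} not found yet")
        return {"status": "Completed", "error": ""}


async def trigger_run(hook: FakeHook, dataset_refresh_id: Optional[str] = None) -> AsyncIterator[dict]:
    """Simplified two-mode run(): trigger a refresh, or fetch its details."""
    if not dataset_refresh_id:
        new_id = await hook.trigger_dataset_refresh()
        yield {"status": "success", "dataset_refresh_id": new_id}
        return
    details = await hook.get_refresh_details_by_refresh_id(dataset_refresh_id)
    yield {"status": details["status"], "dataset_refresh_id": dataset_refresh_id}


async def first_event(hook: FakeHook, dataset_refresh_id: Optional[str] = None) -> dict:
    """Return the first event of a trigger run, like one deferral would."""
    events = trigger_run(hook, dataset_refresh_id)
    try:
        return await events.__anext__()
    finally:
        await events.aclose()


async def run_task(hook: FakeHook, xcom: dict, max_tries: int) -> str:
    """Mimic an operator whose retries reuse the persisted refresh ID."""
    for _ in range(max_tries):
        try:
            if "dataset_refresh_id" not in xcom:
                # First deferral: trigger the refresh and persist its ID.
                event = await first_event(hook)
                xcom["dataset_refresh_id"] = event["dataset_refresh_id"]
            # Second deferral: only fetch the details for the persisted ID.
            event = await first_event(hook, xcom["dataset_refresh_id"])
            return event["status"]
        except RuntimeError:
            continue  # stands in for a task retry; the dict keeps the ID
    raise RuntimeError("all tries failed")


hook = FakeHook()
xcom = {}
status = asyncio.run(run_task(hook, xcom, max_tries=3))
print(status, hook.trigger_calls, xcom)
```

   In this simulation the refresh is triggered only once, and the retry goes
   straight to the details lookup with the stored ID.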
   
   This is of course a more complex approach due to the deferrable aspect, but
   imho it would be more in line with how operators should work, instead of
   hiding failures and doing retries outside the Airflow flow using the tenacity
   library. From what I see in the code base, tenacity is only used in the
   retries module of Airflow and in the CLI commands, not in the operators,
   hooks or triggers.

