dabla commented on issue #44618:
URL: https://github.com/apache/airflow/issues/44618#issuecomment-2572797731

   > Hello! @dabla thank you for your investigation and suggestion!
   > 
   > Regarding your first point:
   > 
   > > I also went back to the code and I still don't understand why the call would still fail when the task instance for that operator is retried a second time by Airflow. Some time would have passed between the first and the second attempt, so I would expect the second call to succeed, but apparently it still doesn't.
   > 
   > From what I understand, when the task fails (for example, in our case, because the refreshId was not found), the operator cancels the refresh (using the `cancel_dataset_refresh` hook method). So when the task is retried by Airflow, a new refresh is triggered with a new refreshId.
   > 
   > https://github.com/apache/airflow/blob/413a1833c302e2409d2ac96c6521f97e6589a594/providers/src/airflow/providers/microsoft/azure/hooks/powerbi.py#L202
   > 
   > Regarding the fix implementation, the delay solution we discussed seems to work well (which, imho, confirms the root cause of the bug), but I agree that this fix is more of a "quick fix" than a clean one.
   > 
   > I can work on the refactor you suggested; I just have a few questions:
   > 
   > > so that when the second call fails the operator can directly retry the second Trigger call
   > 
   > Regarding the retry mechanism you are suggesting in the operator, how would you do it? I just wrote the following draft; could you confirm this is what you expected? However, I'm not sure how you'd handle the retry part... Would you let the operator fail and, when the task is retried, have it fetch the refreshId directly? Or would you make the operator retry the Trigger within the same task?
   > 
   > Regarding the trigger, if I understand correctly, the `run` method would look like this:
   > 
   > ```python
   > async def run(self) -> AsyncIterator[TriggerEvent]:
   >     if not self.dataset_refresh_id:
   >         # Just yield a TriggerEvent with the dataset_refresh_id. The operator will
   >         # then use it to re-trigger the PowerBITrigger with that dataset_refresh_id,
   >         # so the trigger knows it only has to get the refresh details; in case of
   >         # failure, only the refresh-details part would be executed again.
   >         yield TriggerEvent(
   >             ...
   >         )
   >     else:
   >         # Handle the "while" loop polling for the refresh status
   >         ...
   > ```
   > 
   > And regarding the operator, if I understand correctly, it would look like this:
   > 
   > ```python
   > class PowerBIDatasetRefreshOperator(BaseOperator):
   >     def execute(self, context: Context):
   >         """Refresh the Power BI Dataset."""
   >         if self.wait_for_termination:
   >             self.defer(
   >                 trigger=PowerBITrigger(...),
   >                 method_name=self.push_refreshId.__name__,
   >             )
   > 
   >     def push_refreshId(self, context: Context, event: dict):
   >         # push the refresh Id to XCom
   >         self.xcom_push(
   >             context=context,
   >             key="powerbi_dataset_refresh_Id",
   >             value=event["dataset_refresh_id"],
   >         )
   >         # defer again, this time with the dataset_refresh_id set on the trigger
   >         self.defer(
   >             trigger=PowerBITrigger(...),
   >             method_name=self.execute_complete.__name__,
   >         )
   > 
   >     def execute_complete(self, context: Context, event: dict):
   >         # exit the operator as currently done
   >         ...
   > ```
   
   Indeed, that's the main issue: because fetching the refresh details fails and because of how the operator is implemented today, instead of directly getting the refresh details on the second attempt, it will again trigger a new dataset refresh and then try to get its refresh details, which, as you mentioned, will fail again, so the main problem persists.
   
   That's why I suggested the refactor: the PowerBITrigger could handle both cases separately, and the operator could then retry the second case directly instead of redoing the whole flow.
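
   To make that concrete, here is a minimal sketch of the two-phase `run` method I have in mind, building on your draft above. This is only an illustration under assumptions: the hook method names (`trigger_dataset_refresh`, `get_refresh_details_by_refresh_id`), the constructor arguments and the status strings are simplified, and the real trigger would keep all of its existing parameters.

   ```python
   from __future__ import annotations

   import asyncio
   from typing import Any, AsyncIterator

   from airflow.providers.microsoft.azure.hooks.powerbi import PowerBIHook
   from airflow.triggers.base import BaseTrigger, TriggerEvent


   class PowerBITrigger(BaseTrigger):
       """Sketch only: two-phase trigger, parameters reduced to the essentials."""

       def __init__(
           self,
           conn_id: str,
           dataset_id: str,
           group_id: str,
           dataset_refresh_id: str | None = None,
           check_interval: int = 60,
       ):
           super().__init__()
           self.conn_id = conn_id
           self.dataset_id = dataset_id
           self.group_id = group_id
           self.dataset_refresh_id = dataset_refresh_id
           self.check_interval = check_interval

       def serialize(self) -> tuple[str, dict[str, Any]]:
           # Required by BaseTrigger; the module path is illustrative.
           return (
               "airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger",
               {
                   "conn_id": self.conn_id,
                   "dataset_id": self.dataset_id,
                   "group_id": self.group_id,
                   "dataset_refresh_id": self.dataset_refresh_id,
                   "check_interval": self.check_interval,
               },
           )

       async def run(self) -> AsyncIterator[TriggerEvent]:
           hook = PowerBIHook(conn_id=self.conn_id)  # assumed constructor argument
           if not self.dataset_refresh_id:
               # Phase 1: only trigger the refresh and hand the id back to the
               # operator, which pushes it to XCom and defers again with
               # dataset_refresh_id set.
               refresh_id = await hook.trigger_dataset_refresh(
                   dataset_id=self.dataset_id, group_id=self.group_id
               )
               yield TriggerEvent({"status": "triggered", "dataset_refresh_id": refresh_id})
               return
           # Phase 2: poll the known refresh id until it reaches a terminal state.
           while True:
               details = await hook.get_refresh_details_by_refresh_id(
                   dataset_id=self.dataset_id,
                   group_id=self.group_id,
                   refresh_id=self.dataset_refresh_id,
               )
               status = details.get("status")
               if status in ("Completed", "Failed"):  # simplified terminal states
                   yield TriggerEvent(
                       {"status": status, "dataset_refresh_id": self.dataset_refresh_id}
                   )
                   return
               await asyncio.sleep(self.check_interval)
   ```

   That way a retry, whether a full task retry or a re-deferral by the operator, only has to repeat the polling phase, because the dataset_refresh_id is already known (it can be pulled back from XCom) and no new refresh is started.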
   
   I also saw, after a remark from my colleague @joffreybienvenu-infrabel, that tenacity is actually used in the KubernetesPodOperator as well, so I was wrong that it's not being used by operators. But I still think it's better to use the retry mechanism implemented by the TaskInstance instead of bypassing it and retrying directly within the hook/operator, as imho that's not its purpose / not good practice, but again I could be wrong.
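
   To illustrate what I mean by relying on the TaskInstance retry mechanism: the retry policy stays on the task itself rather than inside the hook. A minimal sketch (the values and ids are placeholders, and the task would of course live inside a DAG definition):

   ```python
   from datetime import timedelta

   from airflow.providers.microsoft.azure.operators.powerbi import PowerBIDatasetRefreshOperator

   refresh_dataset = PowerBIDatasetRefreshOperator(
       task_id="refresh_powerbi_dataset",
       dataset_id="<dataset-id>",
       group_id="<workspace-id>",
       # Let Airflow's TaskInstance handle retries instead of tenacity inside the hook.
       retries=3,
       retry_delay=timedelta(minutes=2),
   )
   ```

   That way any backoff the Power BI API needs comes from `retry_delay` on the task instead of a sleep or tenacity loop inside the hook.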

