dabla commented on issue #44618: URL: https://github.com/apache/airflow/issues/44618#issuecomment-2572797731
> Hello! @dabla thank you for your investigation and suggestion!
>
> Regarding your first point:
>
> > I also went back to the code and I still don't understand why the call would still fail when the task instance for that operator is retried the second time by Airflow, as some time would have passed between the first and the second attempt, I would expect the second call to succeed but apparently it still doesn't.
>
> From what I understand, when the task fails (for example, in our case, because the refreshId was not found), the operator cancels the refresh (using `cancel_dataset_refresh` hook method). So when the task is retried by Airflow, a new refresh is triggered with a new refreshId.
>
> https://github.com/apache/airflow/blob/413a1833c302e2409d2ac96c6521f97e6589a594/providers/src/airflow/providers/microsoft/azure/hooks/powerbi.py#L202
>
> Regarding the fix implementation, the delay solution we discussed seems to work well (which, imho, confirm the bug root cause), but I agree that this fix is more of a "quick fix" than a clean one.
>
> I can work on the refactor you suggested to check, I just have a few questions:
>
> > so that when the seconds calls fails the operator can directly retry the second Trigger call
>
> Regarding the retry mechanism you are suggesting in the operator, how would you do it? I just wrote the following draft, do you confirm this is what you expected? However I'm not sure how you'd handle the retry part... Would you let the operator fail and when the task is retried, it tried to fetch the refreshId directly? Or would you make the operator retry the Trigger in the same task?
>
> Regarding the trigger, if I understand correctly, the `run` method would look like that:
>
> ```python
> async def run(self) -> AsyncIterator[TriggerEvent]:
>
>     if not self.dataset_refresh_id:
>         # Just yield a TriggerEvent with the dataset_refresh_id, that will then be used by the operator to retrigger it with the corresponding dataset_refresh_id so the PowerBITrigger knows it has to only get the refresh details in case of failure, then the refresh details would then be executed.
>         yield TriggerEvent(
>             ...
>         )
>
>     else:
>         # Handle the "while" loop looking for the refresh status
> ```
>
> And regarding the operator, if I understand correctly, it would look like that:
>
> ```python
> class PowerBIDatasetRefreshOperator(BaseOperator):
>     def execute(self, context: Context):
>         """Refresh the Power BI Dataset."""
>         if self.wait_for_termination:
>             self.defer(
>                 trigger=PowerBITrigger(...),
>                 method_name=self.push_refreshId.__name__,
>             )
>
>     def push_refreshId():
>         # push the refresh Id to xcom
>         self.xcom_push(
>             context=context, key="powerbi_dataset_refresh_Id", value=event["dataset_refresh_id"]
>         )
>         self.defer(
>             trigger=PowerBITrigger(...),
>             method_name=self.execute.__name__,
>         )
>
>     def execute():
>         # exit the operator as currently done
> ```

Indeed, that's the main issue. Because fetching the refresh details fails, and because of how the operator is implemented today, the second attempt doesn't try to get the details of the existing refresh directly: it triggers a new dataset refresh and then tries to get the details of that new refresh, which, as you mentioned, fails again, so the main problem persists. That's why I suggested the refactor: the PowerBITrigger can handle both cases (triggering the refresh and fetching its details) separately, and the operator can then retry only the second case instead of redoing the whole flow.
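To make the split more concrete, here is a minimal, self-contained sketch of a two-mode trigger and the operator flow it enables. It does not import Airflow: `TriggerEvent` is a local stand-in, and `FakePowerBIHook`, `TwoModeTrigger`, `first_event`, `run_task_attempt`, `XCOM_KEY` and the `xcom` dict are hypothetical names used only for illustration, not the provider's API. Mode 1 only starts a refresh and hands its id back; mode 2 only looks up an existing refresh (the status-polling loop is reduced to a single lookup), so it can never start a second refresh.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional

XCOM_KEY = "powerbi_dataset_refresh_id"  # hypothetical key name


@dataclass
class TriggerEvent:
    # Minimal stand-in for airflow.triggers.base.TriggerEvent.
    payload: dict


class FakePowerBIHook:
    """Hypothetical async hook: the first `hidden_lookups` lookups fail with
    "not found", mimicking the error discussed in this issue."""

    def __init__(self, hidden_lookups: int = 0):
        self.hidden_lookups = hidden_lookups
        self.refreshes_triggered = 0
        self.lookups = 0

    async def trigger_dataset_refresh(self) -> str:
        self.refreshes_triggered += 1
        return f"refresh-{self.refreshes_triggered}"

    async def get_refresh_details(self, refresh_id: str) -> dict:
        self.lookups += 1
        if self.lookups <= self.hidden_lookups:
            raise LookupError(f"refreshId {refresh_id} not found")
        return {"status": "Completed"}


class TwoModeTrigger:
    """Hypothetical split: no refresh id -> start one; refresh id -> only watch it."""

    def __init__(self, hook: FakePowerBIHook, dataset_refresh_id: Optional[str] = None):
        self.hook = hook
        self.dataset_refresh_id = dataset_refresh_id

    async def run(self) -> AsyncIterator[TriggerEvent]:
        if not self.dataset_refresh_id:
            # Mode 1: start the refresh and hand its id back to the operator.
            refresh_id = await self.hook.trigger_dataset_refresh()
            yield TriggerEvent({"status": "triggered", "dataset_refresh_id": refresh_id})
            return
        # Mode 2: look up the existing refresh only; never start a new one.
        try:
            details = await self.hook.get_refresh_details(self.dataset_refresh_id)
        except LookupError as err:
            yield TriggerEvent({"status": "error", "message": str(err)})
            return
        yield TriggerEvent({"status": details["status"]})


async def first_event(trigger: TwoModeTrigger) -> TriggerEvent:
    # Resume the operator on the first yielded event, as the triggerer would.
    async for event in trigger.run():
        return event
    raise RuntimeError("trigger yielded no event")


def run_task_attempt(hook: FakePowerBIHook, xcom: dict) -> str:
    """One attempt: execute -> (defer, mode 1) -> store id -> (defer, mode 2) -> complete."""
    refresh_id = xcom.get(XCOM_KEY)
    if refresh_id is None:
        event = asyncio.run(first_event(TwoModeTrigger(hook)))
        refresh_id = event.payload["dataset_refresh_id"]
        xcom[XCOM_KEY] = refresh_id  # plays the role of xcom_push
    event = asyncio.run(first_event(TwoModeTrigger(hook, refresh_id)))
    if event.payload["status"] == "error":
        raise RuntimeError(event.payload["message"])
    return event.payload["status"]
```

When mode 2 reports an error, the operator can simply raise; provided the stored id is still available on the next attempt, that attempt goes straight to mode 2 instead of triggering a new refresh.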
After a remark from my colleague @joffreybienvenu-infrabel, I also noticed that tenacity is actually used in the KubernetesPodOperator as well, so I was wrong to say that it isn't used by operators. Still, I think it's better to rely on the retry mechanism implemented by the TaskInstance than to bypass it by retrying directly within the hook or operator, as IMHO that's not their purpose, nor good practice, but again, I could be wrong.
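The difference between the current flow and the proposed one, when both rely only on TaskInstance retries, can be simulated without Airflow. Everything below is hypothetical: `FakeHook` mimics the failure mode from this issue (a new refreshId is not found on its first lookup), does no tenacity-style retrying itself and lets the error propagate, while `run_with_task_retries` plays the role of TaskInstance retries (`retry_delay` is ignored). The `state` dict stands in for wherever the refresh id would be persisted between tries; that part is an assumption to verify, since Airflow clears a task instance's XComs when a new try starts (though not when resuming from deferral), so XCom alone may not be enough.

```python
from typing import Callable, Dict, List


class FakeHook:
    """Hypothetical sync hook: every *new* refresh id is "not found" on its
    first lookup and visible afterwards. No internal retries: errors propagate."""

    def __init__(self) -> None:
        self.refreshes: List[str] = []
        self.lookups: Dict[str, int] = {}
        self.cancelled: List[str] = []

    def trigger_dataset_refresh(self) -> str:
        refresh_id = f"refresh-{len(self.refreshes) + 1}"
        self.refreshes.append(refresh_id)
        return refresh_id

    def get_refresh_details(self, refresh_id: str) -> dict:
        self.lookups[refresh_id] = self.lookups.get(refresh_id, 0) + 1
        if self.lookups[refresh_id] == 1:
            raise LookupError(f"refreshId {refresh_id} not found")
        return {"status": "Completed"}

    def cancel_dataset_refresh(self, refresh_id: str) -> None:
        self.cancelled.append(refresh_id)


def current_attempt(hook: FakeHook, state: dict) -> str:
    # Today's flow as described in the discussion (state is unused): on failure
    # the refresh is cancelled, so the next try starts with a new refresh id.
    refresh_id = hook.trigger_dataset_refresh()
    try:
        return hook.get_refresh_details(refresh_id)["status"]
    except LookupError:
        hook.cancel_dataset_refresh(refresh_id)
        raise


def refactored_attempt(hook: FakeHook, state: dict) -> str:
    # Proposed flow: reuse the refresh id persisted by an earlier try.
    if "refresh_id" not in state:
        state["refresh_id"] = hook.trigger_dataset_refresh()
    return hook.get_refresh_details(state["refresh_id"])["status"]


def run_with_task_retries(attempt: Callable[[FakeHook, dict], str],
                          hook: FakeHook, retries: int) -> str:
    """Mimics TaskInstance retries: try_number runs from 1 to retries + 1."""
    state: dict = {}
    for try_number in range(1, retries + 2):
        try:
            return f"{attempt(hook, state)} on try {try_number}"
        except LookupError:
            if try_number == retries + 1:
                return f"failed after {try_number} tries"
    raise AssertionError("unreachable for retries >= 0")
```

With `retries=2`, the current flow triggers and cancels three refreshes and still fails, whereas the refactored flow triggers a single refresh and succeeds on the second try, without any retry logic inside the hook.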
