dabla commented on issue #44618:
URL: https://github.com/apache/airflow/issues/44618#issuecomment-2572497803
> Hi @Ohashiro, thank you for bringing this up and creating the PR to address it. After reviewing the conversation, I see the issue lies in the `get_refresh_history` function of the hook class: it sometimes fails to return the dataset refresh histories, leading the dataset refresh to be marked as failed even if it actually succeeds. Please correct me if I've misunderstood.
>
> So I suggest we add a retry mechanism to the `get_refresh_history` function only, as below: it retries fetching the histories, and if that still fails, we just throw the exception. That would also mean we don't need the extra exception class you created, `PowerBIDatasetRefreshStatusExecption`.
> ```
> @tenacity.retry(
>     stop=tenacity.stop_after_attempt(3),
>     wait=tenacity.wait_random_exponential(),
>     reraise=True,
>     # Retry while the histories come back empty (see the raise below).
>     retry=tenacity.retry_if_exception_type(PowerBIDatasetRefreshException),
> )
> async def get_refresh_history(
>     self,
>     dataset_id: str,
>     group_id: str,
> ) -> list[dict[str, str]]:
>     """
>     Retrieve the refresh history of the specified dataset from the given group ID.
>
>     :param dataset_id: The dataset ID.
>     :param group_id: The workspace ID.
>
>     :return: List of dictionaries containing the refresh histories of the dataset.
>     """
>     try:
>         response = await self.run(
>             url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes",
>             path_parameters={
>                 "group_id": group_id,
>                 "dataset_id": dataset_id,
>             },
>         )
>
>         refresh_histories = response.get("value")
>         if not refresh_histories:  # Retry if refresh_histories is None or empty
>             raise PowerBIDatasetRefreshException("Refresh histories are empty; retrying...")
>
>         return [self.raw_to_refresh_details(refresh_history) for refresh_history in refresh_histories]
>
>     except Exception as error:
>         raise PowerBIDatasetRefreshException(f"Failed to retrieve refresh history due to error: {error}")
> ```
I don't think it's good practice to use the tenacity retry mechanism while operators (i.e. task instances) have their own retry mechanism managed by Airflow tasks, but I could be wrong. I also went back to the code, and I still don't understand why the call would fail again when the task instance for that operator is retried a second time by Airflow: some time would have passed between the first and second attempt, so I would expect the second call to succeed, but apparently it still doesn't.
Maybe it's because of this part:
```
async def run(self) -> AsyncIterator[TriggerEvent]:
    """Make async connection to the PowerBI and polls for the dataset refresh status."""
    # This is always called, even if it succeeded during the first attempt but the task
    # is retried because the second call below fails; maybe something has to be done to
    # avoid re-triggering the refresh when a retry is being executed.
    self.dataset_refresh_id = await self.hook.trigger_dataset_refresh(
        dataset_id=self.dataset_id,
        group_id=self.group_id,
    )

    async def fetch_refresh_status_and_error() -> tuple[str, str]:
        """Fetch the current status and error of the dataset refresh."""
        # This is the call that fails, probably because it happens too fast after the one above.
        refresh_details = await self.hook.get_refresh_details_by_refresh_id(
            dataset_id=self.dataset_id,
            group_id=self.group_id,
            refresh_id=self.dataset_refresh_id,
        )
        return refresh_details["status"], refresh_details["error"]
```
So maybe, @Ohashiro, a delay before calling get_refresh_details_by_refresh_id would be a possible easy fix to avoid the issue on the second call, even though I don't like it that much and it doesn't ensure the call will eventually succeed.
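For illustration, a minimal sketch of that delay inside the nested fetch_refresh_status_and_error function (self is captured from the enclosing run method, as in the snippet above; reusing check_interval as the wait time is just an assumption of mine, any fixed delay would do):
```
import asyncio

async def fetch_refresh_status_and_error() -> tuple[str, str]:
    """Fetch the current status and error of the dataset refresh."""
    # Hypothetical workaround: give the PowerBI API some time to register the
    # refresh before querying its details, instead of calling it immediately.
    await asyncio.sleep(self.check_interval)
    refresh_details = await self.hook.get_refresh_details_by_refresh_id(
        dataset_id=self.dataset_id,
        group_id=self.group_id,
        refresh_id=self.dataset_refresh_id,
    )
    return refresh_details["status"], refresh_details["error"]
```
That said, even a generous delay doesn't guarantee the API has caught up, which is why I'd prefer the refactoring below.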
What I suggest is to refactor the PowerBITrigger and PowerBIDatasetRefreshOperator. The PowerBITrigger should get an extra dataset_refresh_id parameter in its constructor, so the run method can be called both without and with the dataset_refresh_id parameter. That way it can handle both scenarios and return the corresponding TriggerEvents for the executed flow (i.e. whether a dataset refresh is being triggered or we want to get the dataset refresh details). The code modifications could possibly look like this in PowerBITrigger:
```
def __init__(
    self,
    conn_id: str,
    dataset_id: str,
    group_id: str,
    timeout: float = 60 * 60 * 24 * 7,
    proxies: dict | None = None,
    api_version: APIVersion | str | None = None,
    check_interval: int = 60,
    wait_for_termination: bool = True,
    dataset_refresh_id: str | None = None,  # add the dataset_refresh_id parameter
):
    super().__init__()
    self.hook = PowerBIHook(conn_id=conn_id, proxies=proxies, api_version=api_version, timeout=timeout)
    self.dataset_id = dataset_id
    self.timeout = timeout
    self.group_id = group_id
    self.check_interval = check_interval
    self.wait_for_termination = wait_for_termination
    self.dataset_refresh_id = dataset_refresh_id

def serialize(self):
    """Serialize the trigger instance."""
    return (
        "airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger",
        {
            "conn_id": self.conn_id,
            "proxies": self.proxies,
            "api_version": self.api_version,
            "dataset_id": self.dataset_id,
            "group_id": self.group_id,
            "timeout": self.timeout,
            "check_interval": self.check_interval,
            "wait_for_termination": self.wait_for_termination,
            # IMPORTANT: do not forget to add the new parameter in the serialize method as well
            "dataset_refresh_id": self.dataset_refresh_id,
        },
    )

async def run(self) -> AsyncIterator[TriggerEvent]:
    """Make async connection to the PowerBI and polls for the dataset refresh status."""
    if not self.dataset_refresh_id:
        dataset_refresh_id = await self.hook.trigger_dataset_refresh(
            dataset_id=self.dataset_id,
            group_id=self.group_id,
        )
        # Just yield a TriggerEvent with the dataset_refresh_id; the operator will then
        # use it to re-trigger the PowerBITrigger with the corresponding dataset_refresh_id,
        # so the trigger knows it only has to get the refresh details.
        yield TriggerEvent(
            {
                "status": "success",
                "message": f"The dataset refresh {dataset_refresh_id} has been triggered.",
                "dataset_refresh_id": dataset_refresh_id,
            }
        )
        return

    async def fetch_refresh_status_and_error() -> tuple[str, str]:
        """Fetch the current status and error of the dataset refresh."""
        refresh_details = await self.hook.get_refresh_details_by_refresh_id(
            dataset_id=self.dataset_id,
            group_id=self.group_id,
            refresh_id=self.dataset_refresh_id,
        )
        return refresh_details["status"], refresh_details["error"]
```
Then in the PowerBIDatasetRefreshOperator, both TriggerEvents should be handled accordingly, which means the PowerBITrigger will be called (i.e. deferred) twice: once to trigger the dataset refresh and once to poll its status via get_refresh_details_by_refresh_id. On the first attempt, if the trigger succeeds, the dataset_refresh_id should be persisted as an XCom within the operator, so that when the second call fails the operator can directly retry the second Trigger call; see the sketch below.
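A minimal sketch of how the operator could orchestrate the two deferrals (the method name execute_refresh_complete and the XCom key powerbi_dataset_refresh_id are placeholders of mine, not existing provider code):
```
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.triggers.powerbi import PowerBITrigger


class PowerBIDatasetRefreshOperator(BaseOperator):
    # ... existing fields and __init__ omitted for brevity ...

    def execute(self, context):
        # First deferral: no dataset_refresh_id yet, so the trigger starts the refresh.
        self.defer(
            trigger=PowerBITrigger(
                conn_id=self.conn_id,
                dataset_id=self.dataset_id,
                group_id=self.group_id,
            ),
            method_name="execute_refresh_complete",
        )

    def execute_refresh_complete(self, context, event):
        """Handle the TriggerEvent yielded once the refresh has been triggered."""
        if event["status"] != "success":
            raise AirflowException(event["message"])
        # Persist the refresh id as an XCom so a retried task instance could skip
        # re-triggering the refresh and go straight to polling.
        context["ti"].xcom_push(key="powerbi_dataset_refresh_id", value=event["dataset_refresh_id"])
        # Second deferral: with dataset_refresh_id set, the trigger only polls the status.
        self.defer(
            trigger=PowerBITrigger(
                conn_id=self.conn_id,
                dataset_id=self.dataset_id,
                group_id=self.group_id,
                dataset_refresh_id=event["dataset_refresh_id"],
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        """Handle the final TriggerEvent with the refresh outcome."""
        if event["status"] != "success":
            raise AirflowException(event["message"])
```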
This is of course a more complex approach due to the deferrable aspect, but it would be more in line with how operators should work, imho, instead of hiding failures and doing retries outside the Airflow flow using the tenacity library. From what I see in the code base, tenacity is only used in the retries module of Airflow and in the CLI commands, not in the operators/hooks/triggerers. A minimal example of relying on Airflow's task-level retries instead is shown below.
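A sketch of what that looks like at the DAG level (the connection and ids are illustrative; retries and retry_delay are standard BaseOperator arguments):
```
from datetime import timedelta

from airflow.providers.microsoft.azure.operators.powerbi import PowerBIDatasetRefreshOperator

# Let Airflow's task-level retry mechanism handle transient failures instead of
# wrapping hook calls in tenacity.
refresh_dataset = PowerBIDatasetRefreshOperator(
    task_id="refresh_powerbi_dataset",
    conn_id="powerbi_default",  # illustrative connection id
    dataset_id="my-dataset-id",  # illustrative dataset id
    group_id="my-workspace-id",  # illustrative workspace id
    retries=3,
    retry_delay=timedelta(minutes=1),
)
```
Each failed attempt is then visible in the task instance's history instead of being hidden inside the hook.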