iw-pavan commented on PR #41048:
URL: https://github.com/apache/airflow/pull/41048#issuecomment-2252658640
With this support we want to implement retryable queries
Add retry session like
`class RetryingSession(Session):
__max_retry_count__ = 3
__retry_messages__ = [
"server closed the connection unexpectedly",
"the database system is shutting down",
"the database system is starting up"
]
def __raise_if_not_retryable(self, ex):
exception_msg = str(ex)
for m in RetryingSession.__retry_messages__:
if m in exception_msg:
logging.warning('Database error is retryable : %s', exception_msg)
return
logging.warning('Database error is not retryable : %s', exception_msg)
raise ex
def execute(
self,
*args,
**kwargs
):
attempts = 0
while True:
try:
attempts += 1
return super(RetryingSession, self).execute(*args, **kwargs)
except OperationalError as ex:
self.__raise_if_not_retryable(ex)
if attempts <= self.__max_retry_count__:
sleep_for = 2 ** (attempts - 1)
logging.error(
"/!\ Database connection error: retrying Strategy => sleeping
for {}s"
" and will retry (attempt #{} of {}) \n Detailed query impacted:
{}".format(
sleep_for, attempts, self.__max_retry_count__, ex)
)
sleep(sleep_for)
else:
raise`
Then in airflow_local_settings
`session_kwargs = dict(
class_= RetryingSession
)`
And airflow cfg
`sql_alchemy_session_args = airflow_local_settings.session_kwargs`
PS: Is this the way to implement retry in all queries or any other better
way?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]