This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4ca45c73a0 Add retries to job heartbeat (#37541)
4ca45c73a0 is described below
commit 4ca45c73a010822d085c85a69164c1a1f51daf48
Author: awdavidson <[email protected]>
AuthorDate: Fri Mar 8 00:47:21 2024 +0000
Add retries to job heartbeat (#37541)
* Job heartbeat retries
* Tweak format
* Retry on individual db operations
---
airflow/jobs/job.py | 3 +++
1 file changed, 3 insertions(+)
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 0afbb2a026..b215b3435b 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -39,6 +39,7 @@ from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import getuser
+from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.state import JobState
@@ -302,6 +303,7 @@ class Job(Base, LoggingMixin):
@staticmethod
@internal_api_call
@provide_session
+ @retry_db_transaction
def _fetch_from_db(job: Job | JobPydantic, session: Session = NEW_SESSION)
-> Job | JobPydantic | None:
if isinstance(job, Job):
# not Internal API
@@ -342,6 +344,7 @@ class Job(Base, LoggingMixin):
@staticmethod
@internal_api_call
@provide_session
+ @retry_db_transaction
def _update_heartbeat(job: Job | JobPydantic, session: Session =
NEW_SESSION) -> Job | JobPydantic:
orm_job: Job | None = session.scalar(select(Job).where(Job.id ==
job.id).limit(1))
if orm_job is None: