This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ca45c73a0 Add retries to job heartbeat (#37541)
4ca45c73a0 is described below

commit 4ca45c73a010822d085c85a69164c1a1f51daf48
Author: awdavidson <[email protected]>
AuthorDate: Fri Mar 8 00:47:21 2024 +0000

    Add retries to job heartbeat (#37541)
    
    * Job heartbeat retries
    
    * Tweak format
    
    * Retry on individual db operations
---
 airflow/jobs/job.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 0afbb2a026..b215b3435b 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -39,6 +39,7 @@ from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
 from airflow.utils.platform import getuser
+from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 from airflow.utils.state import JobState
@@ -302,6 +303,7 @@ class Job(Base, LoggingMixin):
     @staticmethod
     @internal_api_call
     @provide_session
+    @retry_db_transaction
     def _fetch_from_db(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
         if isinstance(job, Job):
             # not Internal API
@@ -342,6 +344,7 @@ class Job(Base, LoggingMixin):
     @staticmethod
     @internal_api_call
     @provide_session
+    @retry_db_transaction
     def _update_heartbeat(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic:
         orm_job: Job | None = session.scalar(select(Job).where(Job.id == job.id).limit(1))
         if orm_job is None:

Reply via email to