This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f970b4dabc Fix Base Job Tests for Dataset Isolation Mode (#41136)
f970b4dabc is described below

commit f970b4dabcde3b54f5249a631cb5f602441e815e
Author: Jens Scheffler <[email protected]>
AuthorDate: Wed Jul 31 00:10:02 2024 +0200

    Fix Base Job Tests for Dataset Isolation Mode (#41136)
---
 tests/jobs/test_base_job.py | 34 +++++++++++++++++++++++++++-------
 1 file changed, 27 insertions(+), 7 deletions(-)

diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index fcbb84fef7..e956d12889 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import datetime
 import logging
 import sys
+from typing import TYPE_CHECKING
 from unittest.mock import ANY, Mock, patch
 
 import pytest
@@ -35,6 +36,9 @@ from tests.listeners import lifecycle_listener
 from tests.test_utils.config import conf_vars
 from tests.utils.test_helpers import MockJobRunner, SchedulerJobRunner, 
TriggererJobRunner
 
+if TYPE_CHECKING:
+    from airflow.serialization.pydantic.job import JobPydantic
+
 pytestmark = pytest.mark.db_test
 
 
@@ -107,7 +111,7 @@ class TestJob:
             job = Job()
             job_runner(job=job)
             session.add(job)
-            session.flush()
+            session.commit()
 
             most_recent = most_recent_job(job_runner.job_type, session=session)
             assert most_recent.heartrate == float(job_heartbeat_sec)
@@ -126,6 +130,19 @@ class TestJob:
             # heartrate should be 12 since we passed that to the constructor 
directly
             assert job.heartrate == 12
 
+    def _compare_jobs(self, job1: Job | JobPydantic, job2: Job | JobPydantic):
+        """Helper to compare two jobs where one can by Pydantic and the other 
not."""
+        assert job1.id == job2.id
+        assert job1.dag_id == job2.dag_id
+        assert job1.state == job2.state
+        assert job1.job_type == job2.job_type
+        assert job1.start_date == job2.start_date
+        assert job1.end_date == job2.end_date
+        assert job1.latest_heartbeat == job2.latest_heartbeat
+        assert job1.executor_class == job2.executor_class
+        assert job1.hostname == job2.hostname
+        assert job1.unixname == job2.unixname
+
     def test_most_recent_job(self):
         with create_session() as session:
             old_job = Job(heartrate=10)
@@ -135,10 +152,10 @@ class TestJob:
             MockJobRunner(job=job)
             session.add(job)
             session.add(old_job)
-            session.flush()
+            session.commit()
 
-            assert most_recent_job(MockJobRunner.job_type, session=session) == 
job
-            assert old_job.most_recent_job(session=session) == job
+            self._compare_jobs(most_recent_job(MockJobRunner.job_type, 
session=session), job)
+            self._compare_jobs(old_job.most_recent_job(session=session), job)
 
             session.rollback()
 
@@ -159,9 +176,11 @@ class TestJob:
             session.add(old_running_state_job)
             session.add(new_failed_state_job)
             session.add(new_null_state_job)
-            session.flush()
+            session.commit()
 
-            assert most_recent_job(MockJobRunner.job_type, session=session) == 
old_running_state_job
+            self._compare_jobs(
+                most_recent_job(MockJobRunner.job_type, session=session), 
old_running_state_job
+            )
 
             session.rollback()
 
@@ -208,6 +227,7 @@ class TestJob:
         job.latest_heartbeat = timezone.utcnow() - 
datetime.timedelta(seconds=10)
         assert job.is_alive() is False, "Completed jobs even with recent 
heartbeat should not be alive"
 
+    @pytest.mark.skip_if_database_isolation_mode
     def test_heartbeat_failed(self, caplog):
         when = timezone.utcnow() - datetime.timedelta(seconds=60)
         mock_session = Mock(name="MockSession")
@@ -215,7 +235,7 @@ class TestJob:
         job = Job(heartrate=10, state=State.RUNNING)
         job.latest_heartbeat = when
         with caplog.at_level(logging.ERROR):
-            job.heartbeat(heartbeat_callback=lambda: None, 
session=mock_session)
+            job.heartbeat(heartbeat_callback=lambda _: None, 
session=mock_session)
         assert "heartbeat failed with error" in caplog.text
         assert job.latest_heartbeat == when, "attribute not updated when 
heartbeat fails"
         assert job.heartbeat_failed

Reply via email to