This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f970b4dabc Fix Base Job Tests for Dataset Isolation Mode (#41136)
f970b4dabc is described below
commit f970b4dabcde3b54f5249a631cb5f602441e815e
Author: Jens Scheffler <[email protected]>
AuthorDate: Wed Jul 31 00:10:02 2024 +0200
Fix Base Job Tests for Dataset Isolation Mode (#41136)
---
tests/jobs/test_base_job.py | 34 +++++++++++++++++++++++++++-------
1 file changed, 27 insertions(+), 7 deletions(-)
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index fcbb84fef7..e956d12889 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import datetime
import logging
import sys
+from typing import TYPE_CHECKING
from unittest.mock import ANY, Mock, patch
import pytest
@@ -35,6 +36,9 @@ from tests.listeners import lifecycle_listener
from tests.test_utils.config import conf_vars
from tests.utils.test_helpers import MockJobRunner, SchedulerJobRunner,
TriggererJobRunner
+if TYPE_CHECKING:
+ from airflow.serialization.pydantic.job import JobPydantic
+
pytestmark = pytest.mark.db_test
@@ -107,7 +111,7 @@ class TestJob:
job = Job()
job_runner(job=job)
session.add(job)
- session.flush()
+ session.commit()
most_recent = most_recent_job(job_runner.job_type, session=session)
assert most_recent.heartrate == float(job_heartbeat_sec)
@@ -126,6 +130,19 @@ class TestJob:
# heartrate should be 12 since we passed that to the constructor
directly
assert job.heartrate == 12
+ def _compare_jobs(self, job1: Job | JobPydantic, job2: Job | JobPydantic):
+ """Helper to compare two jobs where one can be Pydantic and the other
not."""
+ assert job1.id == job2.id
+ assert job1.dag_id == job2.dag_id
+ assert job1.state == job2.state
+ assert job1.job_type == job2.job_type
+ assert job1.start_date == job2.start_date
+ assert job1.end_date == job2.end_date
+ assert job1.latest_heartbeat == job2.latest_heartbeat
+ assert job1.executor_class == job2.executor_class
+ assert job1.hostname == job2.hostname
+ assert job1.unixname == job2.unixname
+
def test_most_recent_job(self):
with create_session() as session:
old_job = Job(heartrate=10)
@@ -135,10 +152,10 @@ class TestJob:
MockJobRunner(job=job)
session.add(job)
session.add(old_job)
- session.flush()
+ session.commit()
- assert most_recent_job(MockJobRunner.job_type, session=session) ==
job
- assert old_job.most_recent_job(session=session) == job
+ self._compare_jobs(most_recent_job(MockJobRunner.job_type,
session=session), job)
+ self._compare_jobs(old_job.most_recent_job(session=session), job)
session.rollback()
@@ -159,9 +176,11 @@ class TestJob:
session.add(old_running_state_job)
session.add(new_failed_state_job)
session.add(new_null_state_job)
- session.flush()
+ session.commit()
- assert most_recent_job(MockJobRunner.job_type, session=session) ==
old_running_state_job
+ self._compare_jobs(
+ most_recent_job(MockJobRunner.job_type, session=session),
old_running_state_job
+ )
session.rollback()
@@ -208,6 +227,7 @@ class TestJob:
job.latest_heartbeat = timezone.utcnow() -
datetime.timedelta(seconds=10)
assert job.is_alive() is False, "Completed jobs even with recent
heartbeat should not be alive"
+ @pytest.mark.skip_if_database_isolation_mode
def test_heartbeat_failed(self, caplog):
when = timezone.utcnow() - datetime.timedelta(seconds=60)
mock_session = Mock(name="MockSession")
@@ -215,7 +235,7 @@ class TestJob:
job = Job(heartrate=10, state=State.RUNNING)
job.latest_heartbeat = when
with caplog.at_level(logging.ERROR):
- job.heartbeat(heartbeat_callback=lambda: None,
session=mock_session)
+ job.heartbeat(heartbeat_callback=lambda _: None,
session=mock_session)
assert "heartbeat failed with error" in caplog.text
assert job.latest_heartbeat == when, "attribute not updated when
heartbeat fails"
assert job.heartbeat_failed