This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8c3ce932327 AIP-66: Make DAG callbacks bundle aware (#45860)
8c3ce932327 is described below
commit 8c3ce9323270f21c63e37bce4c8ce11f1ed83fa2
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Feb 4 02:44:07 2025 +0100
AIP-66: Make DAG callbacks bundle aware (#45860)
This involves using relative paths in the callbacks, resolving the full
path, and using it to queue the callback in the file processor process.
---
airflow/callbacks/callback_requests.py | 4 +-
airflow/dag_processing/manager.py | 40 ++---
airflow/dag_processing/processor.py | 13 +-
airflow/jobs/scheduler_job_runner.py | 31 ++--
airflow/models/dagrun.py | 12 +-
airflow/triggers/base.py | 4 +-
.../executors/test_celery_kubernetes_executor.py | 4 +-
.../executors/test_local_kubernetes_executor.py | 4 +-
tests/callbacks/test_callback_requests.py | 12 +-
tests/dag_processing/test_manager.py | 198 ++++++++++++---------
tests/dag_processing/test_processor.py | 19 +-
tests/jobs/test_scheduler_job.py | 74 +++++---
tests/models/test_dagrun.py | 31 +++-
tests_common/pytest_plugin.py | 1 -
14 files changed, 275 insertions(+), 172 deletions(-)
diff --git a/airflow/callbacks/callback_requests.py
b/airflow/callbacks/callback_requests.py
index c70835dedb5..b990b0df7ea 100644
--- a/airflow/callbacks/callback_requests.py
+++ b/airflow/callbacks/callback_requests.py
@@ -34,8 +34,10 @@ class BaseCallbackRequest(BaseModel):
:param msg: Additional Message that can be used for logging
"""
- full_filepath: str
+ filepath: str
"""File Path to use to run the callback"""
+ bundle_name: str
+ bundle_version: str | None
msg: str | None = None
"""Additional Message that can be used for logging to determine
failure/zombie"""
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index 987e09bfc66..815a45b0fd9 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -107,6 +107,7 @@ class DagFileInfo:
rel_path: Path
bundle_name: str
bundle_path: Path | None = field(compare=False, default=None)
+ bundle_version: str | None = None
@property
def absolute_path(self) -> Path:
@@ -191,7 +192,7 @@ class DagFileProcessorManager:
_parsing_start_time: float = attrs.field(init=False)
_num_run: int = attrs.field(default=0, init=False)
- _callback_to_execute: dict[str, list[CallbackRequest]] = attrs.field(
+ _callback_to_execute: dict[DagFileInfo, list[CallbackRequest]] =
attrs.field(
factory=lambda: defaultdict(list), init=False
)
@@ -407,18 +408,21 @@ class DagFileProcessorManager:
def _add_callback_to_queue(self, request: CallbackRequest):
self.log.debug("Queuing %s CallbackRequest: %s",
type(request).__name__, request)
- self.log.warning("Callbacks are not implemented yet!")
- # TODO: AIP-66 make callbacks bundle aware
- return
- self._callback_to_execute[request.full_filepath].append(request)
- if request.full_filepath in self._file_queue:
- # Remove file paths matching request.full_filepath from
self._file_queue
- # Since we are already going to use that filepath to run callback,
- # there is no need to have same file path again in the queue
- # todo (AIP-66): update re bundle and rel loc
- self._file_queue = deque(f for f in self._file_queue if f !=
request.full_filepath)
- # todo (AIP-66): update re bundle and rel loc
- self._add_files_to_queue([request.full_filepath], True)
+ try:
+ bundle = DagBundlesManager().get_bundle(name=request.bundle_name,
version=request.bundle_version)
+ except ValueError:
+ # Bundle no longer configured
+ self.log.error("Bundle %s no longer configured, skipping
callback", request.bundle_name)
+ return None
+
+ file_info = DagFileInfo(
+ rel_path=Path(request.filepath),
+ bundle_path=bundle.path,
+ bundle_name=request.bundle_name,
+ bundle_version=request.bundle_version,
+ )
+ self._callback_to_execute[file_info].append(request)
+ self._add_files_to_queue([file_info], True)
Stats.incr("dag_processing.other_callback_count")
@classmethod
@@ -708,15 +712,8 @@ class DagFileProcessorManager:
"""
self._files = files
- # remove from queue any files no longer in the _files list
- self._file_queue = deque(x for x in self._file_queue if x in files)
Stats.gauge("dag_processing.file_path_queue_size",
len(self._file_queue))
- # TODO: AIP-66 make callbacks bundle aware
- # callback_paths_to_del = [x for x in self._callback_to_execute if x
not in new_file_paths]
- # for path_to_del in callback_paths_to_del:
- # del self._callback_to_execute[path_to_del]
-
# Stop processors that are working on deleted files
filtered_processors = {}
for file, processor in self._processors.items():
@@ -807,8 +804,7 @@ class DagFileProcessorManager:
def _create_process(self, dag_file: DagFileInfo) ->
DagFileProcessorProcess:
id = uuid7()
- # callback_to_execute_for_file =
self._callback_to_execute.pop(file_path, [])
- callback_to_execute_for_file: list[CallbackRequest] = []
+ callback_to_execute_for_file = self._callback_to_execute.pop(dag_file,
[])
return DagFileProcessorProcess.start(
id=id,
diff --git a/airflow/dag_processing/processor.py
b/airflow/dag_processing/processor.py
index 2a822e1bae9..e8f5d3978cc 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -67,10 +67,11 @@ def _parse_file_entrypoint():
log = structlog.get_logger(logger_name="task")
result = _parse_file(msg, log)
- comms_decoder.send_request(log, result)
+ if result is not None:
+ comms_decoder.send_request(log, result)
-def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) ->
DagFileParsingResult:
+def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) ->
DagFileParsingResult | None:
# TODO: Set known_pool names on DagBag!
bag = DagBag(
dag_folder=msg.file,
@@ -79,6 +80,11 @@ def _parse_file(msg: DagFileParseRequest, log:
FilteringBoundLogger) -> DagFileP
safe_mode=True,
load_op_links=False,
)
+ if msg.callback_requests:
+ # If the request is for callback, we shouldn't serialize the DAGs
+ _execute_callbacks(bag, msg.callback_requests, log)
+ return None
+
serialized_dags, serialization_import_errors = _serialize_dags(bag, log)
bag.import_errors.update(serialization_import_errors)
dags = [LazyDeserializedDAG(data=serdag) for serdag in serialized_dags]
@@ -89,9 +95,6 @@ def _parse_file(msg: DagFileParseRequest, log:
FilteringBoundLogger) -> DagFileP
# TODO: Make `bag.dag_warnings` not return SQLA model objects
warnings=[],
)
-
- if msg.callback_requests:
- _execute_callbacks(bag, msg.callback_requests, log)
return result
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 1e98dbf2aba..df1cb5ab529 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -35,7 +35,7 @@ from typing import TYPE_CHECKING, Any, Callable
from deprecated import deprecated
from sqlalchemy import and_, delete, exists, func, select, text, tuple_, update
from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
+from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient,
selectinload
from sqlalchemy.sql import expression
from airflow import settings
@@ -756,7 +756,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Check state of finished tasks
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
- query =
select(TI).where(filter_for_tis).options(selectinload(TI.dag_model))
+ query = (
+ select(TI)
+ .where(filter_for_tis)
+ .options(selectinload(TI.dag_model))
+ .options(joinedload(TI.dag_version))
+ )
# row lock this entire set of taskinstances to make sure the scheduler
doesn't fail when we have
# multi-schedulers
tis_query: Query = with_row_locks(query, of=TI, session=session,
skip_locked=True)
@@ -853,7 +858,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# too, which would lead to double logging
cls.logger().error(msg)
request = TaskCallbackRequest(
- full_filepath=ti.dag_model.fileloc,
+ filepath=ti.dag_model.relative_fileloc,
+ bundle_name=ti.dag_version.bundle_name,
+ bundle_version=ti.dag_version.bundle_version,
ti=ti,
msg=msg,
)
@@ -1627,9 +1634,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_model.calculate_dagrun_date_fields(dag,
dag.get_run_data_interval(dag_run))
callback_to_execute = DagCallbackRequest(
- full_filepath=dag.fileloc,
+ filepath=dag_model.relative_fileloc,
dag_id=dag.dag_id,
run_id=dag_run.run_id,
+ bundle_name=dag_model.bundle_name,
+ bundle_version=dag_run.bundle_version,
is_failure_callback=True,
msg="timed_out",
)
@@ -1991,11 +2000,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if zombies := self._find_zombies(session=session):
self._purge_zombies(zombies, session=session)
- def _find_zombies(self, *, session: Session) -> list[tuple[TI, str]]:
+ def _find_zombies(self, *, session: Session) -> list[TI]:
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() -
timedelta(seconds=self._zombie_threshold_secs)
- zombies = session.execute(
- select(TI, DM.fileloc)
+ zombies = session.scalars(
+ select(TI)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(DM, TI.dag_id == DM.dag_id)
.where(
@@ -2008,11 +2017,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.log.warning("Failing %s TIs without heartbeat after %s",
len(zombies), limit_dttm)
return zombies
- def _purge_zombies(self, zombies: list[tuple[TI, str]], *, session:
Session) -> None:
- for ti, file_loc in zombies:
+ def _purge_zombies(self, zombies: list[TI], *, session: Session) -> None:
+ for ti in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
- full_filepath=file_loc,
+ filepath=ti.dag_model.relative_fileloc,
+ bundle_name=ti.dag_version.bundle_name,
+ bundle_version=ti.dag_run.bundle_version,
ti=ti,
msg=str(zombie_message_details),
)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 22757c972e9..7d46c29285f 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -922,9 +922,11 @@ class DagRun(Base, LoggingMixin):
dag.handle_callback(self, success=False,
reason="task_failure", session=session)
elif dag.has_on_failure_callback:
callback = DagCallbackRequest(
- full_filepath=dag.fileloc,
+ filepath=self.dag_model.relative_fileloc,
dag_id=self.dag_id,
run_id=self.run_id,
+ bundle_name=self.dag_version.bundle_name,
+ bundle_version=self.bundle_version,
is_failure_callback=True,
msg="task_failure",
)
@@ -949,9 +951,11 @@ class DagRun(Base, LoggingMixin):
dag.handle_callback(self, success=True, reason="success",
session=session)
elif dag.has_on_success_callback:
callback = DagCallbackRequest(
- full_filepath=dag.fileloc,
+ filepath=self.dag_model.relative_fileloc,
dag_id=self.dag_id,
run_id=self.run_id,
+ bundle_name=self.dag_version.bundle_name,
+ bundle_version=self.bundle_version,
is_failure_callback=False,
msg="success",
)
@@ -966,9 +970,11 @@ class DagRun(Base, LoggingMixin):
dag.handle_callback(self, success=False,
reason="all_tasks_deadlocked", session=session)
elif dag.has_on_failure_callback:
callback = DagCallbackRequest(
- full_filepath=dag.fileloc,
+ filepath=self.dag_model.relative_fileloc,
dag_id=self.dag_id,
run_id=self.run_id,
+ bundle_name=self.dag_version.bundle_name,
+ bundle_version=self.bundle_version,
is_failure_callback=True,
msg="all_tasks_deadlocked",
)
diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py
index 4e88465d533..d36dd40b6f8 100644
--- a/airflow/triggers/base.py
+++ b/airflow/triggers/base.py
@@ -217,9 +217,11 @@ class BaseTaskEndEvent(TriggerEvent):
"""Submit a callback request if the task state is SUCCESS or FAILED."""
if self.task_instance_state in (TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED):
request = TaskCallbackRequest(
- full_filepath=task_instance.dag_model.fileloc,
+ filepath=task_instance.dag_model.relative_fileloc,
ti=task_instance,
task_callback_type=self.task_instance_state,
+ bundle_name=task_instance.dag_model.bundle_name,
+ bundle_version=task_instance.dag_run.bundle_version,
)
log.info("Sending callback: %s", request)
try:
diff --git
a/providers/celery/tests/provider_tests/celery/executors/test_celery_kubernetes_executor.py
b/providers/celery/tests/provider_tests/celery/executors/test_celery_kubernetes_executor.py
index 027793c29d5..267b0fa78fd 100644
---
a/providers/celery/tests/provider_tests/celery/executors/test_celery_kubernetes_executor.py
+++
b/providers/celery/tests/provider_tests/celery/executors/test_celery_kubernetes_executor.py
@@ -261,7 +261,9 @@ class TestCeleryKubernetesExecutor:
cel_k8s_exec.callback_sink = mock.MagicMock()
if AIRFLOW_V_3_0_PLUS:
- callback = DagCallbackRequest(full_filepath="fake", dag_id="fake",
run_id="fake")
+ callback = DagCallbackRequest(
+ filepath="fake", dag_id="fake", run_id="fake",
bundle_name="testing", bundle_version=None
+ )
else:
callback = CallbackRequest(full_filepath="fake")
cel_k8s_exec.send_callback(callback)
diff --git
a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py
b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py
index be4d8b93673..c712b2d4b5e 100644
---
a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py
+++
b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py
@@ -116,7 +116,9 @@ class TestLocalKubernetesExecutor:
local_k8s_exec.callback_sink = mock.MagicMock()
if AIRFLOW_V_3_0_PLUS:
- callback = DagCallbackRequest(full_filepath="fake", dag_id="fake",
run_id="fake")
+ callback = DagCallbackRequest(
+ filepath="fake", dag_id="fake", run_id="fake",
bundle_name="fake", bundle_version=None
+ )
else:
callback = CallbackRequest(full_filepath="fake")
local_k8s_exec.send_callback(callback)
diff --git a/tests/callbacks/test_callback_requests.py
b/tests/callbacks/test_callback_requests.py
index 68362d36239..345af3b597c 100644
--- a/tests/callbacks/test_callback_requests.py
+++ b/tests/callbacks/test_callback_requests.py
@@ -43,10 +43,12 @@ class TestCallbackRequest:
),
(
DagCallbackRequest(
- full_filepath="filepath",
+ filepath="filepath",
dag_id="fake_dag",
run_id="fake_run",
is_failure_callback=False,
+ bundle_name="testing",
+ bundle_version=None,
),
DagCallbackRequest,
),
@@ -66,8 +68,7 @@ class TestCallbackRequest:
)
input = TaskCallbackRequest(
- full_filepath="filepath",
- ti=ti,
+ filepath="filepath", ti=ti, bundle_name="testing",
bundle_version=None
)
json_str = input.to_json()
result = request_class.from_json(json_str)
@@ -79,10 +80,7 @@ class TestCallbackRequest:
ti.end_date = timezone.utcnow()
session.merge(ti)
session.flush()
- input = TaskCallbackRequest(
- full_filepath="filepath",
- ti=ti,
- )
+ input = TaskCallbackRequest(filepath="filepath", ti=ti,
bundle_name="testing", bundle_version=None)
json_str = input.to_json()
result = TaskCallbackRequest.from_json(json_str)
assert input == result
diff --git a/tests/dag_processing/test_manager.py
b/tests/dag_processing/test_manager.py
index ebc3536c83f..46d6fe5f50a 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -160,12 +160,14 @@ class TestDagFileProcessorManager:
# TODO: AIP-66 no asserts?
- def test_start_new_processes_with_same_filepath(self):
+ def test_start_new_processes_with_same_filepath(self,
configure_testing_dag_bundle):
"""
Test that when a processor already exist with a filepath, a new
processor won't be created
with that filepath. The filepath will just be removed from the list.
"""
- manager = DagFileProcessorManager(max_runs=1)
+ with configure_testing_dag_bundle("/tmp"):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
file_1 = DagFileInfo(bundle_name="testing",
rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER)
file_2 = DagFileInfo(bundle_name="testing",
rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER)
@@ -475,6 +477,7 @@ class TestDagFileProcessorManager:
manager._kill_timed_out_processors()
mock_kill.assert_not_called()
+ @pytest.mark.usefixtures("testing_dag_bundle")
@pytest.mark.parametrize(
["callbacks", "path", "expected_buffer"],
[
@@ -492,9 +495,11 @@ class TestDagFileProcessorManager:
pytest.param(
[
DagCallbackRequest(
- full_filepath="/opt/airflow/dags/dag_callback_dag.py",
+ filepath="dag_callback_dag.py",
dag_id="dag_id",
run_id="run_id",
+ bundle_name="testing",
+ bundle_version=None,
is_failure_callback=False,
)
],
@@ -505,7 +510,9 @@ class TestDagFileProcessorManager:
b'"requests_fd":123,"callback_requests":'
b"["
b"{"
- b'"full_filepath":"/opt/airflow/dags/dag_callback_dag.py",'
+ b'"filepath":"dag_callback_dag.py",'
+ b'"bundle_name":"testing",'
+ b'"bundle_version":null,'
b'"msg":null,'
b'"dag_id":"dag_id",'
b'"run_id":"run_id",'
@@ -650,18 +657,22 @@ class TestDagFileProcessorManager:
assert dagbag.get_dag("test_dag2").get_is_active() is False
@conf_vars({("core", "load_examples"): "False"})
- def test_fetch_callbacks_from_database(self, tmp_path,
configure_testing_dag_bundle):
+ def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle):
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
callback1 = DagCallbackRequest(
dag_id="test_start_date_scheduling",
- full_filepath=str(dag_filepath),
+ bundle_name="testing",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
is_failure_callback=True,
run_id="123",
)
callback2 = DagCallbackRequest(
dag_id="test_start_date_scheduling",
- full_filepath=str(dag_filepath),
+ bundle_name="testing",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
is_failure_callback=True,
run_id="456",
)
@@ -670,7 +681,7 @@ class TestDagFileProcessorManager:
session.add(DbCallbackRequest(callback=callback1,
priority_weight=11))
session.add(DbCallbackRequest(callback=callback2,
priority_weight=10))
- with configure_testing_dag_bundle(tmp_path):
+ with configure_testing_dag_bundle(dag_filepath):
manager = DagFileProcessorManager(max_runs=1)
with create_session() as session:
@@ -691,13 +702,15 @@ class TestDagFileProcessorManager:
for i in range(5):
callback = DagCallbackRequest(
dag_id="test_start_date_scheduling",
- full_filepath=str(dag_filepath),
+ bundle_name="testing",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
is_failure_callback=True,
run_id=str(i),
)
session.add(DbCallbackRequest(callback=callback,
priority_weight=i))
- with configure_testing_dag_bundle(tmp_path):
+ with configure_testing_dag_bundle(dag_filepath):
manager = DagFileProcessorManager(max_runs=1)
with create_session() as session:
@@ -708,83 +721,102 @@ class TestDagFileProcessorManager:
manager.run()
assert session.query(DbCallbackRequest).count() == 1
- @pytest.mark.skip("AIP-66: callbacks are not implemented yet")
- def test_callback_queue(self, tmp_path):
- """
- This test has gotten a bit out of sync with the codebase.
-
- I am just updating it to be consistent with the changes in DagFileInfo
- """
- # given
- manager = DagFileProcessorManager(
- max_runs=1,
- processor_timeout=365 * 86_400,
- )
-
- dag1_path = DagFileInfo(
- bundle_name="testing", rel_path=Path("green_eggs/ham/file1.py"),
bundle_path=TEST_DAGS_FOLDER
- )
- dag1_req1 = DagCallbackRequest(
- full_filepath=TEST_DAGS_FOLDER / "green_eggs/ham/file1.py",
- dag_id="dag1",
- run_id="run1",
- is_failure_callback=False,
- msg=None,
- )
- dag1_req2 = DagCallbackRequest(
- full_filepath=TEST_DAGS_FOLDER / "green_eggs/ham/file1.py",
- dag_id="dag1",
- run_id="run1",
- is_failure_callback=False,
- msg=None,
- )
-
- dag2_path = DagFileInfo(
- bundle_name="testing", rel_path=Path("green_eggs/ham/file2.py"),
bundle_path=TEST_DAGS_FOLDER
- )
- dag2_req1 = DagCallbackRequest(
- full_filepath=TEST_DAGS_FOLDER / "green_eggs/ham/file2.py",
- dag_id="dag2",
- run_id="run1",
- is_failure_callback=False,
- msg=None,
- )
-
- # when
- manager._add_callback_to_queue(dag1_req1)
- manager._add_callback_to_queue(dag2_req1)
-
- # then - requests should be in manager's queue, with dag2 ahead of
dag1 (because it was added last)
- assert manager._file_queue == deque([dag2_path, dag1_path])
- assert set(manager._callback_to_execute.keys()) == {
- dag1_req1.full_filepath,
- dag2_req1.full_filepath,
- }
- assert manager._callback_to_execute[dag2_req1.full_filepath] ==
[dag2_req1]
+ @mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
+ def test_callback_queue(self, mock_logger, configure_testing_dag_bundle):
+ tmp_path = "/green_eggs/ham"
+ with configure_testing_dag_bundle(tmp_path):
+ # given
+ manager = DagFileProcessorManager(
+ max_runs=1,
+ processor_timeout=365 * 86_400,
+ )
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
- # update the queue, although the callback is registered
- assert manager._file_queue == deque([dag2_req1.full_filepath,
dag1_req1.full_filepath])
+ dag1_path = DagFileInfo(
+ bundle_name="testing", rel_path=Path("file1.py"),
bundle_path=Path(tmp_path)
+ )
+ dag1_req1 = DagCallbackRequest(
+ filepath="file1.py",
+ dag_id="dag1",
+ run_id="run1",
+ is_failure_callback=False,
+ bundle_name="testing",
+ bundle_version=None,
+ msg=None,
+ )
+ dag1_req2 = DagCallbackRequest(
+ filepath="file1.py",
+ dag_id="dag1",
+ run_id="run1",
+ is_failure_callback=False,
+ bundle_name="testing",
+ bundle_version=None,
+ msg=None,
+ )
- # when
- manager._add_callback_to_queue(dag1_req2)
+ dag2_path = DagFileInfo(
+ bundle_name="testing", rel_path=Path("file2.py"),
bundle_path=Path(tmp_path)
+ )
+ dag2_req1 = DagCallbackRequest(
+ filepath="file2.py",
+ dag_id="dag2",
+ run_id="run1",
+ bundle_name=dag2_path.bundle_name,
+ bundle_version=None,
+ is_failure_callback=False,
+ msg=None,
+ )
- # then - non-sla callback should have brought dag1 to the fore
- assert manager._file_queue == deque([dag1_req1.full_filepath,
dag2_req1.full_filepath])
- assert manager._callback_to_execute[dag1_req1.full_filepath] == [
- dag1_req1,
- dag1_req2,
- ]
+ # when
+ manager._add_callback_to_queue(dag1_req1)
+ manager._add_callback_to_queue(dag2_req1)
+
+ # then - requests should be in manager's queue, with dag2 ahead of
dag1 (because it was added last)
+ assert manager._file_queue == deque([dag2_path, dag1_path])
+ assert set(manager._callback_to_execute.keys()) == {
+ dag1_path,
+ dag2_path,
+ }
+ assert manager._callback_to_execute[dag2_path] == [dag2_req1]
+
+ # update the queue, although the callback is registered
+ assert manager._file_queue == deque([dag2_path, dag1_path])
+
+ # when
+ manager._add_callback_to_queue(dag1_req2)
+ # Since dag1_req2 is same as dag1_req1, we now have 2 items in
file_path_queue
+ assert manager._file_queue == deque([dag2_path, dag1_path])
+ assert manager._callback_to_execute[dag1_path] == [
+ dag1_req1,
+ dag1_req2,
+ ]
- with mock.patch.object(
- DagFileProcessorProcess, "start", side_effect=lambda *args,
**kwargs: self.mock_processor()
- ) as start:
- manager._start_new_processes()
- # Callbacks passed to process ctor
- start.assert_any_call(
- id=mock.ANY, path=dag1_req1.full_filepath, callbacks=[dag1_req1,
dag1_req2], selector=mock.ANY
- )
- # And removed from the queue
- assert dag1_req1.full_filepath not in manager._callback_to_execute
+ with mock.patch.object(
+ DagFileProcessorProcess, "start", side_effect=lambda *args,
**kwargs: self.mock_processor()
+ ) as start:
+ manager._start_new_processes()
+ # Callbacks passed to processor
+ assert start.call_args_list == [
+ mock.call(
+ id=mock.ANY,
+ path=Path(dag2_path.bundle_path, dag2_path.rel_path),
+ bundle_path=dag2_path.bundle_path,
+ callbacks=[dag2_req1],
+ selector=mock.ANY,
+ logger=mock_logger.return_value,
+ ),
+ mock.call(
+ id=mock.ANY,
+ path=Path(dag1_path.bundle_path, dag1_path.rel_path),
+ bundle_path=dag1_path.bundle_path,
+ callbacks=[dag1_req1, dag1_req2],
+ selector=mock.ANY,
+ logger=mock_logger.return_value,
+ ),
+ ]
+ # And removed from the queue
+ assert dag1_path not in manager._callback_to_execute
+ assert dag2_path not in manager._callback_to_execute
def test_dag_with_assets(self, session, configure_testing_dag_bundle):
"""'Integration' test to ensure that the assets get parsed and stored
correctly for parsed dags."""
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index 9844186be1d..a01d3edb2ff 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -75,7 +75,7 @@ def disable_load_example():
class TestDagFileProcessor:
def _process_file(
self, file_path, callback_requests: list[CallbackRequest] | None = None
- ) -> DagFileParsingResult:
+ ) -> DagFileParsingResult | None:
return _parse_file(
DagFileParseRequest(
file=file_path,
@@ -128,7 +128,7 @@ class TestDagFileProcessor:
dagbag.import_errors["a.py"] = "Import error"
resp = self._process_file("a.py")
-
+ assert resp is not None
assert not resp.serialized_dags
assert resp.import_errors is not None
assert "a.py" in resp.import_errors
@@ -184,6 +184,7 @@ def disable_capturing():
sys.stdin, sys.stdout, sys.stderr = old_in, old_out, old_err
[email protected]("testing_dag_bundle")
@pytest.mark.usefixtures("disable_capturing")
def test_parse_file_entrypoint_parses_dag_callbacks(spy_agency):
r, w = socketpair()
@@ -193,7 +194,7 @@ def
test_parse_file_entrypoint_parses_dag_callbacks(spy_agency):
w.makefile("wb").write(
b'{"file":"/files/dags/wait.py","bundle_path":"/files/dags","requests_fd":'
+ str(w2.fileno()).encode("ascii")
- + b',"callback_requests": [{"full_filepath": "/files/dags/wait.py", '
+ + b',"callback_requests": [{"filepath": "wait.py", "bundle_name":
"testing", "bundle_version": null, '
b'"msg": "task_failure", "dag_id": "wait_to_fail", "run_id": '
b'"manual__2024-12-30T21:02:55.203691+00:00", '
b'"is_failure_callback": true, "type": "DagCallbackRequest"}], "type":
"DagFileParseRequest"}\n'
@@ -209,11 +210,13 @@ def
test_parse_file_entrypoint_parses_dag_callbacks(spy_agency):
assert msg.file == "/files/dags/wait.py"
assert msg.callback_requests == [
DagCallbackRequest(
- full_filepath="/files/dags/wait.py",
+ filepath="wait.py",
msg="task_failure",
dag_id="wait_to_fail",
run_id="manual__2024-12-30T21:02:55.203691+00:00",
is_failure_callback=True,
+ bundle_name="testing",
+ bundle_version=None,
)
]
@@ -236,10 +239,12 @@ def test_parse_file_with_dag_callbacks(spy_agency):
requests = [
DagCallbackRequest(
- full_filepath="A",
+ filepath="A",
msg="Message",
dag_id="a",
run_id="b",
+ bundle_name="testing",
+ bundle_version=None,
)
]
_parse_file(
@@ -270,9 +275,11 @@ def test_parse_file_with_task_callbacks(spy_agency):
requests = [
TaskCallbackRequest(
- full_filepath="A",
+ filepath="A",
msg="Message",
ti=None,
+ bundle_name="testing",
+ bundle_version=None,
)
]
_parse_file(
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index faf8fa73284..6b0f5847ce7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -152,6 +152,7 @@ def create_dagrun(session):
state=state,
start_date=start_date,
triggered_by=DagRunTriggeredByType.TEST,
+ dag_version=DagVersion.get_latest_version(dag.dag_id),
)
return _create_dagrun
@@ -370,13 +371,17 @@ class TestSchedulerJob:
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
- def test_process_executor_events_with_callback(self, mock_stats_incr,
mock_task_callback, dag_maker):
+ def test_process_executor_events_with_callback(
+ self, mock_stats_incr, mock_task_callback, dag_maker, session
+ ):
dag_id = "test_process_executor_events_with_callback"
task_id_1 = "dummy_task"
with dag_maker(dag_id=dag_id, fileloc="/test_path1/") as dag:
- task1 = EmptyOperator(task_id=task_id_1,
on_failure_callback=lambda x: print("hi"))
- ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+ EmptyOperator(task_id=task_id_1, on_failure_callback=lambda x:
print("hi"))
+ dagv = DagVersion.get_latest_version(dag.dag_id)
+ dr = dag_maker.create_dagrun(dag_version=dagv)
+ ti1 = dr.task_instances[0]
mock_stats_incr.reset_mock()
@@ -386,8 +391,6 @@ class TestSchedulerJob:
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
- session = settings.Session()
-
ti1.state = State.QUEUED
session.merge(ti1)
session.commit()
@@ -400,8 +403,10 @@ class TestSchedulerJob:
# will be set to failed in dag parsing process
assert ti1.state == State.QUEUED
mock_task_callback.assert_called_once_with(
- full_filepath=dag.fileloc,
+ filepath=dag.relative_fileloc,
ti=mock.ANY,
+ bundle_name="dag_maker",
+ bundle_version=None,
msg=f"Executor {executor} reported that the task instance "
"<TaskInstance:
test_process_executor_events_with_callback.dummy_task test [queued]> "
"finished with state failed, but the task instance's state
attribute is queued. "
@@ -2415,10 +2420,12 @@ class TestSchedulerJob:
assert isinstance(orm_dag.next_dagrun_create_after, datetime.datetime)
expected_callback = DagCallbackRequest(
- full_filepath=dr.dag.fileloc,
+ filepath=dr.dag.relative_fileloc,
dag_id=dr.dag_id,
is_failure_callback=True,
run_id=dr.run_id,
+ bundle_name=orm_dag.bundle_name,
+ bundle_version=orm_dag.bundle_version,
msg="timed_out",
)
@@ -2454,10 +2461,12 @@ class TestSchedulerJob:
assert dr.state == State.FAILED
expected_callback = DagCallbackRequest(
- full_filepath=dr.dag.fileloc,
+ filepath=dr.dag.relative_fileloc,
dag_id=dr.dag_id,
is_failure_callback=True,
run_id=dr.run_id,
+ bundle_name=dr.dag.get_bundle_name(),
+ bundle_version=dr.dag.get_bundle_version(),
msg="timed_out",
)
@@ -2528,11 +2537,13 @@ class TestSchedulerJob:
self.job_runner._do_scheduling(session)
expected_callback = DagCallbackRequest(
- full_filepath=dag.fileloc,
+ filepath=dag.relative_fileloc,
dag_id=dr.dag_id,
is_failure_callback=bool(state == State.FAILED),
run_id=dr.run_id,
msg=expected_callback_msg,
+ bundle_name=dag.get_bundle_name(),
+ bundle_version=dag.get_bundle_version(),
)
# Verify dag failure callback request is sent to file processor
@@ -2605,11 +2616,13 @@ class TestSchedulerJob:
)
expected_callback = DagCallbackRequest(
- full_filepath=dag.fileloc,
+ filepath=dag.relative_fileloc,
dag_id=dr.dag_id,
is_failure_callback=True,
run_id=dr.run_id,
msg="timed_out",
+ bundle_name=dag.get_bundle_name(),
+ bundle_version=dag.get_bundle_version(),
)
assert callback == expected_callback
@@ -3286,25 +3299,24 @@ class TestSchedulerJob:
session.close()
@pytest.mark.need_serialized_dag
- def test_retry_still_in_executor(self, dag_maker):
+ def test_retry_still_in_executor(self, dag_maker, session):
"""
Checks if the scheduler does not put a task in limbo, when a task is
retried
but is still present in the executor.
"""
executor = MockExecutor(do_update=False)
- with create_session() as session:
- with dag_maker(
- dag_id="test_retry_still_in_executor",
- schedule="@once",
- session=session,
- ):
- dag_task1 = BashOperator(
- task_id="test_retry_handling_op",
- bash_command="exit 1",
- retries=1,
- )
- dag_maker.dag_model.calculate_dagrun_date_fields(dag_maker.dag,
None)
+ with dag_maker(
+ dag_id="test_retry_still_in_executor",
+ schedule="@once",
+ session=session,
+ ):
+ dag_task1 = BashOperator(
+ task_id="test_retry_handling_op",
+ bash_command="exit 1",
+ retries=1,
+ )
+ dag_maker.dag_model.calculate_dagrun_date_fields(dag_maker.dag, None)
@provide_session
def do_schedule(session):
@@ -5448,6 +5460,8 @@ class TestSchedulerJob:
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
DAG.bulk_write_to_db("testing", None, [dag])
+ SerializedDagModel.write_dag(dag=dag, bundle_name="testing")
+ dag_v = DagVersion.get_latest_version(dag.dag_id)
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = create_dagrun(
dag,
@@ -5467,7 +5481,7 @@ class TestSchedulerJob:
for task_id in tasks_to_setup:
task = dag.get_task(task_id=task_id)
- ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+ ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING, dag_version_id=dag_v.id)
ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
ti.queued_by_job_id = 999
@@ -5487,7 +5501,7 @@ class TestSchedulerJob:
callback_requests = executor.callback_sink.send.call_args.args
assert len(callback_requests) == 1
callback_request = callback_requests[0]
- assert callback_request.full_filepath == dag.fileloc
+ assert callback_request.filepath == dag.relative_fileloc
assert callback_request.msg == str(self.job_runner._generate_zombie_message_details(ti))
assert callback_request.is_failure_callback is True
assert callback_request.ti.dag_id == ti.dag_id
@@ -5572,6 +5586,7 @@ class TestSchedulerJob:
session.query(Job).delete()
dag = dagbag.get_dag("test_example_bash_operator")
DAG.bulk_write_to_db("testing", None, [dag])
+ SerializedDagModel.write_dag(dag=dag, bundle_name="testing")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = create_dagrun(
dag,
@@ -5581,8 +5596,9 @@ class TestSchedulerJob:
data_interval=data_interval,
)
task = dag.get_task(task_id="run_this_last")
-
- ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+ ti = TaskInstance(
+ task, run_id=dag_run.run_id, state=State.RUNNING, dag_version_id=dag_run.dag_version_id
+ )
ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
# TODO: If there was an actual Relationship between TI and Job
@@ -5599,9 +5615,11 @@ class TestSchedulerJob:
expected_failure_callback_requests = [
TaskCallbackRequest(
- full_filepath=dag.fileloc,
+ filepath=dag.relative_fileloc,
ti=ti,
msg=str(self.job_runner._generate_zombie_message_details(ti)),
+ bundle_name="testing",
+ bundle_version=dag_run.bundle_version,
)
]
callback_requests = scheduler_job.executor.callback_sink.send.call_args.args
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 12bccca74b8..a967f6b2555 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -33,7 +33,9 @@ from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.decorators import setup, task, task_group, teardown
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG, DagModel
+from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun, DagRunNote
+from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, clear_task_instances
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
@@ -95,6 +97,7 @@ class TestDagRun:
logical_date: datetime.datetime | None = None,
is_backfill: bool = False,
state: DagRunState = DagRunState.RUNNING,
+ dag_version: DagVersion | None = None,
session: Session,
):
now = timezone.utcnow()
@@ -118,6 +121,7 @@ class TestDagRun:
start_date=now,
state=state,
external_trigger=False,
+ dag_version=dag_version or DagVersion.get_latest_version(dag.dag_id, session=session),
triggered_by=DagRunTriggeredByType.TEST,
)
@@ -457,6 +461,7 @@ class TestDagRun:
def on_success_callable(context):
assert context["dag_run"].dag_id == "test_dagrun_update_state_with_handle_callback_success"
+ relative_fileloc = "test_dagrun_update_state_with_handle_callback_success.py"
dag = DAG(
dag_id="test_dagrun_update_state_with_handle_callback_success",
schedule=datetime.timedelta(days=1),
@@ -464,6 +469,10 @@ class TestDagRun:
on_success_callback=on_success_callable,
)
DAG.bulk_write_to_db("testing", None, dags=[dag], session=session)
+ dm = DagModel.get_dagmodel(dag.dag_id, session=session)
+ dm.relative_fileloc = relative_fileloc
+ session.merge(dm)
+ session.commit()
dag_task1 = EmptyOperator(task_id="test_state_succeeded1", dag=dag)
dag_task2 = EmptyOperator(task_id="test_state_succeeded2", dag=dag)
@@ -476,18 +485,23 @@ class TestDagRun:
# Scheduler uses Serialized DAG -- so use that instead of the Actual DAG
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+ dag.relative_fileloc = relative_fileloc
+ SerializedDagModel.write_dag(dag, bundle_name="testing", session=session)
+ session.commit()
dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session)
+ dag_run.dag_model = dm
_, callback = dag_run.update_state(execute_callbacks=False)
assert dag_run.state == DagRunState.SUCCESS
# Callbacks are not added until handle_callback = False is passed to dag_run.update_state()
-
assert callback == DagCallbackRequest(
- full_filepath=dag_run.dag.fileloc,
+ filepath=dag_run.dag.relative_fileloc,
dag_id="test_dagrun_update_state_with_handle_callback_success",
run_id=dag_run.run_id,
is_failure_callback=False,
+ bundle_name="testing",
+ bundle_version=None,
msg="success",
)
@@ -495,6 +509,7 @@ class TestDagRun:
def on_failure_callable(context):
assert context["dag_run"].dag_id == "test_dagrun_update_state_with_handle_callback_failure"
+ relative_fileloc = "test_dagrun_update_state_with_handle_callback_failure.py"
dag = DAG(
dag_id="test_dagrun_update_state_with_handle_callback_failure",
schedule=datetime.timedelta(days=1),
@@ -502,6 +517,10 @@ class TestDagRun:
on_failure_callback=on_failure_callable,
)
DAG.bulk_write_to_db("testing", None, dags=[dag], session=session)
+ dm = DagModel.get_dagmodel(dag.dag_id, session=session)
+ dm.relative_fileloc = relative_fileloc
+ session.merge(dm)
+ session.commit()
dag_task1 = EmptyOperator(task_id="test_state_succeeded1", dag=dag)
dag_task2 = EmptyOperator(task_id="test_state_failed2", dag=dag)
@@ -514,19 +533,25 @@ class TestDagRun:
# Scheduler uses Serialized DAG -- so use that instead of the Actual DAG
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+ dag.relative_fileloc = relative_fileloc
+ SerializedDagModel.write_dag(dag, bundle_name="testing", session=session)
+ session.commit()
dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session)
+ dag_run.dag_model = dm
_, callback = dag_run.update_state(execute_callbacks=False)
assert dag_run.state == DagRunState.FAILED
# Callbacks are not added until handle_callback = False is passed to dag_run.update_state()
assert callback == DagCallbackRequest(
- full_filepath=dag_run.dag.fileloc,
+ filepath=dag.relative_fileloc,
dag_id="test_dagrun_update_state_with_handle_callback_failure",
run_id=dag_run.run_id,
is_failure_callback=True,
msg="task_failure",
+ bundle_name="testing",
+ bundle_version=None,
)
def test_dagrun_set_state_end_date(self, dag_maker, session):
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index 66c271113a7..f777b3d6af8 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -937,7 +937,6 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
kwargs.pop("dag_version", None)
kwargs.pop("triggered_by", None)
kwargs["execution_date"] = logical_date
-
self.dag_run = dag.create_dagrun(**kwargs)
for ti in self.dag_run.task_instances:
ti.refresh_from_task(dag.get_task(ti.task_id))