This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 98e2f6aed3e Fix DAG disappearing after callback execution in stale detection (#55698)
98e2f6aed3e is described below
commit 98e2f6aed3efdbaa4663cc8fed7d2874d9fcee3b
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Sep 16 19:08:41 2025 +0100
Fix DAG disappearing after callback execution in stale detection (#55698)
When `min_file_process_interval` exceeds `stale_dag_threshold`, DAGs would
incorrectly disappear after callback processing. This occurred because
callback-only processing updated file processing timestamps but not DAG
parsing timestamps, causing stale DAG detection to trigger false positives.
The fix prevents callback-only processing from updating `last_finish_time`,
ensuring DAGs remain active when only callbacks are executed.
fixes #55315
---
airflow-core/src/airflow/dag_processing/manager.py | 33 +++++++++---
.../src/airflow/dag_processing/processor.py | 2 +
.../tests/unit/dag_processing/test_processor.py | 58 ++++++++++++++++++++++
3 files changed, 87 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 725e354610e..73f74c415ab 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -817,6 +817,12 @@ class DagFileProcessorManager(LoggingMixin):
continue
finished.append(file)
+ # Detect if this was callback-only processing
+ # For such cases, we don't serialize the dags and hence send parsing_result as None.
+ is_callback_only = proc.had_callbacks and proc.parsing_result is None
+ if is_callback_only:
+ self.log.debug("Detected callback-only processing for %s", file)
+
# Collect the DAGS and import errors into the DB, emit metrics etc.
self._file_stats[file] = process_parse_results(
run_duration=time.monotonic() - proc.start_time,
@@ -826,6 +832,7 @@ class DagFileProcessorManager(LoggingMixin):
bundle_version=self._bundle_versions[file.bundle_name],
parsing_result=proc.parsing_result,
session=session,
+ is_callback_only=is_callback_only,
)
for file in finished:
@@ -1109,13 +1116,24 @@ def process_parse_results(
bundle_version: str | None,
parsing_result: DagFileParsingResult | None,
session: Session,
+ *,
+ is_callback_only: bool = False,
) -> DagFileStat:
"""Take the parsing result and stats about the parser process and convert
it into a DagFileStat."""
- stat = DagFileStat(
- last_finish_time=finish_time,
- last_duration=run_duration,
- run_count=run_count + 1,
- )
+ if is_callback_only:
+ # Callback-only processing - don't update timestamps to avoid stale DAG detection issues
+ stat = DagFileStat(
+ last_duration=run_duration,
+ run_count=run_count, # Don't increment for callback-only processing
+ )
+ Stats.incr("dag_processing.callback_only_count")
+ else:
+ # Actual DAG parsing or import error
+ stat = DagFileStat(
+ last_finish_time=finish_time,
+ last_duration=run_duration,
+ run_count=run_count + 1,
+ )
# TODO: AIP-66 emit metrics
# file_name = Path(dag_file.path).stem
@@ -1123,7 +1141,10 @@ def process_parse_results(
# Stats.timing("dag_processing.last_duration", stat.last_duration,
tags={"file_name": file_name})
if parsing_result is None:
- stat.import_errors = 1
+ # No DAGs were parsed - this happens for callback-only processing
+ # Don't treat this as an import error when it's callback-only
+ if not is_callback_only:
+ stat.import_errors = 1
else:
# record DAGs and import errors to database
import_errors = {}
diff --git a/airflow-core/src/airflow/dag_processing/processor.py
b/airflow-core/src/airflow/dag_processing/processor.py
index 616bd1abbe9..a86d308c8c5 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -438,6 +438,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
logger_filehandle: BinaryIO
parsing_result: DagFileParsingResult | None = None
decoder: ClassVar[TypeAdapter[ToManager]] = TypeAdapter[ToManager](ToManager)
+ had_callbacks: bool = False # Track if this process was started with callbacks to prevent stale DAG detection false positives
client: Client
"""The HTTP client to use for communication with the API server."""
@@ -458,6 +459,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
_pre_import_airflow_modules(os.fspath(path), logger)
proc: Self = super().start(target=target, client=client, **kwargs)
+ proc.had_callbacks = bool(callbacks) # Track if this process had callbacks
proc._on_child_started(callbacks, path, bundle_path)
return proc
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 37d9236face..8acba729f05 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -47,6 +47,7 @@ from airflow.callbacks.callback_requests import (
EmailNotificationRequest,
TaskCallbackRequest,
)
+from airflow.dag_processing.manager import process_parse_results
from airflow.dag_processing.processor import (
DagFileParseRequest,
DagFileParsingResult,
@@ -581,6 +582,63 @@ def test_parse_file_with_task_callbacks(spy_agency):
assert called is True
+def test_callback_processing_does_not_update_timestamps(session):
+ """Callback processing should not update last_finish_time to prevent stale
DAG detection."""
+ stat = process_parse_results(
+ run_duration=1.0,
+ finish_time=timezone.utcnow(),
+ run_count=5,
+ bundle_name="test",
+ bundle_version=None,
+ parsing_result=None,
+ session=session,
+ is_callback_only=True,
+ )
+
+ assert stat.last_finish_time is None
+ assert stat.run_count == 5
+
+
+def test_normal_parsing_updates_timestamps(session):
+ """last_finish_time should be updated when parsing a dag file."""
+ finish_time = timezone.utcnow()
+
+ stat = process_parse_results(
+ run_duration=2.0,
+ finish_time=finish_time,
+ run_count=3,
+ bundle_name="test-bundle",
+ bundle_version="v1",
+ parsing_result=DagFileParsingResult(fileloc="test.py", serialized_dags=[]),
+ session=session,
+ is_callback_only=False,
+ )
+
+ assert stat.last_finish_time == finish_time
+ assert stat.run_count == 4
+ assert stat.import_errors == 0
+
+
+def test_import_error_updates_timestamps(session):
+ """last_finish_time should be updated when parsing a dag file results in
import errors."""
+ finish_time = timezone.utcnow()
+
+ stat = process_parse_results(
+ run_duration=1.5,
+ finish_time=finish_time,
+ run_count=2,
+ bundle_name="test-bundle",
+ bundle_version="v1",
+ parsing_result=None,
+ session=session,
+ is_callback_only=False,
+ )
+
+ assert stat.last_finish_time == finish_time
+ assert stat.run_count == 3
+ assert stat.import_errors == 1
+
+
class TestExecuteDagCallbacks:
"""Test the _execute_dag_callbacks function with context_from_server"""