This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 98e2f6aed3e Fix DAG disappearing after callback execution in stale detection (#55698)
98e2f6aed3e is described below

commit 98e2f6aed3efdbaa4663cc8fed7d2874d9fcee3b
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Sep 16 19:08:41 2025 +0100

    Fix DAG disappearing after callback execution in stale detection (#55698)
    
    When `min_file_process_interval` exceeds `stale_dag_threshold`, DAGs would
    incorrectly disappear after callback processing. This occurred because
    callback-only processing updated file processing timestamps but not DAG
    parsing timestamps, causing stale DAG detection to trigger false positives.
    
    The fix prevents callback-only processing from updating `last_finish_time`,
    ensuring DAGs remain active when only callbacks are executed.
    
    fixes #55315
---
 airflow-core/src/airflow/dag_processing/manager.py | 33 +++++++++---
 .../src/airflow/dag_processing/processor.py        |  2 +
 .../tests/unit/dag_processing/test_processor.py    | 58 ++++++++++++++++++++++
 3 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 725e354610e..73f74c415ab 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -817,6 +817,12 @@ class DagFileProcessorManager(LoggingMixin):
                 continue
             finished.append(file)
 
+            # Detect if this was callback-only processing
+            # For such cases, we don't serialize the dags and hence send parsing_result as None.
+            is_callback_only = proc.had_callbacks and proc.parsing_result is None
+            if is_callback_only:
+                self.log.debug("Detected callback-only processing for %s", file)
+
             # Collect the DAGS and import errors into the DB, emit metrics etc.
             self._file_stats[file] = process_parse_results(
                 run_duration=time.monotonic() - proc.start_time,
@@ -826,6 +832,7 @@ class DagFileProcessorManager(LoggingMixin):
                 bundle_version=self._bundle_versions[file.bundle_name],
                 parsing_result=proc.parsing_result,
                 session=session,
+                is_callback_only=is_callback_only,
             )
 
         for file in finished:
@@ -1109,13 +1116,24 @@ def process_parse_results(
     bundle_version: str | None,
     parsing_result: DagFileParsingResult | None,
     session: Session,
+    *,
+    is_callback_only: bool = False,
 ) -> DagFileStat:
     """Take the parsing result and stats about the parser process and convert it into a DagFileStat."""
-    stat = DagFileStat(
-        last_finish_time=finish_time,
-        last_duration=run_duration,
-        run_count=run_count + 1,
-    )
+    if is_callback_only:
+        # Callback-only processing - don't update timestamps to avoid stale DAG detection issues
+        stat = DagFileStat(
+            last_duration=run_duration,
+            run_count=run_count,  # Don't increment for callback-only processing
+        )
+        Stats.incr("dag_processing.callback_only_count")
+    else:
+        # Actual DAG parsing or import error
+        stat = DagFileStat(
+            last_finish_time=finish_time,
+            last_duration=run_duration,
+            run_count=run_count + 1,
+        )
 
     # TODO: AIP-66 emit metrics
     # file_name = Path(dag_file.path).stem
@@ -1123,7 +1141,10 @@ def process_parse_results(
     # Stats.timing("dag_processing.last_duration", stat.last_duration, tags={"file_name": file_name})
 
     if parsing_result is None:
-        stat.import_errors = 1
+        # No DAGs were parsed - this happens for callback-only processing
+        # Don't treat this as an import error when it's callback-only
+        if not is_callback_only:
+            stat.import_errors = 1
     else:
         # record DAGs and import errors to database
         import_errors = {}
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 616bd1abbe9..a86d308c8c5 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -438,6 +438,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
     logger_filehandle: BinaryIO
     parsing_result: DagFileParsingResult | None = None
     decoder: ClassVar[TypeAdapter[ToManager]] = TypeAdapter[ToManager](ToManager)
+    had_callbacks: bool = False  # Track if this process was started with callbacks to prevent stale DAG detection false positives
 
     client: Client
     """The HTTP client to use for communication with the API server."""
@@ -458,6 +459,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
         _pre_import_airflow_modules(os.fspath(path), logger)
 
         proc: Self = super().start(target=target, client=client, **kwargs)
+        proc.had_callbacks = bool(callbacks)  # Track if this process had callbacks
         proc._on_child_started(callbacks, path, bundle_path)
         return proc
 
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 37d9236face..8acba729f05 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -47,6 +47,7 @@ from airflow.callbacks.callback_requests import (
     EmailNotificationRequest,
     TaskCallbackRequest,
 )
+from airflow.dag_processing.manager import process_parse_results
 from airflow.dag_processing.processor import (
     DagFileParseRequest,
     DagFileParsingResult,
@@ -581,6 +582,63 @@ def test_parse_file_with_task_callbacks(spy_agency):
     assert called is True
 
 
+def test_callback_processing_does_not_update_timestamps(session):
+    """Callback processing should not update last_finish_time to prevent stale DAG detection."""
+    stat = process_parse_results(
+        run_duration=1.0,
+        finish_time=timezone.utcnow(),
+        run_count=5,
+        bundle_name="test",
+        bundle_version=None,
+        parsing_result=None,
+        session=session,
+        is_callback_only=True,
+    )
+
+    assert stat.last_finish_time is None
+    assert stat.run_count == 5
+
+
+def test_normal_parsing_updates_timestamps(session):
+    """last_finish_time should be updated when parsing a dag file."""
+    finish_time = timezone.utcnow()
+
+    stat = process_parse_results(
+        run_duration=2.0,
+        finish_time=finish_time,
+        run_count=3,
+        bundle_name="test-bundle",
+        bundle_version="v1",
+        parsing_result=DagFileParsingResult(fileloc="test.py", serialized_dags=[]),
+        session=session,
+        is_callback_only=False,
+    )
+
+    assert stat.last_finish_time == finish_time
+    assert stat.run_count == 4
+    assert stat.import_errors == 0
+
+
+def test_import_error_updates_timestamps(session):
+    """last_finish_time should be updated when parsing a dag file results in import errors."""
+    finish_time = timezone.utcnow()
+
+    stat = process_parse_results(
+        run_duration=1.5,
+        finish_time=finish_time,
+        run_count=2,
+        bundle_name="test-bundle",
+        bundle_version="v1",
+        parsing_result=None,
+        session=session,
+        is_callback_only=False,
+    )
+
+    assert stat.last_finish_time == finish_time
+    assert stat.run_count == 3
+    assert stat.import_errors == 1
+
+
 class TestExecuteDagCallbacks:
     """Test the _execute_dag_callbacks function with context_from_server"""
 

Reply via email to