This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ddd2840e37 Add INFO-level logging to asset scheduling path (#63958)
5ddd2840e37 is described below

commit 5ddd2840e37347f3ee4c0a175556ddc65c340e06
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Mar 21 02:49:57 2026 +0000

    Add INFO-level logging to asset scheduling path (#63958)
    
    The scheduler's asset (dataset) condition evaluation path had zero
    INFO-level logging, making it impossible to debug why asset-triggered
    DagRuns were created or skipped. This was a significant gap discovered
    during a P1 customer incident where the root cause took hours to trace
    because the scheduler was a complete black box for asset scheduling.
    
    Add log lines for:
    - ADRQ records loaded per DAG (count and DAG IDs)
    - Asset condition not met per DAG (DEBUG level)
    - DAGs deferred due to max_active_runs
    - Asset-triggered DagRun creation (DAG ID, triggered_date, queued count;
      DEBUG level)
    - Consumed asset events count per DagRun
    - ADRQ rows deleted after DagRun creation
    
    * Address review feedback: fix casing in log messages and mypy error
    
    - Change "DAGs" to "Dags" in asset-triggered log messages (dag.py)
    - Fix mypy attr-defined error by casting session.execute() result to
      CursorResult, matching the existing pattern in delete_dag.py
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 31 ++++++++++++++++++----
 airflow-core/src/airflow/models/dag.py             | 14 +++++++++-
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 8400e13cd79..c25e5e2adff 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -31,7 +31,7 @@ from contextlib import ExitStack
 from datetime import date, datetime, timedelta
 from functools import lru_cache, partial
 from itertools import groupby
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
 
 from sqlalchemy import CTE, and_, delete, exists, func, inspect, or_, select, 
text, tuple_, update
 from sqlalchemy.exc import DBAPIError, OperationalError
@@ -107,6 +107,7 @@ if TYPE_CHECKING:
     from types import FrameType
 
     from pendulum.datetime import DateTime
+    from sqlalchemy.engine import CursorResult
     from sqlalchemy.orm import Session
     from sqlalchemy.orm.interfaces import LoaderOption
     from sqlalchemy.sql.selectable import Subquery
@@ -2042,6 +2043,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 continue
 
             triggered_date: DateTime = 
timezone.coerce_datetime(queued_adrqs[0].created_at)
+            self.log.debug(
+                "Creating asset-triggered DagRun for '%s': %d queued assets, 
triggered_date=%s",
+                dag.dag_id,
+                len(queued_adrqs),
+                triggered_date,
+            )
             cte = (
                 
select(func.max(DagRun.run_after).label("previous_dag_run_run_after"))
                 .where(
@@ -2090,14 +2097,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
             Stats.incr("asset.triggered_dagruns")
             dag_run.consumed_asset_events.extend(asset_events)
+            self.log.info(
+                "Created asset-triggered DagRun for '%s': run_id=%s, consumed 
%d asset events",
+                dag.dag_id,
+                dag_run.run_id,
+                len(asset_events),
+            )
 
             # Delete only consumed ADRQ rows to avoid dropping newly queued 
events
             # (e.g. DagRun triggered by asset A while a new event for asset B 
arrives).
             adrq_pks = [(record.asset_id, record.target_dag_id) for record in 
queued_adrqs]
-            session.execute(
-                delete(AssetDagRunQueue).where(
-                    tuple_(AssetDagRunQueue.asset_id, 
AssetDagRunQueue.target_dag_id).in_(adrq_pks)
-                )
+            result = cast(
+                "CursorResult",
+                session.execute(
+                    delete(AssetDagRunQueue).where(
+                        tuple_(AssetDagRunQueue.asset_id, 
AssetDagRunQueue.target_dag_id).in_(adrq_pks)
+                    )
+                ),
+            )
+            self.log.info(
+                "Deleted %d ADRQ rows for '%s'",
+                result.rowcount,
+                dag.dag_id,
             )
 
     def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) 
-> dict[int, Backfill]:
diff --git a/airflow-core/src/airflow/models/dag.py 
b/airflow-core/src/airflow/models/dag.py
index fb462aac4e0..677fbc26048 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -665,6 +665,12 @@ class DagModel(Base):
             else:
                 adrq_by_dag[adrq.target_dag_id].append(adrq)
 
+        if adrq_by_dag:
+            log.info(
+                "Asset-triggered Dags with queued events: %s",
+                {dag_id: len(adrqs) for dag_id, adrqs in adrq_by_dag.items()},
+            )
+
         dag_statuses: dict[str, dict[UKey, bool]] = {
             dag_id: {SerializedAssetUniqueKey.from_asset(adrq.asset): True for 
adrq in adrqs}
             for dag_id, adrqs in adrq_by_dag.items()
@@ -673,7 +679,9 @@ class DagModel(Base):
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
-            if not dag_ready(dag_id, 
cond=ser_dag.dag.timetable.asset_condition, statuses=statuses):
+            ready = dag_ready(dag_id, 
cond=ser_dag.dag.timetable.asset_condition, statuses=statuses)
+            if not ready:
+                log.debug("Asset condition not met for dag '%s'", dag_id)
                 del adrq_by_dag[dag_id]
                 del dag_statuses[dag_id]
         del dag_statuses
@@ -698,6 +706,10 @@ class DagModel(Base):
                 )
             )
             if exclusion_list:
+                log.info(
+                    "Asset-triggered Dags at max_active_runs, deferring: %s",
+                    exclusion_list,
+                )
                 asset_triggered_dag_ids -= exclusion_list
                 triggered_date_by_dag = {
                     k: v for k, v in triggered_date_by_dag.items() if k not in 
exclusion_list

Reply via email to