Copilot commented on code in PR #62447:
URL: https://github.com/apache/airflow/pull/62447#discussion_r2850557953


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()
+
+        # Initialize the result list
+        stalled_runs = []
+
+        # Iterate over each dag run to check if it's stalled
+        for dag_run in running_dag_runs:
+            # Get the task instances for this dag run
+            task_instances = dag_run.get_task_instances(session=session)
+
+            # Check if any task instance is still running
+            has_running_tasks = False
+            for ti in task_instances:
+                if ti.state == TaskInstanceState.RUNNING:
+                    has_running_tasks = True
+                    break
+
+            # Calculate the duration
+            if dag_run.start_date:
+                duration = current_time - dag_run.start_date.timestamp()
+            else:
+                duration = 0
+
+            # Assert that duration is positive
+            assert duration >= 0, "Duration should never be negative"
+
+            # Add to stalled runs if duration exceeds threshold
+            if duration > threshold_seconds and not has_running_tasks:
+                stalled_runs.append(dag_run)
+
+        return stalled_runs
+
+    @staticmethod
+    @lru_cache
+    def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) -> 
DagRun | None:
+        """Cache and return a DagRun by its run_id for fast repeated 
lookups."""

Review Comment:
   `@lru_cache` will retain strong references to all arguments used as cache 
keys. Since `session` is an argument here, the cache can hold onto Session 
objects (and their connection state) longer than intended and can also return 
stale/detached ORM instances. Consider caching only lightweight identifiers or 
removing `session` from the cached signature.
   ```suggestion
       def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) 
-> DagRun | None:
           """Return a DagRun by its run_id."""
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()
+
+        # Initialize the result list
+        stalled_runs = []
+
+        # Iterate over each dag run to check if it's stalled
+        for dag_run in running_dag_runs:
+            # Get the task instances for this dag run
+            task_instances = dag_run.get_task_instances(session=session)
+
+            # Check if any task instance is still running
+            has_running_tasks = False
+            for ti in task_instances:
+                if ti.state == TaskInstanceState.RUNNING:
+                    has_running_tasks = True
+                    break
+
+            # Calculate the duration
+            if dag_run.start_date:
+                duration = current_time - dag_run.start_date.timestamp()
+            else:
+                duration = 0
+
+            # Assert that duration is positive
+            assert duration >= 0, "Duration should never be negative"
+
+            # Add to stalled runs if duration exceeds threshold
+            if duration > threshold_seconds and not has_running_tasks:
+                stalled_runs.append(dag_run)
+
+        return stalled_runs
+
+    @staticmethod
+    @lru_cache
+    def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) -> 
DagRun | None:
+        """Cache and return a DagRun by its run_id for fast repeated 
lookups."""

Review Comment:
   `get_dag_run_by_run_id` defaults `session` to `NEW_SESSION` but is not 
decorated with `@provide_session`. Calling it without explicitly passing a real 
Session will fail; add `@provide_session` or require an explicit `session` 
argument.
   ```suggestion
       @provide_session
       def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) 
-> DagRun | None:
           """Return a DagRun by its run_id."""
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()
+
+        # Initialize the result list
+        stalled_runs = []
+
+        # Iterate over each dag run to check if it's stalled
+        for dag_run in running_dag_runs:
+            # Get the task instances for this dag run
+            task_instances = dag_run.get_task_instances(session=session)
+
+            # Check if any task instance is still running
+            has_running_tasks = False
+            for ti in task_instances:
+                if ti.state == TaskInstanceState.RUNNING:
+                    has_running_tasks = True
+                    break
+
+            # Calculate the duration
+            if dag_run.start_date:
+                duration = current_time - dag_run.start_date.timestamp()
+            else:
+                duration = 0
+
+            # Assert that duration is positive
+            assert duration >= 0, "Duration should never be negative"
+
+            # Add to stalled runs if duration exceeds threshold

Review Comment:
   There are several newly-added inline comments here that narrate the next 
line (e.g. “Get the current time”, “Initialize the result list”). This file 
generally relies on clear naming instead; removing these would reduce noise and 
keep the code style consistent.
   ```suggestion
           current_time = time.time()
   
           running_dag_runs = session.scalars(
               select(cls).where(cls.state == DagRunState.RUNNING)
           ).all()
   
           stalled_runs = []
   
           for dag_run in running_dag_runs:
               task_instances = dag_run.get_task_instances(session=session)
   
               has_running_tasks = False
               for ti in task_instances:
                   if ti.state == TaskInstanceState.RUNNING:
                       has_running_tasks = True
                       break
   
               if dag_run.start_date:
                   duration = current_time - dag_run.start_date.timestamp()
               else:
                   duration = 0
   
               assert duration >= 0, "Duration should never be negative"
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()

Review Comment:
   `get_stalled_dag_runs` currently selects *all* running DagRuns into memory. 
For large installations this can be expensive and negates the stated scheduler 
optimization; consider filtering by age in SQL (e.g., `start_date < cutoff`) so 
only potentially-stalled runs are fetched.



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()
+
+        # Initialize the result list
+        stalled_runs = []
+
+        # Iterate over each dag run to check if it's stalled
+        for dag_run in running_dag_runs:
+            # Get the task instances for this dag run
+            task_instances = dag_run.get_task_instances(session=session)
+
+            # Check if any task instance is still running
+            has_running_tasks = False
+            for ti in task_instances:
+                if ti.state == TaskInstanceState.RUNNING:
+                    has_running_tasks = True

Review Comment:
   The "no active tasks" check only considers `TaskInstanceState.RUNNING`. A 
DagRun with tasks in other unfinished states (e.g. 
QUEUED/RESTARTING/DEFERRED/None) would be classified as stalled incorrectly. 
This should likely check `State.unfinished` (or a documented set of "active" 
states) instead.



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()
+
+        # Initialize the result list
+        stalled_runs = []
+
+        # Iterate over each dag run to check if it's stalled
+        for dag_run in running_dag_runs:
+            # Get the task instances for this dag run
+            task_instances = dag_run.get_task_instances(session=session)
+
+            # Check if any task instance is still running
+            has_running_tasks = False
+            for ti in task_instances:
+                if ti.state == TaskInstanceState.RUNNING:
+                    has_running_tasks = True
+                    break
+
+            # Calculate the duration
+            if dag_run.start_date:
+                duration = current_time - dag_run.start_date.timestamp()
+            else:
+                duration = 0
+
+            # Assert that duration is positive
+            assert duration >= 0, "Duration should never be negative"
+

Review Comment:
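   Calling `dag_run.get_task_instances()` inside the loop makes this an N+1 
query pattern (one query for DagRuns + one per DagRun for TIs). Ideally this is 
expressed as a single query using `NOT EXISTS`/outer join against 
`TaskInstance` filtered to unfinished states, to keep scheduling throughput 
high. The suggestion below is a smaller step: it batches the task-instance 
lookup into one additional query and still checks only 
`TaskInstanceState.RUNNING`.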
   ```suggestion
   
           # Query all running dag runs
           running_dag_runs = session.scalars(
               select(cls).where(cls.state == DagRunState.RUNNING)
           ).all()
   
           if not running_dag_runs:
               return []
   
           # Collect dag_id and run_id for all running dag runs
           dag_ids = {dag_run.dag_id for dag_run in running_dag_runs}
           run_ids = {dag_run.run_id for dag_run in running_dag_runs}
   
           # Find all (dag_id, run_id) pairs that currently have running task 
instances.
           # This avoids an N+1 pattern of querying task instances per dag run.
           rows = session.execute(
               select(
                   TI.dag_id.label("dag_id"),
                   TI.run_id.label("run_id"),
               ).where(
                   and_(
                       TI.state == TaskInstanceState.RUNNING,
                       TI.dag_id.in_(dag_ids),
                       TI.run_id.in_(run_ids),
                   )
               )
           ).all()
   
           running_ti_keys = {(row.dag_id, row.run_id) for row in rows}
   
           # Get the current time as an aware datetime for duration calculations
           now = timezone.utcnow()
   
           # Initialize the result list
           stalled_runs = []
   
           # Iterate over each dag run to check if it's stalled
           for dag_run in running_dag_runs:
               # Skip dag runs without a start_date; duration cannot be 
computed meaningfully.
               if not dag_run.start_date:
                   continue
   
               # Check if any task instance is still running for this dag run
               has_running_tasks = (dag_run.dag_id, dag_run.run_id) in 
running_ti_keys
   
               # Calculate the duration in seconds
               duration = (now - dag_run.start_date).total_seconds()
               if duration < 0:
                   # Ignore dag runs with inconsistent timestamps
                   continue
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()
+
+        # Initialize the result list
+        stalled_runs = []
+
+        # Iterate over each dag run to check if it's stalled
+        for dag_run in running_dag_runs:
+            # Get the task instances for this dag run
+            task_instances = dag_run.get_task_instances(session=session)
+
+            # Check if any task instance is still running
+            has_running_tasks = False
+            for ti in task_instances:
+                if ti.state == TaskInstanceState.RUNNING:
+                    has_running_tasks = True
+                    break
+
+            # Calculate the duration
+            if dag_run.start_date:
+                duration = current_time - dag_run.start_date.timestamp()
+            else:
+                duration = 0
+
+            # Assert that duration is positive
+            assert duration >= 0, "Duration should never be negative"
+
+            # Add to stalled runs if duration exceeds threshold
+            if duration > threshold_seconds and not has_running_tasks:
+                stalled_runs.append(dag_run)
+
+        return stalled_runs
+
+    @staticmethod
+    @lru_cache
+    def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) -> 
DagRun | None:
+        """Cache and return a DagRun by its run_id for fast repeated 
lookups."""
+        return session.scalar(
+            select(DagRun).where(DagRun.run_id == run_id)

Review Comment:
   This lookup filters only on `run_id`, but `run_id` is only unique *per dag*; 
across multiple DAGs the same `run_id` can exist, so this can return the wrong 
DagRun. The query (and cache key) should include `dag_id` (or otherwise use a 
globally unique identifier like DagRun.id) to avoid collisions.
   ```suggestion
       @lru_cache(maxsize=1024)
       def get_dag_run_by_run_id(dag_id: str, run_id: str, session: Session = 
NEW_SESSION) -> DagRun | None:
           """Cache and return a DagRun by its dag_id and run_id for fast 
repeated lookups."""
           return session.scalar(
               select(DagRun).where(and_(DagRun.dag_id == dag_id, DagRun.run_id 
== run_id))
   ```



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()
+
+        # Initialize the result list
+        stalled_runs = []
+
+        # Iterate over each dag run to check if it's stalled
+        for dag_run in running_dag_runs:
+            # Get the task instances for this dag run
+            task_instances = dag_run.get_task_instances(session=session)
+
+            # Check if any task instance is still running
+            has_running_tasks = False
+            for ti in task_instances:
+                if ti.state == TaskInstanceState.RUNNING:
+                    has_running_tasks = True
+                    break
+
+            # Calculate the duration
+            if dag_run.start_date:
+                duration = current_time - dag_run.start_date.timestamp()
+            else:

Review Comment:
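   Computing duration via `start_date.timestamp()` is fragile: if `start_date` 
is ever a naive datetime, `.timestamp()` interprets it as local time, and 
mixing epoch floats with datetimes is easy to get wrong. Using 
`(timezone.utcnow() - dag_run.start_date).total_seconds()` is simpler and 
avoids timestamp conversions.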



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()

Review Comment:
   `import time` and `time.time()` here are unnecessary and inconsistent with 
the rest of the file’s timezone-aware datetime usage. Prefer 
`timezone.utcnow()` and datetime arithmetic, and keep standard-library imports 
at module scope unless there’s a specific isolation/circular-import need.



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) 
-> list[DagRun]:
             ).all()
         )
 
+    @classmethod
+    @provide_session
+    def get_stalled_dag_runs(
+        cls,
+        session: Session = NEW_SESSION,
+        threshold_seconds: float = 3600,
+    ) -> list[DagRun]:
+        """
+        Identify dag runs that have been running longer than the threshold.
+
+        This optimizes scheduler performance by pre-filtering stalled runs
+        so the scheduler can prioritize them for cleanup or alerting.
+        """
+        import time
+
+        # Get the current time
+        current_time = time.time()
+
+        # Query all running dag runs
+        running_dag_runs = session.scalars(
+            select(cls).where(cls.state == DagRunState.RUNNING)
+        ).all()
+
+        # Initialize the result list
+        stalled_runs = []
+
+        # Iterate over each dag run to check if it's stalled
+        for dag_run in running_dag_runs:
+            # Get the task instances for this dag run
+            task_instances = dag_run.get_task_instances(session=session)
+
+            # Check if any task instance is still running
+            has_running_tasks = False
+            for ti in task_instances:
+                if ti.state == TaskInstanceState.RUNNING:
+                    has_running_tasks = True
+                    break
+
+            # Calculate the duration
+            if dag_run.start_date:
+                duration = current_time - dag_run.start_date.timestamp()
+            else:
+                duration = 0
+
+            # Assert that duration is positive
+            assert duration >= 0, "Duration should never be negative"

Review Comment:
   Avoid using `assert` in production code (it can be stripped with Python 
`-O`). If negative durations are possible due to clock skew or bad data, handle 
it explicitly (e.g., clamp to 0 or skip) instead of asserting.
   ```suggestion
               # Ensure duration is non-negative even in case of clock skew or 
bad data
               if duration < 0:
                   duration = 0
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to