Copilot commented on code in PR #62447:
URL: https://github.com/apache/airflow/pull/62447#discussion_r2850557953
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
+
+ # Initialize the result list
+ stalled_runs = []
+
+ # Iterate over each dag run to check if it's stalled
+ for dag_run in running_dag_runs:
+ # Get the task instances for this dag run
+ task_instances = dag_run.get_task_instances(session=session)
+
+ # Check if any task instance is still running
+ has_running_tasks = False
+ for ti in task_instances:
+ if ti.state == TaskInstanceState.RUNNING:
+ has_running_tasks = True
+ break
+
+ # Calculate the duration
+ if dag_run.start_date:
+ duration = current_time - dag_run.start_date.timestamp()
+ else:
+ duration = 0
+
+ # Assert that duration is positive
+ assert duration >= 0, "Duration should never be negative"
+
+ # Add to stalled runs if duration exceeds threshold
+ if duration > threshold_seconds and not has_running_tasks:
+ stalled_runs.append(dag_run)
+
+ return stalled_runs
+
+ @staticmethod
+ @lru_cache
+    def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) -> DagRun | None:
+        """Cache and return a DagRun by its run_id for fast repeated lookups."""
Review Comment:
`@lru_cache` will retain strong references to all arguments used as cache
keys. Since `session` is an argument here, the cache can hold onto Session
objects (and their connection state) longer than intended and can also return
stale/detached ORM instances. Consider caching only lightweight identifiers or
removing `session` from the cached signature.
```suggestion
    def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) -> DagRun | None:
"""Return a DagRun by its run_id."""
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
+
+ # Initialize the result list
+ stalled_runs = []
+
+ # Iterate over each dag run to check if it's stalled
+ for dag_run in running_dag_runs:
+ # Get the task instances for this dag run
+ task_instances = dag_run.get_task_instances(session=session)
+
+ # Check if any task instance is still running
+ has_running_tasks = False
+ for ti in task_instances:
+ if ti.state == TaskInstanceState.RUNNING:
+ has_running_tasks = True
+ break
+
+ # Calculate the duration
+ if dag_run.start_date:
+ duration = current_time - dag_run.start_date.timestamp()
+ else:
+ duration = 0
+
+ # Assert that duration is positive
+ assert duration >= 0, "Duration should never be negative"
+
+ # Add to stalled runs if duration exceeds threshold
+ if duration > threshold_seconds and not has_running_tasks:
+ stalled_runs.append(dag_run)
+
+ return stalled_runs
+
+ @staticmethod
+ @lru_cache
+    def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) -> DagRun | None:
+        """Cache and return a DagRun by its run_id for fast repeated lookups."""
Review Comment:
`get_dag_run_by_run_id` defaults `session` to `NEW_SESSION` but is not
decorated with `@provide_session`. Calling it without explicitly passing a real
Session will fail; add `@provide_session` or require an explicit `session`
argument.
```suggestion
@provide_session
    def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) -> DagRun | None:
"""Return a DagRun by its run_id."""
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
+
+ # Initialize the result list
+ stalled_runs = []
+
+ # Iterate over each dag run to check if it's stalled
+ for dag_run in running_dag_runs:
+ # Get the task instances for this dag run
+ task_instances = dag_run.get_task_instances(session=session)
+
+ # Check if any task instance is still running
+ has_running_tasks = False
+ for ti in task_instances:
+ if ti.state == TaskInstanceState.RUNNING:
+ has_running_tasks = True
+ break
+
+ # Calculate the duration
+ if dag_run.start_date:
+ duration = current_time - dag_run.start_date.timestamp()
+ else:
+ duration = 0
+
+ # Assert that duration is positive
+ assert duration >= 0, "Duration should never be negative"
+
+ # Add to stalled runs if duration exceeds threshold
Review Comment:
There are several newly-added inline comments here that narrate the next
line (e.g. “Get the current time”, “Initialize the result list”). This file
generally relies on clear naming instead; removing these would reduce noise and
keep the code style consistent.
```suggestion
current_time = time.time()
running_dag_runs = session.scalars(
select(cls).where(cls.state == DagRunState.RUNNING)
).all()
stalled_runs = []
for dag_run in running_dag_runs:
task_instances = dag_run.get_task_instances(session=session)
has_running_tasks = False
for ti in task_instances:
if ti.state == TaskInstanceState.RUNNING:
has_running_tasks = True
break
if dag_run.start_date:
duration = current_time - dag_run.start_date.timestamp()
else:
duration = 0
assert duration >= 0, "Duration should never be negative"
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
Review Comment:
`get_stalled_dag_runs` currently selects *all* running DagRuns into memory.
For large installations this can be expensive and negates the stated scheduler
optimization; consider filtering by age in SQL (e.g., `start_date < cutoff`) so
only potentially-stalled runs are fetched.
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
+
+ # Initialize the result list
+ stalled_runs = []
+
+ # Iterate over each dag run to check if it's stalled
+ for dag_run in running_dag_runs:
+ # Get the task instances for this dag run
+ task_instances = dag_run.get_task_instances(session=session)
+
+ # Check if any task instance is still running
+ has_running_tasks = False
+ for ti in task_instances:
+ if ti.state == TaskInstanceState.RUNNING:
+ has_running_tasks = True
Review Comment:
The "no active tasks" check only considers `TaskInstanceState.RUNNING`. A
DagRun with tasks in other unfinished states (e.g.
QUEUED/RESTARTING/DEFERRED/None) would be classified as stalled incorrectly.
This should likely check `State.unfinished` (or a documented set of "active"
states) instead.
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
+
+ # Initialize the result list
+ stalled_runs = []
+
+ # Iterate over each dag run to check if it's stalled
+ for dag_run in running_dag_runs:
+ # Get the task instances for this dag run
+ task_instances = dag_run.get_task_instances(session=session)
+
+ # Check if any task instance is still running
+ has_running_tasks = False
+ for ti in task_instances:
+ if ti.state == TaskInstanceState.RUNNING:
+ has_running_tasks = True
+ break
+
+ # Calculate the duration
+ if dag_run.start_date:
+ duration = current_time - dag_run.start_date.timestamp()
+ else:
+ duration = 0
+
+ # Assert that duration is positive
+ assert duration >= 0, "Duration should never be negative"
+
Review Comment:
Calling `dag_run.get_task_instances()` inside the loop makes this an N+1
query pattern (one query for DagRuns + one per DagRun for TIs). This can be
expressed as a single query using `NOT EXISTS`/outer join against
`TaskInstance` filtered to unfinished states to keep scheduling throughput high.
```suggestion
# Query all running dag runs
running_dag_runs = session.scalars(
select(cls).where(cls.state == DagRunState.RUNNING)
).all()
if not running_dag_runs:
return []
# Collect dag_id and run_id for all running dag runs
dag_ids = {dag_run.dag_id for dag_run in running_dag_runs}
run_ids = {dag_run.run_id for dag_run in running_dag_runs}
        # Find all (dag_id, run_id) pairs that currently have running task instances.
# This avoids an N+1 pattern of querying task instances per dag run.
rows = session.execute(
select(
TI.dag_id.label("dag_id"),
TI.run_id.label("run_id"),
).where(
and_(
TI.state == TaskInstanceState.RUNNING,
TI.dag_id.in_(dag_ids),
TI.run_id.in_(run_ids),
)
)
).all()
running_ti_keys = {(row.dag_id, row.run_id) for row in rows}
# Get the current time as an aware datetime for duration calculations
now = timezone.utcnow()
# Initialize the result list
stalled_runs = []
# Iterate over each dag run to check if it's stalled
for dag_run in running_dag_runs:
            # Skip dag runs without a start_date; duration cannot be computed meaningfully.
if not dag_run.start_date:
continue
# Check if any task instance is still running for this dag run
            has_running_tasks = (dag_run.dag_id, dag_run.run_id) in running_ti_keys
# Calculate the duration in seconds
duration = (now - dag_run.start_date).total_seconds()
if duration < 0:
# Ignore dag runs with inconsistent timestamps
continue
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
+
+ # Initialize the result list
+ stalled_runs = []
+
+ # Iterate over each dag run to check if it's stalled
+ for dag_run in running_dag_runs:
+ # Get the task instances for this dag run
+ task_instances = dag_run.get_task_instances(session=session)
+
+ # Check if any task instance is still running
+ has_running_tasks = False
+ for ti in task_instances:
+ if ti.state == TaskInstanceState.RUNNING:
+ has_running_tasks = True
+ break
+
+ # Calculate the duration
+ if dag_run.start_date:
+ duration = current_time - dag_run.start_date.timestamp()
+ else:
+ duration = 0
+
+ # Assert that duration is positive
+ assert duration >= 0, "Duration should never be negative"
+
+ # Add to stalled runs if duration exceeds threshold
+ if duration > threshold_seconds and not has_running_tasks:
+ stalled_runs.append(dag_run)
+
+ return stalled_runs
+
+ @staticmethod
+ @lru_cache
+    def get_dag_run_by_run_id(run_id: str, session: Session = NEW_SESSION) -> DagRun | None:
+        """Cache and return a DagRun by its run_id for fast repeated lookups."""
+ return session.scalar(
+ select(DagRun).where(DagRun.run_id == run_id)
Review Comment:
This lookup filters only on `run_id`, but `run_id` is only unique *per dag*;
across multiple DAGs the same `run_id` can exist, so this can return the wrong
DagRun. The query (and cache key) should include `dag_id` (or otherwise use a
globally unique identifier like DagRun.id) to avoid collisions.
```suggestion
@lru_cache(maxsize=1024)
    def get_dag_run_by_run_id(dag_id: str, run_id: str, session: Session = NEW_SESSION) -> DagRun | None:
        """Cache and return a DagRun by its dag_id and run_id for fast repeated lookups."""
return session.scalar(
            select(DagRun).where(and_(DagRun.dag_id == dag_id, DagRun.run_id == run_id))
```
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
+
+ # Initialize the result list
+ stalled_runs = []
+
+ # Iterate over each dag run to check if it's stalled
+ for dag_run in running_dag_runs:
+ # Get the task instances for this dag run
+ task_instances = dag_run.get_task_instances(session=session)
+
+ # Check if any task instance is still running
+ has_running_tasks = False
+ for ti in task_instances:
+ if ti.state == TaskInstanceState.RUNNING:
+ has_running_tasks = True
+ break
+
+ # Calculate the duration
+ if dag_run.start_date:
+ duration = current_time - dag_run.start_date.timestamp()
+ else:
Review Comment:
Computing duration via `start_date.timestamp()` can be error-prone with
timezone-aware datetimes; using `timezone.utcnow() - dag_run.start_date` is
simpler and avoids timestamp conversions.
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
Review Comment:
`import time` and `time.time()` here are unnecessary and inconsistent with
the rest of the file’s timezone-aware datetime usage. Prefer
`timezone.utcnow()` and datetime arithmetic, and keep standard-library imports
at module scope unless there’s a specific isolation/circular-import need.
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -2036,6 +2037,67 @@ def get_latest_runs(cls, session: Session = NEW_SESSION) -> list[DagRun]:
).all()
)
+ @classmethod
+ @provide_session
+ def get_stalled_dag_runs(
+ cls,
+ session: Session = NEW_SESSION,
+ threshold_seconds: float = 3600,
+ ) -> list[DagRun]:
+ """
+ Identify dag runs that have been running longer than the threshold.
+
+ This optimizes scheduler performance by pre-filtering stalled runs
+ so the scheduler can prioritize them for cleanup or alerting.
+ """
+ import time
+
+ # Get the current time
+ current_time = time.time()
+
+ # Query all running dag runs
+ running_dag_runs = session.scalars(
+ select(cls).where(cls.state == DagRunState.RUNNING)
+ ).all()
+
+ # Initialize the result list
+ stalled_runs = []
+
+ # Iterate over each dag run to check if it's stalled
+ for dag_run in running_dag_runs:
+ # Get the task instances for this dag run
+ task_instances = dag_run.get_task_instances(session=session)
+
+ # Check if any task instance is still running
+ has_running_tasks = False
+ for ti in task_instances:
+ if ti.state == TaskInstanceState.RUNNING:
+ has_running_tasks = True
+ break
+
+ # Calculate the duration
+ if dag_run.start_date:
+ duration = current_time - dag_run.start_date.timestamp()
+ else:
+ duration = 0
+
+ # Assert that duration is positive
+ assert duration >= 0, "Duration should never be negative"
Review Comment:
Avoid using `assert` in production code (it can be stripped with Python
`-O`). If negative durations are possible due to clock skew or bad data, handle
it explicitly (e.g., clamp to 0 or skip) instead of asserting.
```suggestion
            # Ensure duration is non-negative even in case of clock skew or bad data
if duration < 0:
duration = 0
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]