This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new ece82775675 [v3-2-test] Improve DB performance of datetime range
filters in API queries (#66696) (#67102)
ece82775675 is described below
commit ece82775675cad38722717a802cfffaefd2187f0
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 18 13:04:00 2026 +0200
[v3-2-test] Improve DB performance of datetime range filters in API
queries (#66696) (#67102)
* fix(api): replace COALESCE with index-friendly OR conditions in datetime
range filters
Adds NullableDatetimeRangeFilter, a RangeFilter subclass for start_date/end_date
columns that emits OR predicates instead of COALESCE(col, now()), allowing
PostgreSQL to use btree indexes on those columns.
Two bugs fixed versus the original implementation in PR #66696:
- Lower bounds now use or_(col >= x, col.is_(None)) without a now() guard,
so future-scheduled tasks (NULL start_date) are never incorrectly
excluded.
- The factory dispatches on (attribute_name or filter_name), so aliased
callers
like datetime_range_filter_factory("dag_run_end_date", DagRun, "end_date")
also receive NullableDatetimeRangeFilter rather than a plain RangeFilter.
* fix(api): scope NullableDatetimeRangeFilter to filter_name, not
attribute_name
datetime_range_filter_factory("dag_run_start_date", DagRun, "start_date")
passes attribute_name="start_date", so the guard
if (attribute_name or filter_name) in ("start_date", "end_date"):
resolved to "start_date" and incorrectly returned
NullableDatetimeRangeFilter
for the dag_run_start/end_date filters in the DAGs route. Those columns are
reached via an outer join; NULL means "no run", not "currently running", so
the OR (col IS NULL) branch inflated total_entries counts.
The original COALESCE guard checked filter_name only, so "dag_run_start_date"
was excluded. Revert to filter_name to preserve those semantics — only
callers with filter_name="start_date" or "end_date" (task instances, dag_run,
job routes) get NullableDatetimeRangeFilter.
Fixes TestGetDags::test_get_dags failures for query_params 13/14/17/21/23.
* fix(tests): correct test_aliased_*_returns_nullable_filter assertions
The dag_run_start_date and dag_run_end_date filters in the DAGs route use
an outer join, so NULL means "the DAG has no runs" — not "currently
running".
They must return a plain RangeFilter, not NullableDatetimeRangeFilter.
Replace the two tests that incorrectly expected NullableDatetimeRangeFilter
for aliased callers with tests that assert plain RangeFilter is returned.
(cherry picked from commit 37667f11aa37eb27072a79b2de1d5dbec09c2218)
Co-authored-by: Hemkumar Chheda <[email protected]>
---
.../src/airflow/api_fastapi/common/parameters.py | 56 +++++++++++++++----
.../unit/api_fastapi/common/test_parameters.py | 62 ++++++++++++++++++++++
2 files changed, 108 insertions(+), 10 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py
b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index a93ec040e1c..38134d825e7 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -898,6 +898,44 @@ class RangeFilter(BaseParam[Range]):
)
+class NullableDatetimeRangeFilter(RangeFilter):
+ """
+ RangeFilter for nullable datetime columns (``start_date``, ``end_date``),
rewritten for index use.
+
+ ``COALESCE(column, now())`` wraps the column in a function call that
prevents PostgreSQL from
+ using btree indexes, forcing sequential scans on large tables. This class
emits equivalent
+ ``OR`` predicates so each branch can be satisfied by an independent index
scan.
+
+ NULL semantics: ``start_date=NULL`` means the task has not started yet;
``end_date=NULL`` means
+ the task is still running. For lower bounds the NULL branch passes
unconditionally — a not-yet-
+ started/ended task will eventually satisfy any past lower bound. For upper
bounds the NULL branch
+ is ``col IS NULL AND now() <= x``, preserving the COALESCE(col, now())
semantics without the
+ function-wrap index penalty.
+ """
+
+ def to_orm(self, select: Select) -> Select:
+ if self.skip_none is False:
+ raise ValueError(f"Cannot set 'skip_none' to False on a
{type(self)}")
+
+ if self.value is None:
+ return select
+
+ if self.value.lower_bound_gte:
+ x = self.value.lower_bound_gte
+ select = select.where(or_(self.attribute >= x,
self.attribute.is_(None)))
+ if self.value.lower_bound_gt:
+ x = self.value.lower_bound_gt
+ select = select.where(or_(self.attribute > x,
self.attribute.is_(None)))
+ if self.value.upper_bound_lte:
+ x = self.value.upper_bound_lte
+ select = select.where(or_(self.attribute <= x,
and_(self.attribute.is_(None), func.now() <= x)))
+ if self.value.upper_bound_lt:
+ x = self.value.upper_bound_lt
+ select = select.where(or_(self.attribute < x,
and_(self.attribute.is_(None), func.now() < x)))
+
+ return select
+
+
def datetime_range_filter_factory(
filter_name: str, model: Base, attribute_name: str | None = None
) -> Callable[[datetime | None, datetime | None, datetime | None, datetime |
None], RangeFilter]:
@@ -908,17 +946,15 @@ def datetime_range_filter_factory(
upper_bound_lt: datetime | None = Query(alias=f"{filter_name}_lt",
default=None),
) -> RangeFilter:
attr = getattr(model, attribute_name or filter_name)
- if filter_name in ("start_date", "end_date"):
- attr = func.coalesce(attr, func.now())
- return RangeFilter(
- Range(
- lower_bound_gte=lower_bound_gte,
- lower_bound_gt=lower_bound_gt,
- upper_bound_lte=upper_bound_lte,
- upper_bound_lt=upper_bound_lt,
- ),
- attr,
+ range_val = Range(
+ lower_bound_gte=lower_bound_gte,
+ lower_bound_gt=lower_bound_gt,
+ upper_bound_lte=upper_bound_lte,
+ upper_bound_lt=upper_bound_lt,
)
+ if filter_name in ("start_date", "end_date"):
+ return NullableDatetimeRangeFilter(range_val, attr)
+ return RangeFilter(range_val, attr)
return depends_datetime
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
index 3c78a09ac61..6d5014b4cec 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import re
+from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Annotated
@@ -27,11 +28,14 @@ from sqlalchemy import select
from airflow.api_fastapi.common.parameters import (
FilterParam,
+ NullableDatetimeRangeFilter,
+ RangeFilter,
SortParam,
_PrefixPatternParam,
_PrefixSearchParam,
_SearchParam,
_TaskDisplayNamePrefixPatternParam,
+ datetime_range_filter_factory,
filter_param_factory,
)
from airflow.models import DagModel, DagRun, Log
@@ -335,3 +339,61 @@ class TestTaskDisplayNamePrefixPatternParam:
sql = _compile(statement)
assert "true" in sql or "1 = 1" in sql
+
+
+def _make_datetime_filter(filter_name, model=TaskInstance,
attribute_name=None, **kwargs):
+ """Call datetime_range_filter_factory outside FastAPI by supplying None
for all omitted bounds."""
+ defaults = dict(lower_bound_gte=None, lower_bound_gt=None,
upper_bound_lte=None, upper_bound_lt=None)
+ defaults.update(kwargs)
+ return datetime_range_filter_factory(filter_name, model,
attribute_name)(**defaults)
+
+
+class TestDatetimeRangeFilterFactory:
+ """datetime_range_filter_factory dispatches to NullableDatetimeRangeFilter
for start/end dates."""
+
+ def test_start_date_returns_nullable_filter(self):
+ rf = _make_datetime_filter("start_date")
+ assert isinstance(rf, NullableDatetimeRangeFilter)
+
+ def test_end_date_returns_nullable_filter(self):
+ rf = _make_datetime_filter("end_date")
+ assert isinstance(rf, NullableDatetimeRangeFilter)
+
+ def test_aliased_filter_name_returns_plain_filter(self):
+ """dag_run_start_date uses attribute_name='start_date' via outer join;
NULL means 'no run',
+ not 'currently running', so it must return a plain RangeFilter to
avoid inflating counts."""
+ rf = _make_datetime_filter("dag_run_start_date", model=DagRun,
attribute_name="start_date")
+ assert type(rf) is RangeFilter
+
+ def test_aliased_end_date_returns_plain_filter(self):
+ """dag_run_end_date uses attribute_name='end_date' via outer join;
must return plain RangeFilter."""
+ rf = _make_datetime_filter("dag_run_end_date", model=DagRun,
attribute_name="end_date")
+ assert type(rf) is RangeFilter
+
+ def test_other_column_returns_plain_filter(self):
+ rf = _make_datetime_filter("queued_dttm")
+ assert type(rf) is RangeFilter
+
+ def test_lower_bound_does_not_include_now(self):
+ """NULL branch on lower bounds passes unconditionally — no now()
call."""
+ bound = datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc)
+ rf = _make_datetime_filter("start_date", lower_bound_gte=bound)
+ sql = _compile(rf.to_orm(select(TaskInstance)))
+ assert "is null" in sql
+ assert "now()" not in sql
+ assert "coalesce" not in sql
+
+ def test_upper_bound_includes_now_for_running_tasks(self):
+ """NULL branch on upper bounds uses now() to proxy the in-progress
task's current time."""
+ bound = datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc)
+ rf = _make_datetime_filter("end_date", upper_bound_lte=bound)
+ sql = _compile(rf.to_orm(select(TaskInstance)))
+ assert "is null" in sql
+ assert "now()" in sql
+ assert "coalesce" not in sql
+
+ def test_no_coalesce_for_start_date(self):
+ bound = datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc)
+ rf = _make_datetime_filter("start_date", upper_bound_lte=bound)
+ sql = _compile(rf.to_orm(select(TaskInstance)))
+ assert "coalesce" not in sql