This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7c51c87503 Do not return ongoing dagrun when an end_date is less than
utcnow (#33488)
7c51c87503 is described below
commit 7c51c87503004d57fe726dae2ab830a5ffd9b64b
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Aug 18 19:09:10 2023 +0200
Do not return ongoing dagrun when an end_date is less than utcnow (#33488)
---
.../www/static/js/cluster-activity/useFilters.tsx | 5 ++++-
airflow/www/views.py | 13 +++++------
tests/www/views/test_views_cluster_activity.py | 25 ++++++++++++++++------
3 files changed, 29 insertions(+), 14 deletions(-)
diff --git a/airflow/www/static/js/cluster-activity/useFilters.tsx
b/airflow/www/static/js/cluster-activity/useFilters.tsx
index 6e30683e3a..2e1c415223 100644
--- a/airflow/www/static/js/cluster-activity/useFilters.tsx
+++ b/airflow/www/static/js/cluster-activity/useFilters.tsx
@@ -49,7 +49,10 @@ export const now = date.toISOString();
const useFilters = (): FilterHookReturn => {
const [searchParams, setSearchParams] = useSearchParams();
- const endDate = searchParams.get(END_DATE_PARAM) || now;
+ const endDate =
+ searchParams.get(END_DATE_PARAM) ||
+ // @ts-ignore
+ moment(now).add(1, "h").toISOString();
const startDate =
searchParams.get(START_DATE_PARAM) ||
// @ts-ignore
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8838785f45..a38752beb1 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -67,7 +67,7 @@ from jinja2.utils import htmlsafe_json_dumps, pformat #
type: ignore
from markupsafe import Markup, escape
from pendulum.datetime import DateTime
from pendulum.parsing.exceptions import ParserError
-from sqlalchemy import Date, and_, case, desc, func, inspect, or_, select,
union_all
+from sqlalchemy import Date, and_, case, desc, func, inspect, select, union_all
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload
from wtforms import BooleanField, validators
@@ -3739,13 +3739,14 @@ class Airflow(AirflowBaseView):
"""Returns cluster activity historical metrics."""
start_date = _safe_parse_datetime(request.args.get("start_date"))
end_date = _safe_parse_datetime(request.args.get("end_date"))
+
with create_session() as session:
# DagRuns
- dag_runs_type = session.execute(
+ dag_run_types = session.execute(
select(DagRun.run_type, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
- or_(DagRun.end_date.is_(None), DagRun.end_date <=
end_date),
+ func.coalesce(DagRun.end_date, datetime.datetime.utcnow())
<= end_date,
)
.group_by(DagRun.run_type)
).all()
@@ -3754,7 +3755,7 @@ class Airflow(AirflowBaseView):
select(DagRun.state, func.count(DagRun.run_id))
.where(
DagRun.start_date >= start_date,
- or_(DagRun.end_date.is_(None), DagRun.end_date <=
end_date),
+ func.coalesce(DagRun.end_date, datetime.datetime.utcnow())
<= end_date,
)
.group_by(DagRun.state)
).all()
@@ -3765,7 +3766,7 @@ class Airflow(AirflowBaseView):
.join(TaskInstance.dag_run)
.where(
DagRun.start_date >= start_date,
- or_(DagRun.end_date.is_(None), DagRun.end_date <=
end_date),
+ func.coalesce(DagRun.end_date, datetime.datetime.utcnow())
<= end_date,
)
.group_by(TaskInstance.state)
).all()
@@ -3773,7 +3774,7 @@ class Airflow(AirflowBaseView):
data = {
"dag_run_types": {
**{dag_run_type.value: 0 for dag_run_type in DagRunType},
- **{run_type: sum_value for run_type, sum_value in
dag_runs_type},
+ **{run_type: sum_value for run_type, sum_value in
dag_run_types},
},
"dag_run_states": {
**{dag_run_state.value: 0 for dag_run_state in
DagRunState},
diff --git a/tests/www/views/test_views_cluster_activity.py
b/tests/www/views/test_views_cluster_activity.py
index 13105eb28a..1825a1d680 100644
--- a/tests/www/views/test_views_cluster_activity.py
+++ b/tests/www/views/test_views_cluster_activity.py
@@ -49,7 +49,7 @@ def freeze_time_for_dagruns(time_machine):
@pytest.fixture
-def make_dag_runs(dag_maker, session):
+def make_dag_runs(dag_maker, session, time_machine):
with dag_maker(
dag_id="test_dag_id",
serialized=True,
@@ -76,29 +76,40 @@ def make_dag_runs(dag_maker, session):
start_date=dag_maker.dag.next_dagrun_info(date).logical_date,
)
+ run3 = dag_maker.create_dagrun(
+ run_id="run_3",
+ state=DagRunState.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ execution_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0,
tzinfo=pendulum.UTC),
+ start_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, tzinfo=pendulum.UTC),
+ )
+ run3.end_date = None
+
for ti in run1.task_instances:
ti.state = TaskInstanceState.SUCCESS
for ti in run2.task_instances:
ti.state = TaskInstanceState.FAILED
+ time_machine.move_to("2023-07-02T00:00:00+00:00", tick=False)
+
session.flush()
@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
-def test_historical_metrics_data(admin_client, session):
+def test_historical_metrics_data(admin_client, session, time_machine):
resp = admin_client.get(
-
"/object/historical_metrics_data?start_date=2023-01-01T00:00&end_date=2023-05-02T00:00",
+
"/object/historical_metrics_data?start_date=2023-01-01T00:00&end_date=2023-08-02T00:00",
follow_redirects=True,
)
assert resp.status_code == 200
assert resp.json == {
- "dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success":
1},
- "dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 0,
"scheduled": 1},
+ "dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success":
1},
+ "dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 0,
"scheduled": 2},
"task_instance_states": {
"deferred": 0,
"failed": 2,
- "no_status": 0,
+ "no_status": 2,
"queued": 0,
"removed": 0,
"restarting": 0,
@@ -117,7 +128,7 @@ def test_historical_metrics_data(admin_client, session):
@pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
def test_historical_metrics_data_date_filters(admin_client, session):
resp = admin_client.get(
-
"/object/historical_metrics_data?start_date=2023-02-02T00:00&end_date=2023-05-02T00:00",
+
"/object/historical_metrics_data?start_date=2023-02-02T00:00&end_date=2023-06-02T00:00",
follow_redirects=True,
)
assert resp.status_code == 200