This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c51c87503 Do not return ongoin dagrun when a end_date is less than 
utcnow (#33488)
7c51c87503 is described below

commit 7c51c87503004d57fe726dae2ab830a5ffd9b64b
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Aug 18 19:09:10 2023 +0200

    Do not return ongoin dagrun when a end_date is less than utcnow (#33488)
---
 .../www/static/js/cluster-activity/useFilters.tsx  |  5 ++++-
 airflow/www/views.py                               | 13 +++++------
 tests/www/views/test_views_cluster_activity.py     | 25 ++++++++++++++++------
 3 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/airflow/www/static/js/cluster-activity/useFilters.tsx 
b/airflow/www/static/js/cluster-activity/useFilters.tsx
index 6e30683e3a..2e1c415223 100644
--- a/airflow/www/static/js/cluster-activity/useFilters.tsx
+++ b/airflow/www/static/js/cluster-activity/useFilters.tsx
@@ -49,7 +49,10 @@ export const now = date.toISOString();
 const useFilters = (): FilterHookReturn => {
   const [searchParams, setSearchParams] = useSearchParams();
 
-  const endDate = searchParams.get(END_DATE_PARAM) || now;
+  const endDate =
+    searchParams.get(END_DATE_PARAM) ||
+    // @ts-ignore
+    moment(now).add(1, "h").toISOString();
   const startDate =
     searchParams.get(START_DATE_PARAM) ||
     // @ts-ignore
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 8838785f45..a38752beb1 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -67,7 +67,7 @@ from jinja2.utils import htmlsafe_json_dumps, pformat  # 
type: ignore
 from markupsafe import Markup, escape
 from pendulum.datetime import DateTime
 from pendulum.parsing.exceptions import ParserError
-from sqlalchemy import Date, and_, case, desc, func, inspect, or_, select, 
union_all
+from sqlalchemy import Date, and_, case, desc, func, inspect, select, union_all
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import Session, joinedload
 from wtforms import BooleanField, validators
@@ -3739,13 +3739,14 @@ class Airflow(AirflowBaseView):
         """Returns cluster activity historical metrics."""
         start_date = _safe_parse_datetime(request.args.get("start_date"))
         end_date = _safe_parse_datetime(request.args.get("end_date"))
+
         with create_session() as session:
             # DagRuns
-            dag_runs_type = session.execute(
+            dag_run_types = session.execute(
                 select(DagRun.run_type, func.count(DagRun.run_id))
                 .where(
                     DagRun.start_date >= start_date,
-                    or_(DagRun.end_date.is_(None), DagRun.end_date <= 
end_date),
+                    func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) 
<= end_date,
                 )
                 .group_by(DagRun.run_type)
             ).all()
@@ -3754,7 +3755,7 @@ class Airflow(AirflowBaseView):
                 select(DagRun.state, func.count(DagRun.run_id))
                 .where(
                     DagRun.start_date >= start_date,
-                    or_(DagRun.end_date.is_(None), DagRun.end_date <= 
end_date),
+                    func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) 
<= end_date,
                 )
                 .group_by(DagRun.state)
             ).all()
@@ -3765,7 +3766,7 @@ class Airflow(AirflowBaseView):
                 .join(TaskInstance.dag_run)
                 .where(
                     DagRun.start_date >= start_date,
-                    or_(DagRun.end_date.is_(None), DagRun.end_date <= 
end_date),
+                    func.coalesce(DagRun.end_date, datetime.datetime.utcnow()) 
<= end_date,
                 )
                 .group_by(TaskInstance.state)
             ).all()
@@ -3773,7 +3774,7 @@ class Airflow(AirflowBaseView):
             data = {
                 "dag_run_types": {
                     **{dag_run_type.value: 0 for dag_run_type in DagRunType},
-                    **{run_type: sum_value for run_type, sum_value in 
dag_runs_type},
+                    **{run_type: sum_value for run_type, sum_value in 
dag_run_types},
                 },
                 "dag_run_states": {
                     **{dag_run_state.value: 0 for dag_run_state in 
DagRunState},
diff --git a/tests/www/views/test_views_cluster_activity.py 
b/tests/www/views/test_views_cluster_activity.py
index 13105eb28a..1825a1d680 100644
--- a/tests/www/views/test_views_cluster_activity.py
+++ b/tests/www/views/test_views_cluster_activity.py
@@ -49,7 +49,7 @@ def freeze_time_for_dagruns(time_machine):
 
 
 @pytest.fixture
-def make_dag_runs(dag_maker, session):
+def make_dag_runs(dag_maker, session, time_machine):
     with dag_maker(
         dag_id="test_dag_id",
         serialized=True,
@@ -76,29 +76,40 @@ def make_dag_runs(dag_maker, session):
         start_date=dag_maker.dag.next_dagrun_info(date).logical_date,
     )
 
+    run3 = dag_maker.create_dagrun(
+        run_id="run_3",
+        state=DagRunState.RUNNING,
+        run_type=DagRunType.SCHEDULED,
+        execution_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, 
tzinfo=pendulum.UTC),
+        start_date=pendulum.DateTime(2023, 2, 3, 0, 0, 0, tzinfo=pendulum.UTC),
+    )
+    run3.end_date = None
+
     for ti in run1.task_instances:
         ti.state = TaskInstanceState.SUCCESS
 
     for ti in run2.task_instances:
         ti.state = TaskInstanceState.FAILED
 
+    time_machine.move_to("2023-07-02T00:00:00+00:00", tick=False)
+
     session.flush()
 
 
 @pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
-def test_historical_metrics_data(admin_client, session):
+def test_historical_metrics_data(admin_client, session, time_machine):
     resp = admin_client.get(
-        
"/object/historical_metrics_data?start_date=2023-01-01T00:00&end_date=2023-05-02T00:00",
+        
"/object/historical_metrics_data?start_date=2023-01-01T00:00&end_date=2023-08-02T00:00",
         follow_redirects=True,
     )
     assert resp.status_code == 200
     assert resp.json == {
-        "dag_run_states": {"failed": 1, "queued": 0, "running": 0, "success": 
1},
-        "dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 0, 
"scheduled": 1},
+        "dag_run_states": {"failed": 1, "queued": 0, "running": 1, "success": 
1},
+        "dag_run_types": {"backfill": 0, "dataset_triggered": 1, "manual": 0, 
"scheduled": 2},
         "task_instance_states": {
             "deferred": 0,
             "failed": 2,
-            "no_status": 0,
+            "no_status": 2,
             "queued": 0,
             "removed": 0,
             "restarting": 0,
@@ -117,7 +128,7 @@ def test_historical_metrics_data(admin_client, session):
 @pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
 def test_historical_metrics_data_date_filters(admin_client, session):
     resp = admin_client.get(
-        
"/object/historical_metrics_data?start_date=2023-02-02T00:00&end_date=2023-05-02T00:00",
+        
"/object/historical_metrics_data?start_date=2023-02-02T00:00&end_date=2023-06-02T00:00",
         follow_redirects=True,
     )
     assert resp.status_code == 200

Reply via email to