This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 95ef3476b45c06ecc08fad53bd5f56a8c6abbe70 Author: HTErik <[email protected]> AuthorDate: Tue Feb 20 22:35:07 2024 +0100 Base date for fetching dag grid view must include selected run_id (#34887) Previously, if the user set the dag_run_id parameter in the URL to an old run that did not fit within the most recent 25 runs, the requested run was not selected. This change fixes that by setting the base_date to a time at which the run_id is known to exist whenever dag_run_id is provided as an explicit query parameter. closes: #34723 (cherry picked from commit a0ebabb796624a5e1158a3697c6f5f5a88771c60) --- airflow/www/static/js/api/useGridData.ts | 4 ++++ airflow/www/static/js/dag/Main.tsx | 14 +++++++++++++- airflow/www/static/js/dag/useSelection.ts | 9 ++++++++- airflow/www/views.py | 28 ++++++++++++++++++++++++---- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/airflow/www/static/js/api/useGridData.ts b/airflow/www/static/js/api/useGridData.ts index a0cbfd6b76..ee9325e852 100644 --- a/airflow/www/static/js/api/useGridData.ts +++ b/airflow/www/static/js/api/useGridData.ts @@ -35,6 +35,7 @@ import useFilters, { } from "src/dag/useFilters"; import type { Task, DagRun, RunOrdering } from "src/types"; import { camelCase } from "lodash"; +import useSelection, { RUN_ID } from "src/dag/useSelection"; const DAG_ID_PARAM = "dag_id"; @@ -80,6 +81,7 @@ const useGridData = () => { filterUpstream, }, } = useFilters(); + const { firstRunIdSetByUrl } = useSelection(); const query = useQuery( [ @@ -91,6 +93,7 @@ const useGridData = () => { root, filterUpstream, filterDownstream, + firstRunIdSetByUrl, ], async () => { const params = { @@ -102,6 +105,7 @@ const useGridData = () => { [NUM_RUNS_PARAM]: numRuns, [RUN_TYPE_PARAM]: runType, [RUN_STATE_PARAM]: runState, + [RUN_ID]: firstRunIdSetByUrl || "", }; const response = await axios.get<AxiosResponse, GridData>(gridDataUrl, { params, diff --git a/airflow/www/static/js/dag/Main.tsx 
b/airflow/www/static/js/dag/Main.tsx index 28dffedf28..9a4ace85eb 100644 --- a/airflow/www/static/js/dag/Main.tsx +++ b/airflow/www/static/js/dag/Main.tsx @@ -35,6 +35,7 @@ import FilterBar from "./nav/FilterBar"; import LegendRow from "./nav/LegendRow"; import useToggleGroups from "./useToggleGroups"; import keyboardShortcutIdentifier from "./keyboardShortcutIdentifier"; +import { DagRunSelectionContext, RUN_ID } from "./useSelection"; const detailsPanelKey = "hideDetailsPanel"; const minPanelWidth = 300; @@ -61,7 +62,7 @@ const headerHeight = 10 ) || 0; -const Main = () => { +const MainInContext = () => { const { data: { groups }, isLoading, @@ -256,4 +257,15 @@ const Main = () => { ); }; +const Main = () => { + const [searchParams] = useSearchParams(); + const [firstRunIdSetByUrl] = useState(searchParams.get(RUN_ID)); + + return ( + <DagRunSelectionContext.Provider value={firstRunIdSetByUrl}> + <MainInContext /> + </DagRunSelectionContext.Provider> + ); +}; + export default Main; diff --git a/airflow/www/static/js/dag/useSelection.ts b/airflow/www/static/js/dag/useSelection.ts index 09b7c92f53..7464c254ae 100644 --- a/airflow/www/static/js/dag/useSelection.ts +++ b/airflow/www/static/js/dag/useSelection.ts @@ -17,9 +17,10 @@ * under the License. */ +import { createContext, useContext } from "react"; import { useSearchParams } from "react-router-dom"; -const RUN_ID = "dag_run_id"; +export const RUN_ID = "dag_run_id"; const TASK_ID = "task_id"; const MAP_INDEX = "map_index"; @@ -29,8 +30,13 @@ export interface SelectionProps { mapIndex?: number; } +// The first run_id need to be treated differently from the selection, because it is used in backend to +// calculate the base_date, which we don't want jumping around when user is clicking in the grid. 
+export const DagRunSelectionContext = createContext<string | null>(null); + const useSelection = () => { const [searchParams, setSearchParams] = useSearchParams(); + const firstRunIdSetByUrl = useContext(DagRunSelectionContext); // Clear selection, but keep other search params const clearSelection = () => { @@ -70,6 +76,7 @@ const useSelection = () => { }, clearSelection, onSelect, + firstRunIdSetByUrl, }; }; diff --git a/airflow/www/views.py b/airflow/www/views.py index 093ff436eb..78bc7e473a 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3532,6 +3532,7 @@ class Airflow(AirflowBaseView): def grid_data(self): """Return grid data.""" dag_id = request.args.get("dag_id") + run_id = request.args.get("dag_run_id") dag = get_airflow_app().dag_bag.get_dag(dag_id) if not dag: @@ -3549,25 +3550,44 @@ class Airflow(AirflowBaseView): if num_runs is None: num_runs = conf.getint("webserver", "default_dag_run_display_number") - try: - base_date = timezone.parse(request.args["base_date"], strict=True) - except (KeyError, ValueError): - base_date = dag.get_latest_execution_date() or timezone.utcnow() + dagrun = None + if run_id: + with create_session() as session: + dagrun = dag.get_dagrun(run_id=run_id, session=session) + if not dagrun: + return {"error": f"can't find dag_run_id={run_id}"}, 404 + base_date = dagrun.execution_date + else: + try: + base_date = timezone.parse(request.args["base_date"], strict=True) + except (KeyError, ValueError): + base_date = dag.get_latest_execution_date() or timezone.utcnow() with create_session() as session: query = select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date) run_types = request.args.getlist("run_type") if run_types: + if run_id: + return {"error": "Can not provide filters when dag_run_id filter is selected."}, 400 query = query.where(DagRun.run_type.in_(run_types)) run_states = request.args.getlist("run_state") if run_states: + if run_id: + return {"error": "Can not provide filters 
when dag_run_id filter is selected."}, 400 query = query.where(DagRun.state.in_(run_states)) dag_runs = wwwutils.sorted_dag_runs( query, ordering=dag.timetable.run_ordering, limit=num_runs, session=session ) + if dagrun: + found_requested_run_id = any(True for d in dag_runs if d.run_id == run_id) + if not found_requested_run_id: + return { + "error": f"Dag with dag_run_id={run_id} found, but not in selected time range or filters." + }, 404 + encoded_runs = [wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder) for dr in dag_runs] data = { "groups": dag_to_grid(dag, dag_runs, session),
