This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f87772f52a Fix dag run selection (#38941)
f87772f52a is described below
commit f87772f52aeaad2c99eb4c0b1f985b9aa4810ea9
Author: Brent Bovenzi <[email protected]>
AuthorDate: Thu Apr 11 16:36:22 2024 -0400
Fix dag run selection (#38941)
* Fix dag run link params
* Do run_id checks inside of the grid_data hook
* remove firstRunId context
---
airflow/www/static/js/api/useGridData.ts | 32 ++++++++++++++++++++++------
airflow/www/static/js/dag/Main.tsx | 14 +-----------
airflow/www/static/js/dag/details/Header.tsx | 11 ++++------
airflow/www/static/js/dag/useSelection.ts | 7 ------
airflow/www/templates/airflow/dag.html | 1 +
airflow/www/views.py | 27 ++++-------------------
6 files changed, 36 insertions(+), 56 deletions(-)
diff --git a/airflow/www/static/js/api/useGridData.ts
b/airflow/www/static/js/api/useGridData.ts
index ee9325e852..1a633acea3 100644
--- a/airflow/www/static/js/api/useGridData.ts
+++ b/airflow/www/static/js/api/useGridData.ts
@@ -33,9 +33,9 @@ import useFilters, {
FILTER_UPSTREAM_PARAM,
ROOT_PARAM,
} from "src/dag/useFilters";
-import type { Task, DagRun, RunOrdering } from "src/types";
+import type { Task, DagRun, RunOrdering, API } from "src/types";
import { camelCase } from "lodash";
-import useSelection, { RUN_ID } from "src/dag/useSelection";
+import useSelection from "src/dag/useSelection";
const DAG_ID_PARAM = "dag_id";
@@ -80,9 +80,12 @@ const useGridData = () => {
filterDownstream,
filterUpstream,
},
+ onBaseDateChange,
} = useFilters();
- const { firstRunIdSetByUrl } = useSelection();
-
+ const {
+ onSelect,
+ selected: { taskId, runId },
+ } = useSelection();
const query = useQuery(
[
"gridData",
@@ -93,7 +96,7 @@ const useGridData = () => {
root,
filterUpstream,
filterDownstream,
- firstRunIdSetByUrl,
+ runId,
],
async () => {
const params = {
@@ -105,11 +108,28 @@ const useGridData = () => {
[NUM_RUNS_PARAM]: numRuns,
[RUN_TYPE_PARAM]: runType,
[RUN_STATE_PARAM]: runState,
- [RUN_ID]: firstRunIdSetByUrl || "",
};
const response = await axios.get<AxiosResponse, GridData>(gridDataUrl, {
params,
});
+ if (runId && !response.dagRuns.find((dr) => dr.runId === runId)) {
+ const dagRunUrl = getMetaValue("dag_run_url")
+ .replace("__DAG_ID__", dagId)
+ .replace("__DAG_RUN_ID__", runId);
+
+ // If the run id cannot be found in the response, try fetching it to see if its real and then adjust the base date filter
+ try {
+ const selectedRun = await axios.get<AxiosResponse, API.DAGRun>(
+ dagRunUrl
+ );
+ if (selectedRun?.executionDate) {
+ onBaseDateChange(selectedRun.executionDate);
+ }
+ // otherwise the run_id isn't valid and we should unselect it
+ } catch (e) {
+ onSelect({ taskId });
+ }
+ }
// turn off auto refresh if there are no active runs
if (!areActiveRuns(response.dagRuns)) stopRefresh();
return response;
diff --git a/airflow/www/static/js/dag/Main.tsx
b/airflow/www/static/js/dag/Main.tsx
index 2d70b452d5..144aba94a9 100644
--- a/airflow/www/static/js/dag/Main.tsx
+++ b/airflow/www/static/js/dag/Main.tsx
@@ -47,7 +47,6 @@ import FilterBar from "./nav/FilterBar";
import LegendRow from "./nav/LegendRow";
import useToggleGroups from "./useToggleGroups";
import keyboardShortcutIdentifier from "./keyboardShortcutIdentifier";
-import { DagRunSelectionContext, RUN_ID } from "./useSelection";
const detailsPanelKey = "hideDetailsPanel";
const minPanelWidth = 300;
@@ -74,7 +73,7 @@ const headerHeight =
10
) || 0;
-const MainInContext = () => {
+const Main = () => {
const {
data: { groups },
isLoading,
@@ -319,15 +318,4 @@ const MainInContext = () => {
);
};
-const Main = () => {
- const [searchParams] = useSearchParams();
- const [firstRunIdSetByUrl] = useState(searchParams.get(RUN_ID));
-
- return (
- <DagRunSelectionContext.Provider value={firstRunIdSetByUrl}>
- <MainInContext />
- </DagRunSelectionContext.Provider>
- );
-};
-
export default Main;
diff --git a/airflow/www/static/js/dag/details/Header.tsx
b/airflow/www/static/js/dag/details/Header.tsx
index e1222270d4..d13dfa6c81 100644
--- a/airflow/www/static/js/dag/details/Header.tsx
+++ b/airflow/www/static/js/dag/details/Header.tsx
@@ -51,17 +51,14 @@ const Header = ({ mapIndex }: Props) => {
} = useSelection();
const dagRun = dagRuns.find((r) => r.runId === runId);
-
const group = getTask({ taskId, task: groups });
- // If runId and/or taskId can't be found remove the selection
+ // If taskId can't be found remove the selection
useEffect(() => {
- if (runId && !dagRun && taskId && !group) {
- clearSelection();
- } else if (runId && !dagRun) {
- onSelect({ taskId });
+ if (taskId && !group) {
+ onSelect({ runId });
}
- }, [dagRun, taskId, group, runId, onSelect, clearSelection]);
+ }, [taskId, group, onSelect, runId]);
let runLabel;
if (dagRun && runId) {
diff --git a/airflow/www/static/js/dag/useSelection.ts
b/airflow/www/static/js/dag/useSelection.ts
index 00ead40c52..04113a870c 100644
--- a/airflow/www/static/js/dag/useSelection.ts
+++ b/airflow/www/static/js/dag/useSelection.ts
@@ -17,7 +17,6 @@
* under the License.
*/
-import { createContext, useContext } from "react";
import { useSearchParams } from "react-router-dom";
import {
LIMIT_PARAM,
@@ -35,13 +34,8 @@ export interface SelectionProps {
mapIndex?: number;
}
-// The first run_id need to be treated differently from the selection, because it is used in backend to
-// calculate the base_date, which we don't want jumping around when user is clicking in the grid.
-export const DagRunSelectionContext = createContext<string | null>(null);
-
const useSelection = () => {
const [searchParams, setSearchParams] = useSearchParams();
- const firstRunIdSetByUrl = useContext(DagRunSelectionContext);
// Clear selection, but keep other search params
const clearSelection = () => {
@@ -99,7 +93,6 @@ const useSelection = () => {
},
clearSelection,
onSelect,
- firstRunIdSetByUrl,
};
};
diff --git a/airflow/www/templates/airflow/dag.html
b/airflow/www/templates/airflow/dag.html
index 56bfc27070..d9ef3c139a 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -84,6 +84,7 @@
<meta name="datasets_api" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_dataset_endpoint_get_datasets')
}}">
<meta name="event_logs_api" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_event_log_endpoint_get_event_logs')
}}">
<meta name="audit_log_url" content="{{ url_for('LogModelView.list') }}">
+ <meta name="dag_run_url" content="{{
url_for('/api/v1.airflow_api_connexion_endpoints_dag_run_endpoint_get_dag_run',
dag_id='__DAG_ID__', dag_run_id='__DAG_RUN_ID__') }}">
<!-- End Urls -->
<meta name="is_paused" content="{{ dag_is_paused }}">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 55aa5a5e50..ce0727ab4d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3167,7 +3167,6 @@ class Airflow(AirflowBaseView):
def grid_data(self):
"""Return grid data."""
dag_id = request.args.get("dag_id")
- run_id = request.args.get("dag_run_id")
dag = get_airflow_app().dag_bag.get_dag(dag_id)
if not dag:
@@ -3185,43 +3184,25 @@ class Airflow(AirflowBaseView):
if num_runs is None:
num_runs = conf.getint("webserver",
"default_dag_run_display_number")
- dagrun = None
- if run_id:
- with create_session() as session:
- dagrun = dag.get_dagrun(run_id=run_id, session=session)
- if not dagrun:
- return {"error": f"can't find dag_run_id={run_id}"}, 404
- base_date = dagrun.execution_date
- else:
- try:
- base_date = timezone.parse(request.args["base_date"],
strict=True)
- except (KeyError, ValueError):
- base_date = dag.get_latest_execution_date() or
timezone.utcnow()
+ try:
+ base_date = timezone.parse(request.args["base_date"], strict=True)
+ except (KeyError, ValueError):
+ base_date = dag.get_latest_execution_date() or timezone.utcnow()
with create_session() as session:
query = select(DagRun).where(DagRun.dag_id == dag.dag_id,
DagRun.execution_date <= base_date)
run_types = request.args.getlist("run_type")
if run_types:
- if run_id:
- return {"error": "Can not provide filters when dag_run_id
filter is selected."}, 400
query = query.where(DagRun.run_type.in_(run_types))
run_states = request.args.getlist("run_state")
if run_states:
- if run_id:
- return {"error": "Can not provide filters when dag_run_id
filter is selected."}, 400
query = query.where(DagRun.state.in_(run_states))
dag_runs = wwwutils.sorted_dag_runs(
query, ordering=dag.timetable.run_ordering, limit=num_runs,
session=session
)
- if dagrun:
- found_requested_run_id = any(True for d in dag_runs if d.run_id ==
run_id)
- if not found_requested_run_id:
- return {
- "error": f"Dag with dag_run_id={run_id} found, but not in
selected time range or filters."
- }, 404
encoded_runs = [wwwutils.encode_dag_run(dr,
json_encoder=utils_json.WebEncoder) for dr in dag_runs]
data = {