pierrejeambrun commented on code in PR #53035: URL: https://github.com/apache/airflow/pull/53035#discussion_r2239464307
########## airflow-core/src/airflow/api_fastapi/common/types.py: ########## Review Comment: I will open an issue for that. ########## airflow-core/src/airflow/api_fastapi/common/types.py: ########## Review Comment: Unrelated to that PR, but I can't load the Grid while there are response waiting it seems: <img width="1019" height="590" alt="Screenshot 2025-07-29 at 13 02 26" src="https://github.com/user-attachments/assets/c18b0fba-cc8a-4d86-933c-551b6a2fff09" /> <img width="1072" height="935" alt="Screenshot 2025-07-29 at 13 05 10" src="https://github.com/user-attachments/assets/ce19dd03-1341-466d-a2d4-b20688359c97" /> ########## airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx: ########## @@ -0,0 +1,195 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Badge, Link } from "@chakra-ui/react"; +import type { ColumnDef } from "@tanstack/react-table"; +import type { TFunction } from "i18next"; +import { useTranslation } from "react-i18next"; +import { Link as RouterLink, useParams } from "react-router-dom"; + +import { useHumanInTheLoopServiceGetHitlDetails } from "openapi/queries"; +import type { HITLDetail } from "openapi/requests/types.gen"; +import { DataTable } from "src/components/DataTable"; +import { useTableURLState } from "src/components/DataTable/useTableUrlState"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { StateBadge } from "src/components/StateBadge"; +import Time from "src/components/Time"; +import { TruncatedText } from "src/components/TruncatedText"; +import { useAutoRefresh } from "src/utils"; +import { getTaskInstanceLink } from "src/utils/links"; + +type TaskInstanceRow = { row: { original: HITLDetail } }; + +const statusMapping = (translate: TFunction, operator: string, responseReceived: boolean) => { + const statusMap = { + ApprovalOperator: ["approvalRequired", "approvalReceived"], + HITLBranchOperator: ["choiceRequired", "choiceReceived"], + HITLEntryOperator: ["inputRequired", "inputReceived"], + HITLOperator: ["responseRequired", "responseReceived"], + }; + + const [required, received] = statusMap[operator as keyof typeof statusMap]; + + return translate(`hitl:status.${responseReceived ? received : required}`); +}; Review Comment: Is it possible for user to extend their own `HITLOperator` cc: @Lee-W because this will not work. ########## airflow-core/src/airflow/api_fastapi/common/types.py: ########## @@ -87,6 +87,7 @@ class ExtraMenuItem: class MenuItem(Enum): """Define all menu items defined in the menu.""" + ACTION_REQUIRED = "Action Required" Review Comment: This will only work with simple auth manager. Other auth managers need to be updated. (Currently testing with FabAuthManager, new HITL menu is not showing up in the browse table which is expected) (need to update permissions per roles, etc.) ########## airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx: ########## @@ -0,0 +1,195 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Badge, Link } from "@chakra-ui/react"; +import type { ColumnDef } from "@tanstack/react-table"; +import type { TFunction } from "i18next"; +import { useTranslation } from "react-i18next"; +import { Link as RouterLink, useParams } from "react-router-dom"; + +import { useHumanInTheLoopServiceGetHitlDetails } from "openapi/queries"; +import type { HITLDetail } from "openapi/requests/types.gen"; +import { DataTable } from "src/components/DataTable"; +import { useTableURLState } from "src/components/DataTable/useTableUrlState"; +import { ErrorAlert } from "src/components/ErrorAlert"; +import { StateBadge } from "src/components/StateBadge"; +import Time from "src/components/Time"; +import { TruncatedText } from "src/components/TruncatedText"; +import { useAutoRefresh } from "src/utils"; +import { getTaskInstanceLink } from "src/utils/links"; + +type TaskInstanceRow = { row: { original: HITLDetail } }; + +const statusMapping = (translate: TFunction, operator: string, responseReceived: boolean) => { + const statusMap = { + ApprovalOperator: ["approvalRequired", "approvalReceived"], + HITLBranchOperator: ["choiceRequired", "choiceReceived"], + HITLEntryOperator: ["inputRequired", "inputReceived"], + HITLOperator: ["responseRequired", "responseReceived"], + }; + + const [required, received] = statusMap[operator as keyof typeof statusMap]; + + return translate(`hitl:status.${responseReceived ? received : required}`); +}; + +const taskInstanceColumns = ({ + dagId, + runId, + taskId, + translate, +}: { + dagId?: string; + runId?: string; + taskId?: string; + translate: TFunction; +}): Array<ColumnDef<HITLDetail>> => [ + { + accessorKey: "task_instance.operator", + cell: ({ row: { original } }: TaskInstanceRow) => { + const { operator } = original.task_instance; + + return <Badge>{statusMapping(translate, operator ?? "", original.response_received ?? false)}</Badge>; + }, + header: translate("Status"), + }, + { + accessorKey: "subject", + cell: ({ row: { original } }: TaskInstanceRow) => ( + <Link asChild color="fg.info" fontWeight="bold"> + <RouterLink to={`${getTaskInstanceLink(original.task_instance)}/action_required`}> + <TruncatedText text={original.subject} /> + </RouterLink> + </Link> + ), + header: translate("Subject"), + }, + ...(Boolean(dagId) + ? [] + : [ + { + accessorKey: "task_instance.dag_id", + enableSorting: false, + header: translate("dagId"), + }, + ]), + ...(Boolean(runId) + ? [] + : [ + { + accessorKey: "run_after", + // If we don't show the taskId column, make the dag run a link to the task instance + cell: ({ row: { original } }: TaskInstanceRow) => + Boolean(taskId) ? ( + <Link asChild color="fg.info" fontWeight="bold"> + <RouterLink to={getTaskInstanceLink(original.task_instance)}> + <Time datetime={original.task_instance.run_after} /> + </RouterLink> + </Link> + ) : ( + <Time datetime={original.task_instance.run_after} /> + ), + header: translate("dagRun_one"), + }, + ]), + ...(Boolean(taskId) + ? [] + : [ + { + accessorKey: "task_display_name", + cell: ({ row: { original } }: TaskInstanceRow) => ( + <TruncatedText text={original.task_instance.task_display_name} /> + ), + enableSorting: false, + header: translate("taskId"), + }, + ]), + { + accessorKey: "rendered_map_index", + header: translate("mapIndex"), + }, + { + accessorKey: "state", + cell: ({ + row: { + original: { + task_instance: { state }, + }, + }, + }) => <StateBadge state={state}>{translate(`common:states.${state}`)}</StateBadge>, + header: () => translate("state"), + }, + { + accessorKey: "response_received", + header: translate("Response Received"), + }, + { + accessorKey: "response_at", + cell: ({ row: { original } }) => <Time datetime={original.response_at} />, + header: translate("Response At"), + }, +]; + +export const HITLTaskInstances = () => { + const { t: translate } = useTranslation(); + const { dagId, groupId, runId, taskId } = useParams(); + const { setTableURLState, tableURLState } = useTableURLState(); + const { pagination } = tableURLState; + + const refetchInterval = useAutoRefresh({}); + + const { data, error, isLoading } = useHumanInTheLoopServiceGetHitlDetails( + { + dagIdPattern: dagId, + dagRunId: runId, + }, + undefined, + { + enabled: !isNaN(pagination.pageSize), + refetchInterval, + }, + ); + + const filteredData = data?.hitl_details.filter((hitl) => { + if (taskId !== undefined) { + return hitl.task_instance.task_id === taskId; + } else if (groupId !== undefined) { + return hitl.task_instance.task_id.includes(groupId); + } + + return true; + }); + + return ( + <DataTable + columns={taskInstanceColumns({ + dagId, + runId, + taskId: Boolean(groupId) ? undefined : taskId, + translate, + })} + data={filteredData ?? []} + errorMessage={<ErrorAlert error={error} />} + initialState={tableURLState} + isLoading={isLoading} + modelName={translate("common:taskInstance_other")} Review Comment: modelName is probably wrong, we are not looking at a TaskInstance table but `Action required` <img width="1893" height="841" alt="Screenshot 2025-07-29 at 12 58 11" src="https://github.com/user-attachments/assets/c73c4248-a099-4e55-8e39-e2e154a048d5" /> ########## airflow-core/src/airflow/ui/src/components/FlexibleForm/FlexibleForm.tsx: ########## @@ -93,55 +103,83 @@ export const FlexibleForm = ({ } }; - return Object.entries(params).some(([, param]) => typeof param.schema.section !== "string") - ? Object.entries(params).map(([, secParam]) => { - const currentSection = secParam.schema.section ?? flexibleFormDefaultSection; - - if (processedSections.has(currentSection)) { - return undefined; - } else { - processedSections.set(currentSection, true); - - return ( - <Accordion.Item - // We need to make the item content overflow visible for dropdowns to work, but directly applying the style does not work - css={{ - "& > :nth-child(2)": { - overflow: "visible", - }, - }} - key={currentSection} - value={currentSection} - > - <Accordion.ItemTrigger cursor="button"> - <Text color={sectionError.get(currentSection) ? "fg.error" : undefined}> - {currentSection} - </Text> - {sectionError.get(currentSection) ? ( - <Icon color="fg.error" margin="-1"> - <MdError /> - </Icon> - ) : undefined} - </Accordion.ItemTrigger> - - <Accordion.ItemContent pt={0}> - <Accordion.ItemBody> - <Stack separator={<StackSeparator />}> - {Object.entries(params) - .filter( - ([, param]) => - param.schema.section === currentSection || - (currentSection === flexibleFormDefaultSection && !Boolean(param.schema.section)), - ) - .map(([name]) => ( - <Row key={name} name={name} onUpdate={onUpdate} /> - ))} - </Stack> - </Accordion.ItemBody> - </Accordion.ItemContent> - </Accordion.Item> - ); - } - }) - : undefined; + return Object.entries(params).some(([, param]) => typeof param.schema.section !== "string") ? ( + Object.entries(params).map(([, secParam]) => { + const currentSection = secParam.schema.section ?? flexibleFormDefaultSection; + + if (processedSections.has(currentSection)) { + return undefined; + } else { + processedSections.set(currentSection, true); + + return ( + <Accordion.Item + // We need to make the item content overflow visible for dropdowns to work, but directly applying the style does not work + css={{ + "& > :nth-child(2)": { + overflow: "visible", + }, + }} + key={currentSection} + value={currentSection} + > + <Accordion.ItemTrigger cursor="button"> + <Text color={sectionError.get(currentSection) ? "fg.error" : undefined}>{currentSection}</Text> + {sectionError.get(currentSection) ? ( + <Icon color="fg.error" margin="-1"> + <MdError /> + </Icon> + ) : undefined} + </Accordion.ItemTrigger> + + <Accordion.ItemContent pt={0}> + <Accordion.ItemBody> + <Stack separator={<StackSeparator py={2} />}> + {Boolean(flexFormDescription) ? <Text mb={2}>{flexFormDescription}</Text> : undefined} + {Object.entries(params) + .filter( + ([, param]) => + param.schema.section === currentSection || + (currentSection === flexibleFormDefaultSection && !Boolean(param.schema.section)), + ) + .map(([name]) => ( + <Row key={name} name={name} onUpdate={onUpdate} /> + ))} + </Stack> + </Accordion.ItemBody> + </Accordion.ItemContent> + </Accordion.Item> + ); + } + }) + ) : isHITL ? ( + <Accordion.Item + css={{ + "& > :nth-child(2)": { + overflow: "visible", + }, Review Comment: This is causing warnings in the console ########## airflow-core/src/airflow/ui/src/queries/useUpdateHITLDetail.ts: ########## @@ -0,0 +1,96 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useQueryClient } from "@tanstack/react-query"; +import { useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + UseDagRunServiceGetDagRunKeyFn, + useDagRunServiceGetDagRunsKey, + useHumanInTheLoopServiceGetMappedTiHitlDetailKey, + useHumanInTheLoopServiceUpdateMappedTiHitlDetail, + useTaskInstanceServiceGetTaskInstanceKey, + useTaskInstanceServiceGetTaskInstancesKey, +} from "openapi/queries"; +import { toaster } from "src/components/ui/Toaster"; +import type { HITLResponseParams } from "src/utils/hitl"; + +export const useUpdateHITLDetail = ({ + dagId, + dagRunId, + mapIndex, + taskId, +}: { + dagId: string; + dagRunId: string; + mapIndex: number | undefined; + taskId: string; +}) => { + const queryClient = useQueryClient(); + const [error, setError] = useState<unknown>(undefined); + const { t: translate } = useTranslation(["common", "hitl"]); + const onSuccess = async () => { + const queryKeys = [ + UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), + [useDagRunServiceGetDagRunsKey], + [useTaskInstanceServiceGetTaskInstancesKey], + [useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex, taskId }], Review Comment: We should probably invalidate all TI of the RUN. (because tasks waiting for upstream approval will change state but TI details will still be cached). You can take a look at other cache invalidation to check how to wildcard match on mapIndex, and taskId, but specific dagId and dagRunId match. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
