mcgilman commented on code in PR #8216:
URL: https://github.com/apache/nifi/pull/8216#discussion_r1445505855


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2058,277 @@ export class FlowEffects {
             ),
         { dispatch: false }
     );
+
+    startCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.startComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.startComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    startComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.startComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.startComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Starting ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.startProcessGroup(request),
+                            
this.flowService.startRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([startPgResponse]) => {
+                                return FlowActions.startComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: startPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support starting` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component started was the current process group, reload the flow
+     */
+    startCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * If a ProcessGroup was started, it should be reloaded as the response 
from the start operation doesn't contain all the displayed info
+     */
+    startProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id !== 
currentPg),
+            filter(([response]) => response.type === 
ComponentType.ProcessGroup),

Review Comment:
   Nit-picky, but we could flip the ordering of the `filter` calls and only 
compare the identifiers if the `response.type` is `ProcessGroup`.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2058,277 @@ export class FlowEffects {
             ),
         { dispatch: false }
     );
+
+    startCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.startComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.startComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    startComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.startComponent(request)).pipe(

Review Comment:
   Does this `from` observable need its own `catchError` or does the one below 
handle it?



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2058,277 @@ export class FlowEffects {
             ),
         { dispatch: false }
     );
+
+    startCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.startComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.startComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    startComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.startComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.startComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Starting ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.startProcessGroup(request),
+                            
this.flowService.startRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([startPgResponse]) => {
+                                return FlowActions.startComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: startPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support starting` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component started was the current process group, reload the flow
+     */
+    startCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * If a ProcessGroup was started, it should be reloaded as the response 
from the start operation doesn't contain all the displayed info
+     */
+    startProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id !== 
currentPg),
+            filter(([response]) => response.type === 
ComponentType.ProcessGroup),
+            switchMap(([response]) =>
+                of(
+                    FlowActions.reloadProcessGroup({
+                        request: {
+                            id: response.component.id
+                        }
+                    })
+                )
+            )
+        )
+    );
+
+    stopCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.stopComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            })
+        )
+    );
+
+    stopComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.stopComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    stopComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.stopComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.stopComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Stopping ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.stopProcessGroup(request),
+                            
this.flowService.stopRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([stopPgResponse]) => {
+                                return FlowActions.stopComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: stopPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support stopping` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component stopped was the current process group, reload the flow
+     */
+    stopCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * If a ProcessGroup was stopped, it should be reloaded as the response 
from the stop operation doesn't contain all the displayed info
+     */
+    stopProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id !== 
currentPg),
+            filter(([response]) => response.type === 
ComponentType.ProcessGroup),
+            switchMap(([response]) =>
+                of(
+                    FlowActions.reloadProcessGroup({
+                        request: {
+                            id: response.component.id
+                        }
+                    })
+                )
+            )
+        )
+    );
+
+    runOnce$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.runOnce),
+            map((action) => action.request),
+            switchMap((request) => {
+                return from(this.flowService.runOnce(request)).pipe(
+                    map((response) =>
+                        FlowActions.runOnceSuccess({
+                            response: {
+                                component: response
+                            }
+                        })
+                    )
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    runOnceSuccess$ = createEffect(() =>

Review Comment:
   I think we can just get rid of this entire effect since we were able to 
update the store based on the response to the run once API call.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts:
##########
@@ -139,262 +156,348 @@ export const loadRemoteProcessGroupSuccess = 
createAction(
  */
 
 export const addSelectedComponents = createAction(
-    '[Canvas] Add Selected Component',
+    `${CANVAS_PREFIX} Add Selected Component`,
     props<{ request: SelectComponentsRequest }>()
 );
 
 export const selectComponents = createAction(
-    '[Canvas] Select Components',
+    `${CANVAS_PREFIX} Select Components`,
     props<{ request: SelectComponentsRequest }>()
 );
 
-export const deselectAllComponents = createAction('[Canvas] Deselect All 
Components');
+export const deselectAllComponents = createAction(`${CANVAS_PREFIX} Deselect 
All Components`);
 
 export const removeSelectedComponents = createAction(
-    '[Canvas] Remove Selected Components',
+    `${CANVAS_PREFIX} Remove Selected Components`,
     props<{ request: SelectComponentsRequest }>()
 );
 
-export const centerSelectedComponent = createAction('[Canvas] Center Selected 
Component');
+export const centerSelectedComponent = createAction(`${CANVAS_PREFIX} Center 
Selected Component`);
 
 /*
     Create Component Actions
  */
 
 export const createComponentRequest = createAction(
-    '[Canvas] Create Component Request',
+    `${CANVAS_PREFIX} Create Component Request`,
     props<{ request: CreateComponentRequest }>()
 );
 
-export const createFunnel = createAction('[Canvas] Create Funnel', props<{ 
request: CreateComponentRequest }>());
+export const createFunnel = createAction(
+    `${CANVAS_PREFIX} Create Funnel`,
+    props<{ request: CreateComponentRequest }>()
+);
 
-export const createLabel = createAction('[Canvas] Create Label', props<{ 
request: CreateComponentRequest }>());
+export const createLabel = createAction(`${CANVAS_PREFIX} Create Label`, 
props<{ request: CreateComponentRequest }>());
 
 export const openNewProcessGroupDialog = createAction(
-    '[Canvas] Open New Process Group Dialog',
+    `${CANVAS_PREFIX} Open New Process Group Dialog`,
     props<{ request: CreateProcessGroupDialogRequest }>()
 );
 
 export const createProcessGroup = createAction(
-    '[Canvas] Create Process Group',
+    `${CANVAS_PREFIX} Create Process Group`,
     props<{ request: CreateProcessGroupRequest }>()
 );
 
 export const uploadProcessGroup = createAction(
-    '[Canvas] Upload Process Group',
+    `${CANVAS_PREFIX} Upload Process Group`,
     props<{ request: UploadProcessGroupRequest }>()
 );
 
 export const getParameterContextsAndOpenGroupComponentsDialog = createAction(
-    '[Canvas] Get Parameter Contexts And Open Group Components Dialog',
+    `${CANVAS_PREFIX} Get Parameter Contexts And Open Group Components Dialog`,
     props<{ request: OpenGroupComponentsDialogRequest }>()
 );
 
 export const openGroupComponentsDialog = createAction(
-    '[Canvas] Open Group Components Dialog',
+    `${CANVAS_PREFIX} Open Group Components Dialog`,
     props<{ request: GroupComponentsDialogRequest }>()
 );
 
-export const groupComponents = createAction('[Canvas] Group Components', 
props<{ request: GroupComponentsRequest }>());
+export const groupComponents = createAction(
+    `${CANVAS_PREFIX} Group Components`,
+    props<{ request: GroupComponentsRequest }>()
+);
 
 export const groupComponentsSuccess = createAction(
-    '[Canvas] Group Components Success',
+    `${CANVAS_PREFIX} Group Components Success`,
     props<{ response: GroupComponentsSuccess }>()
 );
 
 export const openNewProcessorDialog = createAction(
-    '[Canvas] Open New Processor Dialog',
+    `${CANVAS_PREFIX} Open New Processor Dialog`,
     props<{ request: CreateComponentRequest }>()
 );
 
-export const createProcessor = createAction('[Canvas] Create Processor', 
props<{ request: CreateProcessorRequest }>());
+export const createProcessor = createAction(
+    `${CANVAS_PREFIX} Create Processor`,
+    props<{ request: CreateProcessorRequest }>()
+);
 
 export const getDefaultsAndOpenNewConnectionDialog = createAction(
-    '[Canvas] Get Defaults And Open New Connection Dialog',
+    `${CANVAS_PREFIX} Get Defaults And Open New Connection Dialog`,
     props<{ request: CreateConnectionRequest }>()
 );
 
 export const openNewConnectionDialog = createAction(
-    '[Canvas] Open New Connection Dialog',
+    `${CANVAS_PREFIX} Open New Connection Dialog`,
     props<{ request: CreateConnectionDialogRequest }>()
 );
 
-export const createConnection = createAction('[Canvas] Create Connection', 
props<{ request: CreateConnection }>());
+export const createConnection = createAction(
+    `${CANVAS_PREFIX} Create Connection`,
+    props<{ request: CreateConnection }>()
+);
 
 export const openNewPortDialog = createAction(
-    '[Canvas] Open New Port Dialog',
+    `${CANVAS_PREFIX} Open New Port Dialog`,
     props<{ request: CreateComponentRequest }>()
 );
 
-export const createPort = createAction('[Canvas] Create Port', props<{ 
request: CreatePortRequest }>());
+export const createPort = createAction(`${CANVAS_PREFIX} Create Port`, props<{ 
request: CreatePortRequest }>());
 
 export const createComponentSuccess = createAction(
-    '[Canvas] Create Component Success',
+    `${CANVAS_PREFIX} Create Component Success`,
     props<{ response: CreateComponentResponse }>()
 );
 
 export const createComponentComplete = createAction(
-    '[Canvas] Create Component Complete',
+    `${CANVAS_PREFIX} Create Component Complete`,
     props<{ response: CreateComponentResponse }>()
 );
 
 export const navigateToViewStatusHistoryForComponent = createAction(
-    '[Canvas] Navigate To Status History For Component',
+    `${CANVAS_PREFIX} Navigate To Status History For Component`,
     props<{ request: OpenComponentDialogRequest }>()
 );
 
 export const viewStatusHistoryForComponent = createAction(
-    '[Canvas] View Status History for Component',
+    `${CANVAS_PREFIX} View Status History for Component`,
     props<{ request: StatusHistoryRequest }>()
 );
 /*
     Update Component Actions
  */
 
 export const navigateToEditComponent = createAction(
-    '[Canvas] Navigate To Edit Component',
+    `${CANVAS_PREFIX} Navigate To Edit Component`,
     props<{ request: OpenComponentDialogRequest }>()
 );
 
-export const editComponent = createAction('[Canvas] Edit Component', props<{ 
request: EditComponentDialogRequest }>());
+export const editComponent = createAction(
+    `${CANVAS_PREFIX} Edit Component`,
+    props<{ request: EditComponentDialogRequest }>()
+);
 
-export const navigateToEditCurrentProcessGroup = createAction('[Canvas] 
Navigate To Edit Current Process Group');
+export const navigateToEditCurrentProcessGroup = createAction(
+    `${CANVAS_PREFIX} Navigate To Edit Current Process Group`
+);
 
 export const navigateToControllerServicesForProcessGroup = createAction(
-    '[Canvas] Navigate To Controller Services For Process Group',
+    `${CANVAS_PREFIX} Navigate To Controller Services For Process Group`,
     props<{ request: NavigateToControllerServicesRequest }>()
 );
 
 export const editCurrentProcessGroup = createAction(
-    '[Canvas] Edit Current Process Group',
+    `${CANVAS_PREFIX} Edit Current Process Group`,
     props<{
         request: EditCurrentProcessGroupRequest;
     }>()
 );
 
 export const openEditPortDialog = createAction(
-    '[Canvas] Open Edit Port Dialog',
+    `${CANVAS_PREFIX} Open Edit Port Dialog`,
     props<{ request: EditComponentDialogRequest }>()
 );
 
 export const openEditProcessorDialog = createAction(
-    '[Canvas] Open Edit Processor Dialog',
+    `${CANVAS_PREFIX} Open Edit Processor Dialog`,
     props<{ request: EditComponentDialogRequest }>()
 );
 
 export const openEditConnectionDialog = createAction(
-    '[Canvas] Open Edit Connection Dialog',
+    `${CANVAS_PREFIX} Open Edit Connection Dialog`,
     props<{ request: EditConnectionDialogRequest }>()
 );
 
 export const openEditProcessGroupDialog = createAction(
-    '[Canvas] Open Edit Process Group Dialog',
+    `${CANVAS_PREFIX} Open Edit Process Group Dialog`,
     props<{ request: EditComponentDialogRequest }>()
 );
 
-export const updateComponent = createAction('[Canvas] Update Component', 
props<{ request: UpdateComponentRequest }>());
+export const updateComponent = createAction(
+    `${CANVAS_PREFIX} Update Component`,
+    props<{ request: UpdateComponentRequest }>()
+);
 
 export const updateComponentSuccess = createAction(
-    '[Canvas] Update Component Success',
+    `${CANVAS_PREFIX} Update Component Success`,
     props<{ response: UpdateComponentResponse }>()
 );
 
 export const updateComponentFailure = createAction(
-    '[Canvas] Update Component Failure',
+    `${CANVAS_PREFIX} Update Component Failure`,
     props<{ response: UpdateComponentFailure }>()
 );
 
-export const updateProcessor = createAction('[Canvas] Update Processor', 
props<{ request: UpdateComponentRequest }>());
+export const updateProcessor = createAction(
+    `${CANVAS_PREFIX} Update Processor`,
+    props<{ request: UpdateComponentRequest }>()
+);
 
 export const updateProcessorSuccess = createAction(
-    '[Canvas] Update Processor Success',
+    `${CANVAS_PREFIX} Update Processor Success`,
     props<{ response: UpdateComponentResponse }>()
 );
 
 export const updateConnection = createAction(
-    '[Canvas] Update Connection',
+    `${CANVAS_PREFIX} Update Connection`,
     props<{ request: UpdateConnectionRequest }>()
 );
 
 export const updateConnectionSuccess = createAction(
-    '[Canvas] Update Connection Success',
+    `${CANVAS_PREFIX} Update Connection Success`,
     props<{ response: UpdateConnectionSuccess }>()
 );
 
-export const updatePositions = createAction('[Canvas] Update Positions', 
props<{ request: UpdatePositionsRequest }>());
+export const updatePositions = createAction(
+    `${CANVAS_PREFIX} Update Positions`,
+    props<{ request: UpdatePositionsRequest }>()
+);
 
 export const updatePositionComplete = createAction(
-    '[Canvas] Update Position Complete',
+    `${CANVAS_PREFIX} Update Position Complete`,
     props<{ response: UpdateComponentResponse }>()
 );
 
-export const moveComponents = createAction('[Canvas] Move Components', props<{ 
request: MoveComponentsRequest }>());
+export const moveComponents = createAction(
+    `${CANVAS_PREFIX} Move Components`,
+    props<{ request: MoveComponentsRequest }>()
+);
 
 /*
     Delete Component Actions
  */
 
 export const deleteComponents = createAction(
-    '[Canvas] Delete Components',
+    `${CANVAS_PREFIX} Delete Components`,
     props<{ request: DeleteComponentRequest[] }>()
 );
 
 export const deleteComponentsSuccess = createAction(
-    '[Canvas] Delete Components Success',
+    `${CANVAS_PREFIX} Delete Components Success`,
     props<{ response: DeleteComponentResponse[] }>()
 );
 
 /*
     Transition
  */
 
-export const setDragging = createAction('[Canvas] Set Dragging', props<{ 
dragging: boolean }>());
+export const setDragging = createAction(`${CANVAS_PREFIX} Set Dragging`, 
props<{ dragging: boolean }>());
 
 export const setTransitionRequired = createAction(
-    '[Canvas] Set Transition Required',
+    `${CANVAS_PREFIX} Set Transition Required`,
     props<{ transitionRequired: boolean }>()
 );
 
-export const setSkipTransform = createAction('[Canvas] Set Skip Transform', 
props<{ skipTransform: boolean }>());
+export const setSkipTransform = createAction(
+    `${CANVAS_PREFIX} Set Skip Transform`,
+    props<{ skipTransform: boolean }>()
+);
 
 export const navigateToComponent = createAction(
-    '[Canvas] Navigate To Component',
+    `${CANVAS_PREFIX} Navigate To Component`,
     props<{ request: NavigateToComponentRequest }>()
 );
 
-export const navigateWithoutTransform = createAction('[Canvas] Navigate 
Without Transform', props<{ url: string[] }>());
+export const navigateWithoutTransform = createAction(
+    `${CANVAS_PREFIX} Navigate Without Transform`,
+    props<{ url: string[] }>()
+);
 
 /*
     Palette actions
  */
 
 export const setNavigationCollapsed = createAction(
-    '[Canvas] Set Navigation Collapsed',
+    `${CANVAS_PREFIX} Set Navigation Collapsed`,
     props<{ navigationCollapsed: boolean }>()
 );
 
 export const setOperationCollapsed = createAction(
-    '[Canvas] Set Operation Collapsed',
+    `${CANVAS_PREFIX} Set Operation Collapsed`,
     props<{ operationCollapsed: boolean }>()
 );
 
 /*
     General
  */
 
-export const showOkDialog = createAction('[Canvas] Show Ok Dialog', props<{ 
title: string; message: string }>());
+export const showOkDialog = createAction(
+    `${CANVAS_PREFIX} Show Ok Dialog`,
+    props<{ title: string; message: string }>()
+);
 
 export const renderConnectionsForComponent = createAction(
-    '[Canvas] Render Connections For Component',
+    `${CANVAS_PREFIX} Render Connections For Component`,
     props<{ id: string; updatePath: boolean; updateLabel: boolean }>()
 );
 
 export const navigateToProvenanceForComponent = createAction(
-    '[Canvas] Navigate To Provenance For Component',
+    `${CANVAS_PREFIX} Navigate To Provenance For Component`,
     props<{ id: string }>()
 );
 
 export const replayLastProvenanceEvent = createAction(
-    '[Canvas] Replay Last Provenance Event',
+    `${CANVAS_PREFIX} Replay Last Provenance Event`,
     props<{ request: ReplayLastProvenanceEventRequest }>()
 );
+
+export const runOnce = createAction(`${CANVAS_PREFIX} Run Once`, props<{ 
request: RunOnceRequest }>());
+
+export const runOnceSuccess = createAction(`${CANVAS_PREFIX} Run Once 
Success`, props<{ response: RunOnceResponse }>());
+
+export const startComponent = createAction(
+    `${CANVAS_PREFIX} Start Component`,
+    props<{ request: StartComponentRequest | StartProcessGroupRequest }>()
+);
+
+export const startComponents = createAction(
+    `${CANVAS_PREFIX} Start Components`,
+    props<{ request: StartComponentsRequest }>()
+);
+
+export const startComponentSuccess = createAction(
+    `${CANVAS_PREFIX} Start Component Success`,
+    props<{ response: StartComponentResponse }>()
+);
+
+export const stopComponent = createAction(
+    `${CANVAS_PREFIX} Stop Component`,
+    props<{ request: StopComponentRequest | StopProcessGroupRequest }>()
+);
+
+export const stopComponents = createAction(
+    `${CANVAS_PREFIX} Stop Components`,
+    props<{ request: StopComponentsRequest }>()
+);
+
+export const stopComponentSuccess = createAction(
+    `${CANVAS_PREFIX} Stop Component Success`,
+    props<{ response: StopComponentResponse }>()
+);
+
+export const startCurrentProcessGroup = createAction(`${CANVAS_PREFIX} Start 
Current Process Group`);
+
+export const stopCurrentProcessGroup = createAction(`${CANVAS_PREFIX} Stop 
Current Process Group`);
+
+export const reloadProcessGroup = createAction(

Review Comment:
   We have another use case where we've performed some action on a Connection 
and, once the action is completed, we need to reload the source/destination 
component. This seems like a pretty similar scenario: we've performed 
some action and now we want to reload a Process Group. Can these actions 
(`reloadProcessGroup` and `reloadProcessGroupSuccess`) be renamed and moved so 
they are co-located with the others? Specifically, look for `loadProcessor`, 
`loadInputPort`, `loadRemoteProcessGroup`, etc. in this file.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2058,277 @@ export class FlowEffects {
             ),
         { dispatch: false }
     );
+
+    startCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.startComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.startComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    startComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.startComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.startComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Starting ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.startProcessGroup(request),
+                            
this.flowService.startRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([startPgResponse]) => {
+                                return FlowActions.startComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: startPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support starting` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component started was the current process group, reload the flow
+     */
+    startCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * If a ProcessGroup was started, it should be reloaded as the response 
from the start operation doesn't contain all the displayed info
+     */
+    startProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id !== 
currentPg),
+            filter(([response]) => response.type === 
ComponentType.ProcessGroup),
+            switchMap(([response]) =>
+                of(
+                    FlowActions.reloadProcessGroup({
+                        request: {
+                            id: response.component.id
+                        }
+                    })
+                )
+            )
+        )
+    );
+
+    stopCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.stopComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            })
+        )
+    );
+
+    stopComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.stopComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    stopComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.stopComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.stopComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Stopping ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.stopProcessGroup(request),
+                            
this.flowService.stopRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([stopPgResponse]) => {
+                                return FlowActions.stopComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: stopPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support stopping` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component stopped was the current process group, reload the flow
+     */
+    stopCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * If a ProcessGroup was stopped, it should be reloaded as the response 
from the stop operation doesn't contain all the displayed info
+     */
+    stopProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id !== 
currentPg),
+            filter(([response]) => response.type === 
ComponentType.ProcessGroup),
+            switchMap(([response]) =>
+                of(
+                    FlowActions.reloadProcessGroup({
+                        request: {
+                            id: response.component.id
+                        }
+                    })
+                )
+            )
+        )
+    );
+
+    runOnce$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.runOnce),
+            map((action) => action.request),
+            switchMap((request) => {
+                return from(this.flowService.runOnce(request)).pipe(

Review Comment:
   Same comment from above about needing a `catchError`.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2058,277 @@ export class FlowEffects {
             ),
         { dispatch: false }
     );
+
+    startCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.startComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.startComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    startComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.startComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.startComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Starting ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.startProcessGroup(request),
+                            
this.flowService.startRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([startPgResponse]) => {
+                                return FlowActions.startComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: startPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support starting` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component started was the current process group, reload the flow
+     */
+    startCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * If a ProcessGroup was started, it should be reloaded as the response 
from the start operation doesn't contain all the displayed info
+     */
+    startProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id !== 
currentPg),
+            filter(([response]) => response.type === 
ComponentType.ProcessGroup),
+            switchMap(([response]) =>
+                of(
+                    FlowActions.reloadProcessGroup({
+                        request: {
+                            id: response.component.id
+                        }
+                    })
+                )
+            )
+        )
+    );
+
+    stopCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.stopComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            })
+        )
+    );
+
+    stopComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.stopComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    stopComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.stopComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.stopComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Stopping ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.stopProcessGroup(request),
+                            
this.flowService.stopRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([stopPgResponse]) => {
+                                return FlowActions.stopComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: stopPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support stopping` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component stopped was the current process group, reload the flow
+     */
+    stopCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * If a ProcessGroup was stopped, it should be reloaded as the response 
from the stop operation doesn't contain all the displayed info
+     */
+    stopProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id !== 
currentPg),
+            filter(([response]) => response.type === 
ComponentType.ProcessGroup),

Review Comment:
   Same comment as above about the ordering of the `filter` calls.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2058,277 @@ export class FlowEffects {
             ),
         { dispatch: false }
     );
+
+    startCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.startComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.startComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    startComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.startComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.startComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Starting ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.startProcessGroup(request),
+                            
this.flowService.startRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([startPgResponse]) => {
+                                return FlowActions.startComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: startPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support starting` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component started was the current process group, reload the flow
+     */
+    startCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * If a ProcessGroup was started, it should be reloaded as the response 
from the start operation doesn't contain all the displayed info
+     */
+    startProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id !== 
currentPg),
+            filter(([response]) => response.type === 
ComponentType.ProcessGroup),
+            switchMap(([response]) =>
+                of(
+                    FlowActions.reloadProcessGroup({
+                        request: {
+                            id: response.component.id
+                        }
+                    })
+                )
+            )
+        )
+    );
+
+    stopCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.stopComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            })
+        )
+    );
+
+    stopComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.stopComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    stopComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.stopComponent(request)).pipe(

Review Comment:
   Same comment from above about whether this needs a `catchError`.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2058,277 @@ export class FlowEffects {
             ),
         { dispatch: false }
     );
+
+    startCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.startComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.startComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    startComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.startComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.startComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Starting ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.startProcessGroup(request),
+                            
this.flowService.startRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([startPgResponse]) => {
+                                return FlowActions.startComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: startPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support starting` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    /**
+     * If the component started was the current process group, reload the flow
+     */
+    startCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            map((action) => action.response),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([response, currentPg]) => response.component.id === 
currentPg),
+            switchMap(() => of(FlowActions.reloadFlow()))
+        )
+    );
+
+    /**
+     * Reloads a started ProcessGroup that is not the current process group, because the response
+     * from the start operation doesn't contain all the displayed info.
+     */
+    startProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(
+                ([{ response }, currentProcessGroupId]) =>
+                    response.component.id !== currentProcessGroupId && response.type === ComponentType.ProcessGroup
+            ),
+            map(([{ response }]) =>
+                FlowActions.reloadProcessGroup({
+                    request: {
+                        id: response.component.id
+                    }
+                })
+            )
+        )
+    );
+
+    /**
+     * Turns a stopCurrentProcessGroup action into a stopComponent action that targets the
+     * current process group id taken from the store.
+     */
+    stopCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            map(([, currentProcessGroupId]) =>
+                FlowActions.stopComponent({
+                    request: {
+                        id: currentProcessGroupId,
+                        type: ComponentType.ProcessGroup
+                    }
+                })
+            )
+        )
+    );
+
+    /**
+     * Fans a stopComponents request out into one stopComponent action per listed component.
+     */
+    stopComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponents),
+            mergeMap(({ request }) =>
+                request.components.map((component) => FlowActions.stopComponent({ request: component }))
+            )
+        )
+    );
+
+    /**
+     * Stops a single component.
+     *
+     * Input/output ports, processors and remote process groups are stopped through
+     * flowService.stopComponent and require both `uri` and `revision` on the request. A process
+     * group is stopped together with the remote process groups it contains. Any other component
+     * type yields a flowApiError.
+     *
+     * Errors are caught on the inner request observables rather than on the outer action stream:
+     * a catchError on the outer stream would complete the effect after the first failure, and
+     * later stopComponent actions would no longer be handled.
+     */
+    stopComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return from(this.flowService.stopComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.stopComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                }),
+                                catchError((error) => of(FlowActions.flowApiError({ error: error.error })))
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Stopping ${request.type} requires both uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.stopProcessGroup(request),
+                            this.flowService.stopRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            // only the process group response is forwarded; the remote process group
+                            // response is awaited but not used
+                            map(([stopPgResponse]) => {
+                                return FlowActions.stopComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: stopPgResponse
+                                    }
+                                });
+                            }),
+                            catchError((error) => of(FlowActions.flowApiError({ error: error.error })))
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: `${request.type} does not support stopping` }));
+                }
+            })
+        )
+    );
+
+    /**
+     * Reloads the flow when the component that was stopped is the current process group.
+     */
+    stopCurrentProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponentSuccess),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(([{ response }, currentProcessGroupId]) => response.component.id === currentProcessGroupId),
+            map(() => FlowActions.reloadFlow())
+        )
+    );
+
+    /**
+     * Reloads a stopped ProcessGroup that is not the current process group, because the response
+     * from the stop operation doesn't contain all the displayed info.
+     */
+    stopProcessGroupSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.stopComponentSuccess),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            filter(
+                ([{ response }, currentProcessGroupId]) =>
+                    response.component.id !== currentProcessGroupId && response.type === ComponentType.ProcessGroup
+            ),
+            map(([{ response }]) =>
+                FlowActions.reloadProcessGroup({
+                    request: {
+                        id: response.component.id
+                    }
+                })
+            )
+        )
+    );
+
+    /**
+     * Sends a run-once request through flowService.runOnce and emits runOnceSuccess with the
+     * response, or flowApiError if the request fails.
+     *
+     * The catchError is attached to the inner request observable so that a failed request does
+     * not complete the effect; a catchError on the outer action stream would stop this effect
+     * from handling any later runOnce action.
+     */
+    runOnce$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.runOnce),
+            map((action) => action.request),
+            switchMap((request) =>
+                from(this.flowService.runOnce(request)).pipe(
+                    map((response) =>
+                        FlowActions.runOnceSuccess({
+                            response: {
+                                component: response
+                            }
+                        })
+                    ),
+                    catchError((error) => of(FlowActions.flowApiError({ error: error.error })))
+                )
+            )
+        )
+    );
+
+    /**
+     * Reloads the flow after a run-once request has succeeded.
+     */
+    runOnceSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.runOnceSuccess),
+            map(() => FlowActions.reloadFlow())
+        )
+    );
+
+    reloadProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.reloadProcessGroup),
+            map((action) => action.request),
+            mergeMap((request) => {
+                return from(this.flowService.getProcessGroup(request.id)).pipe(

Review Comment:
   This one is missing its `catchError`.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -103,6 +104,7 @@ import { YesNoDialog } from 
'../../../../ui/common/yes-no-dialog/yes-no-dialog.c
 import { EditParameterDialog } from 
'../../../../ui/common/edit-parameter-dialog/edit-parameter-dialog.component';
 import { selectParameterSaving } from '../parameter/parameter.selectors';
 import { ParameterService } from '../../service/parameter.service';
+import { getXHRResponse } from 'rxjs/internal/ajax/getXHRResponse';

Review Comment:
   I think this was imported by accident.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to