mcgilman commented on code in PR #8216:
URL: https://github.com/apache/nifi/pull/8216#discussion_r1445317774


##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts:
##########
@@ -1200,4 +1200,198 @@ export class CanvasUtils {
         }
         return '#ffffff';
     }
+
+    /**
+     * Determines if the components in the specified selection are runnable.
+     *
+     * @argument {d3.Selection} selection      The selection
+     * @return {boolean}                    Whether the selection is runnable
+     */
+    public areRunnable(selection: d3.Selection<any, any, any, any>): boolean {
+        if (selection.empty()) {
+            return true;
+        }
+
+        let runnable = true;
+        selection.each((data, index, nodes): boolean => {
+            if (!this.isRunnable(d3.select(nodes[index]))) {
+                runnable = false;
+                return false;

Review Comment:
   I realize this came from `nf-canvas-utils` but I don't think that returning 
boolean here does anything. The `jQuery` each looped supports a short circuit 
loop break by returning `false`. However, I don't think the D3 selection `each` 
method returns anything.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts:
##########
@@ -389,36 +399,86 @@ export class CanvasContextMenu implements 
ContextMenuDefinitionProvider {
                 isSeparator: true
             },
             {
-                condition: function (canvasUtils: CanvasUtils, selection: any) 
{
-                    // TODO - isRunnable
-                    return false;
+                condition: (canvasUtils: CanvasUtils, selection: any) => {
+                    return this.canvasUtils.areRunnable(selection);
                 },
                 clazz: 'fa fa-play',
                 text: 'Start',
-                action: function (store: Store<CanvasState>) {
-                    // TODO - start
+                action: (store: Store<CanvasState>, selection: any) => {
+                    if (selection.empty()) {
+                        // attempting to start the current process group
+                        this.store.dispatch(startCurrentProcessGroup());
+                    } else {
+                        const components: StartComponentRequest[] = [];
+                        const startable = 
this.canvasUtils.getStartable(selection);
+                        startable.each((d: any) => {
+                            components.push({
+                                id: d.id,
+                                uri: d.uri,
+                                type: d.type,
+                                revision: d.revision
+                            });
+                        });
+                        this.store.dispatch(
+                            startComponents({
+                                request: {
+                                    components
+                                }
+                            })
+                        );
+                    }
                 }
             },
             {
-                condition: function (canvasUtils: CanvasUtils, selection: any) 
{
-                    // TODO - isStoppable
-                    return false;
+                condition: (canvasUtils: CanvasUtils, selection: any) => {
+                    return this.canvasUtils.areStoppable(selection);
                 },
                 clazz: 'fa fa-stop',
                 text: 'Stop',
-                action: function (store: Store<CanvasState>) {
-                    // TODO - stop
+                action: (store: Store<CanvasState>, selection: any) => {
+                    if (selection.empty()) {
+                        // attempting to start the current process group
+                        this.store.dispatch(stopCurrentProcessGroup());
+                    } else {
+                        const components: StopComponentRequest[] = [];
+                        selection.each((d: any) => {
+                            components.push({
+                                id: d.id,
+                                uri: d.uri,
+                                type: d.type,
+                                revision: d.revision
+                            });
+                            // const d = selection.datum();

Review Comment:
   This can be removed.



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/index.ts:
##########
@@ -469,3 +469,76 @@ export interface FlowState {
     error: string | null;
     status: 'pending' | 'loading' | 'error' | 'success';
 }
+
+export interface Revision {

Review Comment:
   `Revision` already exists [1]. `lastModifier?` will need to be added to the 
existing one but it the same back end object.
   
   [1] 
https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/state/shared/index.ts#L250



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2057,207 @@ export class FlowEffects {
             ),
         { dispatch: false }
     );
+
+    startCurrentProcessGroup$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startCurrentProcessGroup),
+            withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+            switchMap(([, pgId]) => {
+                return of(
+                    FlowActions.startComponent({
+                        request: {
+                            id: pgId,
+                            type: ComponentType.ProcessGroup
+                        }
+                    })
+                );
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponents$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponents),
+            map((action) => action.request),
+            mergeMap((request) => [
+                ...request.components.map((component) => {
+                    return FlowActions.startComponent({
+                        request: component
+                    });
+                })
+            ])
+        )
+    );
+
+    startComponent$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponent),
+            map((action) => action.request),
+            mergeMap((request) => {
+                switch (request.type) {
+                    case ComponentType.InputPort:
+                    case ComponentType.OutputPort:
+                    case ComponentType.Processor:
+                    case ComponentType.RemoteProcessGroup:
+                        if ('uri' in request && 'revision' in request) {
+                            return 
from(this.flowService.startComponent(request)).pipe(
+                                map((response) => {
+                                    return FlowActions.startComponentSuccess({
+                                        response: {
+                                            type: request.type,
+                                            component: response
+                                        }
+                                    });
+                                })
+                            );
+                        }
+                        return of(
+                            FlowActions.flowApiError({
+                                error: `Starting ${request.type} requires both 
uri and revision properties`
+                            })
+                        );
+                    case ComponentType.ProcessGroup:
+                        return combineLatest([
+                            this.flowService.startProcessGroup(request),
+                            
this.flowService.startRemoteProcessGroupsInProcessGroup(request)
+                        ]).pipe(
+                            map(([startPgResponse]) => {
+                                return FlowActions.startComponentSuccess({
+                                    response: {
+                                        type: request.type,
+                                        component: startPgResponse
+                                    }
+                                });
+                            })
+                        );
+                    default:
+                        return of(FlowActions.flowApiError({ error: 
`${request.type} does not support starting` }));
+                }
+            }),
+            catchError((error) => of(FlowActions.flowApiError({ error: 
error.error })))
+        )
+    );
+
+    startComponentSuccess$ = createEffect(() =>
+        this.actions$.pipe(
+            ofType(FlowActions.startComponentSuccess),
+            debounceTime(200), // may be starting many at a time, debounce to 
reduce attempts to reload the flow

Review Comment:
   Thoughts on implementing an aggregator pattern to await all start component 
actions completing before reloading the flow? We've done this for awaiting 
component position updates [1] and user group updates [2].
   
   [1] 
https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts#L1564
   [2] 
https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/users/state/user-listing/user-listing.effects.ts#L502



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts:
##########
@@ -416,5 +317,70 @@ export const flowReducer = createReducer(
     on(setOperationCollapsed, (state, { operationCollapsed }) => ({
         ...state,
         operationCollapsed: operationCollapsed
-    }))
+    })),
+    on(startComponentSuccess, stopComponentSuccess, (state, { response }) => {
+        return produce(state, (draftState) => {
+            const collection: any[] | null = 
getComponentCollection(draftState, response.type);
+
+            if (collection) {
+                const componentIndex: number = collection.findIndex((f: any) 
=> response.component.id === f.id);
+                if (componentIndex > -1) {
+                    collection[componentIndex] = {
+                        ...collection[componentIndex],
+                        ...response.component
+                    };
+                }
+            }
+
+            draftState.saving = false;
+        });
+    }),
+    on(runOnceSuccess, (state, { response }) => {
+        return produce(state, (draftState) => {
+            const collection: any[] | null = 
getComponentCollection(draftState, ComponentType.Processor);
+
+            if (collection) {
+                const componentIndex: number = collection.findIndex((f: any) 
=> response.component.id === f.id);
+                if (componentIndex > -1) {
+                    collection[componentIndex] = {
+                        ...collection[componentIndex],
+                        ...response.component
+                    };
+                }
+            }
+
+            draftState.saving = false;
+        });
+    })
 );
+
+function getComponentCollection(draftState: FlowState, componentType: 
ComponentType): any[] | null {

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to