mcgilman commented on code in PR #8216:
URL: https://github.com/apache/nifi/pull/8216#discussion_r1445317774
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/service/canvas-utils.service.ts:
##########
@@ -1200,4 +1200,198 @@ export class CanvasUtils {
}
return '#ffffff';
}
+
+ /**
+ * Determines if the components in the specified selection are runnable.
+ *
+ * @argument {d3.Selection} selection The selection
+ * @return {boolean} Whether the selection is runnable
+ */
+ public areRunnable(selection: d3.Selection<any, any, any, any>): boolean {
+ if (selection.empty()) {
+ return true;
+ }
+
+ let runnable = true;
+ selection.each((data, index, nodes): boolean => {
+ if (!this.isRunnable(d3.select(nodes[index]))) {
+ runnable = false;
+ return false;
Review Comment:
I realize this came from `nf-canvas-utils` but I don't think that returning
boolean here does anything. The `jQuery` each looped supports a short circuit
loop break by returning `false`. However, I don't think the D3 selection `each`
method returns anything.
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/service/canvas-context-menu.service.ts:
##########
@@ -389,36 +399,86 @@ export class CanvasContextMenu implements
ContextMenuDefinitionProvider {
isSeparator: true
},
{
- condition: function (canvasUtils: CanvasUtils, selection: any)
{
- // TODO - isRunnable
- return false;
+ condition: (canvasUtils: CanvasUtils, selection: any) => {
+ return this.canvasUtils.areRunnable(selection);
},
clazz: 'fa fa-play',
text: 'Start',
- action: function (store: Store<CanvasState>) {
- // TODO - start
+ action: (store: Store<CanvasState>, selection: any) => {
+ if (selection.empty()) {
+ // attempting to start the current process group
+ this.store.dispatch(startCurrentProcessGroup());
+ } else {
+ const components: StartComponentRequest[] = [];
+ const startable =
this.canvasUtils.getStartable(selection);
+ startable.each((d: any) => {
+ components.push({
+ id: d.id,
+ uri: d.uri,
+ type: d.type,
+ revision: d.revision
+ });
+ });
+ this.store.dispatch(
+ startComponents({
+ request: {
+ components
+ }
+ })
+ );
+ }
}
},
{
- condition: function (canvasUtils: CanvasUtils, selection: any)
{
- // TODO - isStoppable
- return false;
+ condition: (canvasUtils: CanvasUtils, selection: any) => {
+ return this.canvasUtils.areStoppable(selection);
},
clazz: 'fa fa-stop',
text: 'Stop',
- action: function (store: Store<CanvasState>) {
- // TODO - stop
+ action: (store: Store<CanvasState>, selection: any) => {
+ if (selection.empty()) {
+ // attempting to start the current process group
+ this.store.dispatch(stopCurrentProcessGroup());
+ } else {
+ const components: StopComponentRequest[] = [];
+ selection.each((d: any) => {
+ components.push({
+ id: d.id,
+ uri: d.uri,
+ type: d.type,
+ revision: d.revision
+ });
+ // const d = selection.datum();
Review Comment:
This can be removed.
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/index.ts:
##########
@@ -469,3 +469,76 @@ export interface FlowState {
error: string | null;
status: 'pending' | 'loading' | 'error' | 'success';
}
+
+export interface Revision {
Review Comment:
`Revision` already exists [1]. `lastModifier?` will need to be added to the
existing one but it the same back end object.
[1]
https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/state/shared/index.ts#L250
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts:
##########
@@ -2056,4 +2057,207 @@ export class FlowEffects {
),
{ dispatch: false }
);
+
+ startCurrentProcessGroup$ = createEffect(() =>
+ this.actions$.pipe(
+ ofType(FlowActions.startCurrentProcessGroup),
+ withLatestFrom(this.store.select(selectCurrentProcessGroupId)),
+ switchMap(([, pgId]) => {
+ return of(
+ FlowActions.startComponent({
+ request: {
+ id: pgId,
+ type: ComponentType.ProcessGroup
+ }
+ })
+ );
+ }),
+ catchError((error) => of(FlowActions.flowApiError({ error:
error.error })))
+ )
+ );
+
+ startComponents$ = createEffect(() =>
+ this.actions$.pipe(
+ ofType(FlowActions.startComponents),
+ map((action) => action.request),
+ mergeMap((request) => [
+ ...request.components.map((component) => {
+ return FlowActions.startComponent({
+ request: component
+ });
+ })
+ ])
+ )
+ );
+
+ startComponent$ = createEffect(() =>
+ this.actions$.pipe(
+ ofType(FlowActions.startComponent),
+ map((action) => action.request),
+ mergeMap((request) => {
+ switch (request.type) {
+ case ComponentType.InputPort:
+ case ComponentType.OutputPort:
+ case ComponentType.Processor:
+ case ComponentType.RemoteProcessGroup:
+ if ('uri' in request && 'revision' in request) {
+ return
from(this.flowService.startComponent(request)).pipe(
+ map((response) => {
+ return FlowActions.startComponentSuccess({
+ response: {
+ type: request.type,
+ component: response
+ }
+ });
+ })
+ );
+ }
+ return of(
+ FlowActions.flowApiError({
+ error: `Starting ${request.type} requires both
uri and revision properties`
+ })
+ );
+ case ComponentType.ProcessGroup:
+ return combineLatest([
+ this.flowService.startProcessGroup(request),
+
this.flowService.startRemoteProcessGroupsInProcessGroup(request)
+ ]).pipe(
+ map(([startPgResponse]) => {
+ return FlowActions.startComponentSuccess({
+ response: {
+ type: request.type,
+ component: startPgResponse
+ }
+ });
+ })
+ );
+ default:
+ return of(FlowActions.flowApiError({ error:
`${request.type} does not support starting` }));
+ }
+ }),
+ catchError((error) => of(FlowActions.flowApiError({ error:
error.error })))
+ )
+ );
+
+ startComponentSuccess$ = createEffect(() =>
+ this.actions$.pipe(
+ ofType(FlowActions.startComponentSuccess),
+ debounceTime(200), // may be starting many at a time, debounce to
reduce attempts to reload the flow
Review Comment:
Thoughts on implementing an aggregator pattern to await all start component
actions completing before reloading the flow? We've done this for awaiting
component position updates [1] and user group updates [2].
[1]
https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts#L1564
[2]
https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/users/state/user-listing/user-listing.effects.ts#L502
##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts:
##########
@@ -416,5 +317,70 @@ export const flowReducer = createReducer(
on(setOperationCollapsed, (state, { operationCollapsed }) => ({
...state,
operationCollapsed: operationCollapsed
- }))
+ })),
+ on(startComponentSuccess, stopComponentSuccess, (state, { response }) => {
+ return produce(state, (draftState) => {
+ const collection: any[] | null =
getComponentCollection(draftState, response.type);
+
+ if (collection) {
+ const componentIndex: number = collection.findIndex((f: any)
=> response.component.id === f.id);
+ if (componentIndex > -1) {
+ collection[componentIndex] = {
+ ...collection[componentIndex],
+ ...response.component
+ };
+ }
+ }
+
+ draftState.saving = false;
+ });
+ }),
+ on(runOnceSuccess, (state, { response }) => {
+ return produce(state, (draftState) => {
+ const collection: any[] | null =
getComponentCollection(draftState, ComponentType.Processor);
+
+ if (collection) {
+ const componentIndex: number = collection.findIndex((f: any)
=> response.component.id === f.id);
+ if (componentIndex > -1) {
+ collection[componentIndex] = {
+ ...collection[componentIndex],
+ ...response.component
+ };
+ }
+ }
+
+ draftState.saving = false;
+ });
+ })
);
+
+function getComponentCollection(draftState: FlowState, componentType:
ComponentType): any[] | null {
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]