Repository: nifi Updated Branches: refs/heads/master 72f8999b1 -> 6938e58c8
NIFI-5066: - Allowing the enable/disable buttons to be active under the same conditions as the start/stop buttons. This closes #2633. Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6938e58c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6938e58c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6938e58c Branch: refs/heads/master Commit: 6938e58c81d857f5d56a694a6fef8cf94e60bce2 Parents: 72f8999 Author: Matt Gilman <[email protected]> Authored: Thu Apr 12 14:12:01 2018 -0400 Committer: Mark Payne <[email protected]> Committed: Wed Apr 25 14:58:29 2018 -0400 ---------------------------------------------------------------------- .../api/entity/ScheduleComponentsEntity.java | 6 +- .../org/apache/nifi/groups/ProcessGroup.java | 36 +++- .../nifi/groups/StandardProcessGroup.java | 12 +- .../apache/nifi/audit/ProcessGroupAuditor.java | 25 +++ .../org/apache/nifi/web/NiFiServiceFacade.java | 2 + .../nifi/web/StandardNiFiServiceFacade.java | 32 ++++ .../org/apache/nifi/web/api/FlowResource.java | 75 +++++++-- .../apache/nifi/web/dao/ProcessGroupDAO.java | 10 ++ .../web/dao/impl/StandardProcessGroupDAO.java | 36 +++- .../WEB-INF/partials/canvas/navigation.jsp | 4 +- .../src/main/webapp/js/nf/canvas/nf-actions.js | 164 +++++++++++-------- .../main/webapp/js/nf/canvas/nf-canvas-utils.js | 40 +++-- 12 files changed, 329 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java index 280d015..dff1ea7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ScheduleComponentsEntity.java @@ -29,6 +29,8 @@ import java.util.Map; public class ScheduleComponentsEntity extends Entity { public static final String STATE_RUNNING = "RUNNING"; public static final String STATE_STOPPED = "STOPPED"; + public static final String STATE_ENABLED = "ENABLED"; + public static final String STATE_DISABLED = "DISABLED"; private String id; private String state; @@ -49,11 +51,11 @@ public class ScheduleComponentsEntity extends Entity { } /** - * @return The desired state of the descendant components. Possible states are 'RUNNING' and 'STOPPED' + * @return The desired state of the descendant components. 
Possible states are 'RUNNING', 'STOPPED', 'ENABLED', and 'DISABLED' */ @ApiModelProperty( value = "The desired state of the descendant components", - allowableValues = STATE_RUNNING + ", " + STATE_STOPPED + allowableValues = STATE_RUNNING + ", " + STATE_STOPPED + ", " + STATE_ENABLED + ", " + STATE_DISABLED ) public String getState() { return state; http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index d7a9e8b..da9374a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -57,24 +57,44 @@ import org.apache.nifi.remote.RemoteGroupPort; public interface ProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent { /** - * Predicate for filtering schedulable Processors. + * Predicate for starting eligible Processors. */ - Predicate<ProcessorNode> SCHEDULABLE_PROCESSORS = node -> !node.isRunning() && !ScheduledState.DISABLED.equals(node.getScheduledState()) && node.isValid(); + Predicate<ProcessorNode> START_PROCESSORS_FILTER = node -> !node.isRunning() && !ScheduledState.DISABLED.equals(node.getScheduledState()) && node.isValid(); /** - * Predicate for filtering unschedulable Processors. + * Predicate for stopping eligible Processors. 
*/ - Predicate<ProcessorNode> UNSCHEDULABLE_PROCESSORS = node -> node.isRunning(); + Predicate<ProcessorNode> STOP_PROCESSORS_FILTER = node -> node.isRunning(); /** - * Predicate for filtering schedulable Ports + * Predicate for enabling eligible Processors. */ - Predicate<Port> SCHEDULABLE_PORTS = port -> !port.isRunning() && !ScheduledState.DISABLED.equals(port.getScheduledState()) && port.isValid(); + Predicate<ProcessorNode> ENABLE_PROCESSORS_FILTER = node -> ScheduledState.DISABLED.equals(node.getScheduledState()); /** - * Predicate for filtering schedulable Ports + * Predicate for disabling eligible Processors. */ - Predicate<Port> UNSCHEDULABLE_PORTS = port -> ScheduledState.RUNNING.equals(port.getScheduledState()); + Predicate<ProcessorNode> DISABLE_PROCESSORS_FILTER = node -> !node.isRunning(); + + /** + * Predicate for starting eligible Ports. + */ + Predicate<Port> START_PORTS_FILTER = port -> !port.isRunning() && !ScheduledState.DISABLED.equals(port.getScheduledState()) && port.isValid(); + + /** + * Predicate for stopping eligible Ports. + */ + Predicate<Port> STOP_PORTS_FILTER = port -> ScheduledState.RUNNING.equals(port.getScheduledState()); + + /** + * Predicate for enabling eligible Ports. + */ + Predicate<Port> ENABLE_PORTS_FILTER = port -> ScheduledState.DISABLED.equals(port.getScheduledState()); + + /** + * Predicate for disabling eligible Ports. + */ + Predicate<Port> DISABLE_PORTS_FILTER = port -> !port.isRunning(); /** * @return a reference to this ProcessGroup's parent. 
This will be http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index d2167f4..94196d0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -412,7 +412,7 @@ public final class StandardProcessGroup implements ProcessGroup { public void startProcessing() { readLock.lock(); try { - findAllProcessors().stream().filter(SCHEDULABLE_PROCESSORS).forEach(node -> { + findAllProcessors().stream().filter(START_PROCESSORS_FILTER).forEach(node -> { try { node.getProcessGroup().startProcessor(node, true); } catch (final Throwable t) { @@ -420,11 +420,11 @@ public final class StandardProcessGroup implements ProcessGroup { } }); - findAllInputPorts().stream().filter(SCHEDULABLE_PORTS).forEach(port -> { + findAllInputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> { port.getProcessGroup().startInputPort(port); }); - findAllOutputPorts().stream().filter(SCHEDULABLE_PORTS).forEach(port -> { + findAllOutputPorts().stream().filter(START_PORTS_FILTER).forEach(port -> { port.getProcessGroup().startOutputPort(port); }); } finally { @@ -436,7 +436,7 @@ public final class StandardProcessGroup implements ProcessGroup { public void stopProcessing() { readLock.lock(); try { - findAllProcessors().stream().filter(UNSCHEDULABLE_PROCESSORS).forEach(node 
-> { + findAllProcessors().stream().filter(STOP_PROCESSORS_FILTER).forEach(node -> { try { node.getProcessGroup().stopProcessor(node); } catch (final Throwable t) { @@ -444,11 +444,11 @@ public final class StandardProcessGroup implements ProcessGroup { } }); - findAllInputPorts().stream().filter(UNSCHEDULABLE_PORTS).forEach(port -> { + findAllInputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> { port.getProcessGroup().stopInputPort(port); }); - findAllOutputPorts().stream().filter(UNSCHEDULABLE_PORTS).forEach(port -> { + findAllOutputPorts().stream().filter(STOP_PORTS_FILTER).forEach(port -> { port.getProcessGroup().stopOutputPort(port); }); } finally { http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java index 3ba5bea..3d92f5b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java @@ -197,6 +197,31 @@ public class ProcessGroupAuditor extends NiFiAuditor { return result; } + /** + * Audits the enabling or disabling of components within a process group. 
+ * + * @param proceedingJoinPoint join point + * @param groupId group id + * @param state scheduled state + * @throws Throwable ex + */ + @Around("within(org.apache.nifi.web.dao.ProcessGroupDAO+) && " + + "execution(void enableComponents(String, org.apache.nifi.controller.ScheduledState, java.util.Set<String>)) && " + + "args(groupId, state, componentIds)") + public void enableComponentsAdvice(ProceedingJoinPoint proceedingJoinPoint, String groupId, ScheduledState state, Set<String> componentIds) throws Throwable { + final Operation operation; + + proceedingJoinPoint.proceed(); + + // determine the running state + if (ScheduledState.DISABLED.equals(state)) { + operation = Operation.Disable; + } else { + operation = Operation.Enable; + } + + saveUpdateAction(groupId, operation); + } /** * Audits the update of controller serivce state http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index ba813ff..e402e1c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -1028,6 +1028,8 @@ public interface NiFiServiceFacade { */ ActivateControllerServicesEntity activateControllerServices(String processGroupId, ControllerServiceState state, Map<String, Revision> serviceRevisions); + ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState 
state, Map<String, Revision> componentRevisions); + /** * Schedules all applicable components under the specified ProcessGroup on behalf of the currently logged in user. * http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index ff52883..241c83d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -969,6 +969,38 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ScheduleComponentsEntity enableComponents(String processGroupId, ScheduledState state, Map<String, Revision> componentRevisions) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new + UpdateRevisionTask<ScheduleComponentsEntity>() { + @Override + public RevisionUpdate<ScheduleComponentsEntity> update() { + // schedule the components + processGroupDAO.enableComponents(processGroupId, state, componentRevisions.keySet()); + + // update the revisions + final Map<String, Revision> updatedRevisions = new HashMap<>(); + for (final Revision revision : componentRevisions.values()) { + final Revision currentRevision = 
revisionManager.getRevision(revision.getComponentId()); + updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); + } + + // save + controllerFacade.save(); + + // gather details for response + final ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); + entity.setId(processGroupId); + entity.setState(state.name()); + return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); + } + }); + + return updatedComponent.getComponent(); + } + + @Override public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 60b811e..0382e8a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -34,6 +34,8 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.manager.NodeResponse; import 
org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; @@ -131,7 +133,12 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.nifi.web.api.entity.ScheduleComponentsEntity.STATE_DISABLED; +import static org.apache.nifi.web.api.entity.ScheduleComponentsEntity.STATE_ENABLED; /** * RESTful endpoint for managing a Flow. @@ -563,27 +570,61 @@ public class FlowResource extends ApplicationResource { if (requestScheduleComponentsEntity.getState() == null) { throw new IllegalArgumentException("The scheduled state must be specified."); } else { - try { - state = ScheduledState.valueOf(requestScheduleComponentsEntity.getState()); - } catch (final IllegalArgumentException iae) { - throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", "))); + if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) { + state = ScheduledState.STOPPED; + } else { + try { + state = ScheduledState.valueOf(requestScheduleComponentsEntity.getState()); + } catch (final IllegalArgumentException iae) { + throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", + StringUtils.join(Stream.of(ScheduledState.RUNNING, ScheduledState.STOPPED, STATE_ENABLED, ScheduledState.DISABLED), ", "))); + } } } // ensure its a supported scheduled state - if (ScheduledState.DISABLED.equals(state) || ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state)) { - throw new 
IllegalArgumentException(String.format("The scheduled must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", "))); + if (ScheduledState.STARTING.equals(state) || ScheduledState.STOPPING.equals(state)) { + throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", + StringUtils.join(Stream.of(ScheduledState.RUNNING, ScheduledState.STOPPED, STATE_ENABLED, ScheduledState.DISABLED), ", "))); } // if the components are not specified, gather all components and their current revision if (requestScheduleComponentsEntity.getComponents() == null) { + final Supplier<Predicate<ProcessorNode>> getProcessorFilter = () -> { + if (ScheduledState.RUNNING.equals(state)) { + return ProcessGroup.START_PROCESSORS_FILTER; + } else if (ScheduledState.STOPPED.equals(state)) { + if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) { + return ProcessGroup.ENABLE_PROCESSORS_FILTER; + } else { + return ProcessGroup.STOP_PROCESSORS_FILTER; + } + } else { + return ProcessGroup.DISABLE_PROCESSORS_FILTER; + } + }; + + final Supplier<Predicate<Port>> getPortFilter = () -> { + if (ScheduledState.RUNNING.equals(state)) { + return ProcessGroup.START_PORTS_FILTER; + } else if (ScheduledState.STOPPED.equals(state)) { + if (requestScheduleComponentsEntity.getState().equals(STATE_ENABLED)) { + return ProcessGroup.ENABLE_PORTS_FILTER; + } else { + return ProcessGroup.STOP_PORTS_FILTER; + } + } else { + return ProcessGroup.DISABLE_PORTS_FILTER; + } + }; + // get the current revisions for the components being updated final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> { final Set<String> componentIds = new HashSet<>(); // ensure authorized for each processor we will attempt to schedule group.findAllProcessors().stream() - .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS) + .filter(getProcessorFilter.get()) .filter(processor -> processor.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) .forEach(processor -> { componentIds.add(processor.getIdentifier()); @@ -591,7 +632,7 @@ public class FlowResource extends ApplicationResource { // ensure authorized for each input port we will attempt to schedule group.findAllInputPorts().stream() - .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) + .filter(getPortFilter.get()) .filter(inputPort -> inputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) .forEach(inputPort -> { componentIds.add(inputPort.getIdentifier()); @@ -599,7 +640,7 @@ public class FlowResource extends ApplicationResource { // ensure authorized for each output port we will attempt to schedule group.findAllOutputPorts().stream() - .filter(ScheduledState.RUNNING.equals(state) ? 
ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) + .filter(getPortFilter.get()) .filter(outputPort -> outputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) .forEach(outputPort -> { componentIds.add(outputPort.getIdentifier()); @@ -646,14 +687,26 @@ public class FlowResource extends ApplicationResource { }, () -> serviceFacade.verifyScheduleComponents(id, state, requestComponentRevisions.keySet()), (revisions, scheduleComponentsEntity) -> { - final ScheduledState scheduledState = ScheduledState.valueOf(scheduleComponentsEntity.getState()); + + final ScheduledState scheduledState; + if (STATE_ENABLED.equals(scheduleComponentsEntity.getState())) { + scheduledState = ScheduledState.STOPPED; + } else { + scheduledState = ScheduledState.valueOf(scheduleComponentsEntity.getState()); + } final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents(); final Map<String, Revision> componentRevisions = componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); // update the process group - final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions); + final ScheduleComponentsEntity entity; + if (STATE_ENABLED.equals(scheduleComponentsEntity.getState()) || STATE_DISABLED.equals(scheduleComponentsEntity.getState())) { + entity = serviceFacade.enableComponents(id, scheduledState, componentRevisions); + } else { + entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions); + } + return generateOkResponse(entity).build(); } ); http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 36fd7dc..5cf7ecb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -84,12 +84,22 @@ public interface ProcessGroupDAO { * * @param groupId id * @param state scheduled state + * @param componentIds components * * @return a Future that can be used to wait for the services to finish starting or stopping */ Future<Void> scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds); /** + * Enables or disables the components in the specified process group. + * + * @param groupId id + * @param state scheduled state + * @param componentIds components + */ + void enableComponents(String groupId, ScheduledState state, Set<String> componentIds); + + /** * Enables or disables the controller services in the specified process group * * @param groupId id http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -171,7 +171,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou remotePort.getRemoteProcessGroup().startTransmitting(remotePort); break; } - } else { + } else if (ScheduledState.STOPPED.equals(state)) { switch (connectable.getConnectableType()) { case PROCESSOR: final CompletableFuture<?> processorFuture = connectable.getProcessGroup().stopProcessor((ProcessorNode) connectable); @@ -196,6 +196,40 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou } @Override + public void enableComponents(final String groupId, final ScheduledState state, final Set<String> componentIds) { + final ProcessGroup group = locateProcessGroup(flowController, groupId); + + for (final String componentId : componentIds) { + final Connectable connectable = group.findLocalConnectable(componentId); + if (ScheduledState.STOPPED.equals(state)) { + switch (connectable.getConnectableType()) { + case PROCESSOR: + connectable.getProcessGroup().enableProcessor((ProcessorNode) connectable); + break; + case INPUT_PORT: + connectable.getProcessGroup().enableInputPort((Port) connectable); + break; + case OUTPUT_PORT: + connectable.getProcessGroup().enableOutputPort((Port) connectable); + break; + } + } else if (ScheduledState.DISABLED.equals(state)) { + switch (connectable.getConnectableType()) { + case PROCESSOR: + connectable.getProcessGroup().disableProcessor((ProcessorNode) connectable); + break; + case INPUT_PORT: + connectable.getProcessGroup().disableInputPort((Port) connectable); + break; + case OUTPUT_PORT: + connectable.getProcessGroup().disableOutputPort((Port) connectable); + break; + } + } + } + } + + @Override public 
Future<Void> activateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) { final List<ControllerServiceNode> serviceNodes = serviceIds.stream() .map(flowController::getControllerServiceNode) http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp index f122076..67bda8c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/navigation.jsp @@ -107,13 +107,13 @@ <div class="button-spacer-large"> </div> <div id="operate-enable" class="action-button" title="Enable"> <button ng-click="appCtrl.nf.Actions['enable'](appCtrl.nf.CanvasUtils.getSelection());" - ng-disabled="!appCtrl.nf.CanvasUtils.canEnable(appCtrl.nf.CanvasUtils.getSelection());"> + ng-disabled="!appCtrl.nf.CanvasUtils.getSelection().empty() && !appCtrl.nf.CanvasUtils.canModify(appCtrl.nf.CanvasUtils.getSelection());"> <div class="graph-control-action-icon fa fa-flash"></div></button> </div> <div class="button-spacer-small"> </div> <div id="operate-disable" class="action-button" title="Disable"> <button ng-click="appCtrl.nf.Actions['disable'](appCtrl.nf.CanvasUtils.getSelection());" - ng-disabled="!appCtrl.nf.CanvasUtils.canDisable(appCtrl.nf.CanvasUtils.getSelection());"> + ng-disabled="!appCtrl.nf.CanvasUtils.getSelection().empty() && 
!appCtrl.nf.CanvasUtils.canModify(appCtrl.nf.CanvasUtils.getSelection());"> <div class="graph-control-action-icon icon icon-enable-false"></div></button> </div> <div class="button-spacer-large"> </div> http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js index 2b09d14..41e945b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js @@ -502,39 +502,58 @@ * @argument {selection} selection The selection */ enable: function (selection) { - var componentsToEnable = nfCanvasUtils.filterEnable(selection); + if (selection.empty()) { + // build the entity + var entity = { + 'id': nfCanvasUtils.getGroupId(), + 'state': 'ENABLED' + }; - if (componentsToEnable.empty()) { - nfDialog.showOkDialog({ - headerText: 'Enable Components', - dialogContent: 'No eligible components are selected. Please select the components to be enabled and ensure they are no longer running.' 
- }); + updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup); } else { - var enableRequests = []; - - // enable the selected processors - componentsToEnable.each(function (d) { - var selected = d3.select(this); - - // build the entity - var entity = { - 'revision': nfClient.getRevision(d), - 'component': { - 'id': d.id, - 'state': 'STOPPED' - } - }; + var componentsToEnable = nfCanvasUtils.filterEnable(selection); - enableRequests.push(updateResource(d.uri, entity).done(function (response) { - nfCanvasUtils.getComponentByType(d.type).set(response); - })); - }); + if (!componentsToEnable.empty()) { + var enableRequests = []; - // inform Angular app once the updates have completed - if (enableRequests.length > 0) { - $.when.apply(window, enableRequests).always(function () { - nfNgBridge.digest(); + // enable the selected processors + componentsToEnable.each(function (d) { + var selected = d3.select(this); + + // prepare the request + var uri, entity; + if (nfCanvasUtils.isProcessGroup(selected)) { + uri = config.urls.api + '/flow/process-groups/' + encodeURIComponent(d.id); + entity = { + 'id': d.id, + 'state': 'ENABLED' + } + } else { + uri = d.uri; + entity = { + 'revision': nfClient.getRevision(d), + 'component': { + 'id': d.id, + 'state': 'STOPPED' + } + }; + } + + enableRequests.push(updateResource(uri, entity).done(function (response) { + if (nfCanvasUtils.isProcessGroup(selected)) { + nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id); + } else { + nfCanvasUtils.getComponentByType(d.type).set(response); + } + })); }); + + // inform Angular app once the updates have completed + if (enableRequests.length > 0) { + $.when.apply(window, enableRequests).always(function () { + nfNgBridge.digest(); + }); + } } } }, @@ -545,39 +564,58 @@ * @argument {selection} selection The selection */ disable: function (selection) { - var componentsToDisable = 
nfCanvasUtils.filterDisable(selection); + if (selection.empty()) { + // build the entity + var entity = { + 'id': nfCanvasUtils.getGroupId(), + 'state': 'DISABLED' + }; - if (componentsToDisable.empty()) { - nfDialog.showOkDialog({ - headerText: 'Disable Components', - dialogContent: 'No eligible components are selected. Please select the components to be disabled and ensure they are no longer running.' - }); + updateResource(config.urls.api + '/flow/process-groups/' + encodeURIComponent(nfCanvasUtils.getGroupId()), entity).done(updateProcessGroup); } else { - var disableRequests = []; - - // disable the selected components - componentsToDisable.each(function (d) { - var selected = d3.select(this); - - // build the entity - var entity = { - 'revision': nfClient.getRevision(d), - 'component': { - 'id': d.id, - 'state': 'DISABLED' - } - }; + var componentsToDisable = nfCanvasUtils.filterDisable(selection); - disableRequests.push(updateResource(d.uri, entity).done(function (response) { - nfCanvasUtils.getComponentByType(d.type).set(response); - })); - }); + if (!componentsToDisable.empty()) { + var disableRequests = []; - // inform Angular app once the updates have completed - if (disableRequests.length > 0) { - $.when.apply(window, disableRequests).always(function () { - nfNgBridge.digest(); + // disable the selected components + componentsToDisable.each(function (d) { + var selected = d3.select(this); + + // prepare the request + var uri, entity; + if (nfCanvasUtils.isProcessGroup(selected)) { + uri = config.urls.api + '/flow/process-groups/' + encodeURIComponent(d.id); + entity = { + 'id': d.id, + 'state': 'DISABLED' + } + } else { + uri = d.uri; + entity = { + 'revision': nfClient.getRevision(d), + 'component': { + 'id': d.id, + 'state': 'DISABLED' + } + }; + } + + disableRequests.push(updateResource(uri, entity).done(function (response) { + if (nfCanvasUtils.isProcessGroup(selected)) { + nfCanvasUtils.getComponentByType('ProcessGroup').reload(d.id); + } else { + 
nfCanvasUtils.getComponentByType(d.type).set(response); + } + })); }); + + // inform Angular app once the updates have completed + if (disableRequests.length > 0) { + $.when.apply(window, disableRequests).always(function () { + nfNgBridge.digest(); + }); + } } } }, @@ -618,12 +656,7 @@ }); // ensure there are startable components selected - if (componentsToStart.empty()) { - nfDialog.showOkDialog({ - headerText: 'Start Components', - dialogContent: 'No eligible components are selected. Please select the components to be started and ensure they are no longer running.' - }); - } else { + if (!componentsToStart.empty()) { var startRequests = []; // start each selected component @@ -688,12 +721,7 @@ }); // ensure there are some component to stop - if (componentsToStop.empty()) { - nfDialog.showOkDialog({ - headerText: 'Stop Components', - dialogContent: 'No eligible components are selected. Please select the components to be stopped.' - }); - } else { + if (!componentsToStop.empty()) { var stopRequests = []; // stop each selected component http://git-wip-us.apache.org/repos/asf/nifi/blob/6938e58c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js index 7a8cf65..d6785bb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas-utils.js @@ -1141,10 +1141,19 @@ var selected = d3.select(this); var selectedData = selected.datum(); + // enable always allowed for PGs since they will 
invoke the /flow endpoint for enabling all applicable components (based on permissions) + if (nfCanvasUtils.isProcessGroup(selected)) { + return true; + } + + // not a PG, verify permissions to modify + if (nfCanvasUtils.canModify(selected) === false) { + return false; + } + // ensure its a processor, input port, or output port and supports modification and is disabled (can enable) return ((nfCanvasUtils.isProcessor(selected) || nfCanvasUtils.isInputPort(selected) || nfCanvasUtils.isOutputPort(selected)) && - nfCanvasUtils.supportsModification(selected) && - selectedData.status.aggregateSnapshot.runStatus === 'Disabled'); + nfCanvasUtils.supportsModification(selected) && selectedData.status.aggregateSnapshot.runStatus === 'Disabled'); }); }, @@ -1155,11 +1164,7 @@ */ canEnable: function (selection) { if (selection.empty()) { - return false; - } - - if (nfCanvasUtils.canModify(selection) === false) { - return false; + return true; } return nfCanvasUtils.filterEnable(selection).size() === selection.size(); @@ -1175,11 +1180,20 @@ var selected = d3.select(this); var selectedData = selected.datum(); + // disable always allowed for PGs since they will invoke the /flow endpoint for disabling all applicable components (based on permissions) + if (nfCanvasUtils.isProcessGroup(selected)) { + return true; + } + + // not a PG, verify permissions to modify + if (nfCanvasUtils.canModify(selected) === false) { + return false; + } + // ensure its a processor, input port, or output port and supports modification and is stopped (can disable) return ((nfCanvasUtils.isProcessor(selected) || nfCanvasUtils.isInputPort(selected) || nfCanvasUtils.isOutputPort(selected)) && - nfCanvasUtils.supportsModification(selected) && - (selectedData.status.aggregateSnapshot.runStatus === 'Stopped' || - selectedData.status.aggregateSnapshot.runStatus === 'Invalid')); + nfCanvasUtils.supportsModification(selected) && + (selectedData.status.aggregateSnapshot.runStatus === 'Stopped' || 
selectedData.status.aggregateSnapshot.runStatus === 'Invalid')); }); }, @@ -1190,11 +1204,7 @@ */ canDisable: function (selection) { if (selection.empty()) { - return false; - } - - if (nfCanvasUtils.canModify(selection) === false) { - return false; + return true; } return nfCanvasUtils.filterDisable(selection).size() === selection.size();
