http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 36a9524..35686a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,31 @@ */ package org.apache.nifi.web; -import com.google.common.collect.Sets; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; import org.apache.nifi.action.FlowChangeAction; @@ -84,6 +108,7 @@ import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.history.History; import org.apache.nifi.history.HistoryQuery; import org.apache.nifi.history.PreviousValue; +import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; @@ -92,6 +117,7 @@ import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.AccessPolicyDTO; import org.apache.nifi.web.api.dto.AccessPolicySummaryDTO; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; @@ -137,6 +163,7 @@ import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.UserDTO; import org.apache.nifi.web.api.dto.UserGroupDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.action.HistoryDTO; import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; @@ -157,6 +184,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.AccessPolicyEntity; import org.apache.nifi.web.api.entity.AccessPolicySummaryEntity; import org.apache.nifi.web.api.entity.ActionEntity; +import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.BulletinEntity; import org.apache.nifi.web.api.entity.ComponentReferenceEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; @@ -190,6 +218,8 @@ import org.apache.nifi.web.api.entity.TemplateEntity; import org.apache.nifi.web.api.entity.TenantEntity; import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; +import org.apache.nifi.web.api.entity.VariableEntity; +import org.apache.nifi.web.api.entity.VariableRegistryEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.AccessPolicyDAO; import org.apache.nifi.web.dao.ConnectionDAO; @@ -217,28 +247,7 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; +import com.google.common.collect.Sets; /** * Implementation of NiFiServiceFacade that performs revision checking. @@ -423,6 +432,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) { + processGroupDAO.verifyActivateControllerServices(groupId, state, serviceIds); + } + + @Override public void verifyDeleteProcessGroup(final String groupId) { processGroupDAO.verifyDelete(groupId); } @@ -624,6 +638,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { */ private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); + return updateComponent(user, revision, authorizable, daoUpdate, dtoCreation); + } + + private <D, C> RevisionUpdate<D> updateComponent(final NiFiUser user, final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) { try { final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() { @Override @@ -774,6 +792,81 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public Set<AffectedComponentDTO> getComponentsAffectedByVariableRegistryUpdate(final VariableRegistryDTO variableRegistryDto) { + final ProcessGroup group = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); + if (group == null) { + throw new ResourceNotFoundException("Could not find Process Group with ID " + variableRegistryDto.getProcessGroupId()); + } + + final Map<String, String> variableMap = new HashMap<>(); + variableRegistryDto.getVariables().stream() // have to use forEach here instead of using Collectors.toMap because value may be null + .map(VariableEntity::getVariable) + .forEach(var -> variableMap.put(var.getName(), var.getValue())); + + final Set<AffectedComponentDTO> affectedComponentDtos = new HashSet<>(); + + final Set<String> updatedVariableNames = getUpdatedVariables(group, variableMap); + for (final String variableName : updatedVariableNames) { + final Set<ConfiguredComponent> affectedComponents = group.getComponentsAffectedByVariable(variableName); + + for (final ConfiguredComponent component : affectedComponents) { + if (component instanceof ProcessorNode) { + final ProcessorNode procNode = (ProcessorNode) component; + if (procNode.isRunning()) { + affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(procNode)); + } + } else if (component instanceof ControllerServiceNode) { + final ControllerServiceNode serviceNode = (ControllerServiceNode) component; + if (serviceNode.isActive()) { + affectedComponentDtos.add(dtoFactory.createAffectedComponentDto(serviceNode)); + } + } else { + throw new RuntimeException("Found unexpected type of Component [" + component.getCanonicalClassName() + "] dependending on variable"); + } + } + } + + return affectedComponentDtos; + } + + private Set<String> getUpdatedVariables(final ProcessGroup group, final Map<String, String> newVariableValues) { + final Set<String> updatedVariableNames = new HashSet<>(); + + final ComponentVariableRegistry registry = group.getVariableRegistry(); + for (final Map.Entry<String, String> entry : newVariableValues.entrySet()) { + final String varName = entry.getKey(); + final String newValue = entry.getValue(); + + final String curValue = registry.getVariableValue(varName); + if (!Objects.equals(newValue, curValue)) { + updatedVariableNames.add(varName); + } + } + + return updatedVariableNames; + } + + + @Override + public VariableRegistryEntity updateVariableRegistry(Revision revision, VariableRegistryDTO variableRegistryDto) { + return updateVariableRegistry(NiFiUserUtils.getNiFiUser(), revision, variableRegistryDto); + } + + @Override + public VariableRegistryEntity updateVariableRegistry(NiFiUser user, Revision revision, VariableRegistryDTO variableRegistryDto) { + final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(variableRegistryDto.getProcessGroupId()); + final RevisionUpdate<VariableRegistryDTO> snapshot = updateComponent(user, revision, + processGroupNode, + () -> processGroupDAO.updateVariableRegistry(variableRegistryDto), + processGroup -> dtoFactory.createVariableRegistryDto(processGroup)); + + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroupNode); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + return entityFactory.createVariableRegistryEntity(snapshot.getComponent(), updatedRevision, permissions); + } + + + @Override public ProcessGroupEntity updateProcessGroup(final Revision revision, final ProcessGroupDTO processGroupDTO) { final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId()); final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision, @@ -790,14 +883,27 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) { + if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) { + processGroupDAO.verifyUpdate(processGroupDTO); + } + } + + @Override public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); + return scheduleComponents(user, processGroupId, state, componentRevisions); + } + + @Override + public ScheduleComponentsEntity scheduleComponents(final NiFiUser user, final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) { + final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new UpdateRevisionTask<ScheduleComponentsEntity>() { @Override public RevisionUpdate<ScheduleComponentsEntity> update() { // schedule the components - processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); + processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); // update the revisions final Map<String, Revision> updatedRevisions = new HashMap<>(); @@ -821,6 +927,46 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ActivateControllerServicesEntity activateControllerServices(final String processGroupId, final ControllerServiceState state, final Map<String, Revision> serviceRevisions) { + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + return activateControllerServices(user, processGroupId, state, serviceRevisions); + } + + @Override + public ActivateControllerServicesEntity activateControllerServices(final NiFiUser user, final String processGroupId, final ControllerServiceState state, + final Map<String, Revision> serviceRevisions) { + + final RevisionUpdate<ActivateControllerServicesEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(serviceRevisions.values()), user, + new UpdateRevisionTask<ActivateControllerServicesEntity>() { + @Override + public RevisionUpdate<ActivateControllerServicesEntity> update() { + // schedule the components + processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet()); + + // update the revisions + final Map<String, Revision> updatedRevisions = new HashMap<>(); + for (final Revision revision : serviceRevisions.values()) { + final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); + updatedRevisions.put(revision.getComponentId(), currentRevision.incrementRevision(revision.getClientId())); + } + + // save + controllerFacade.save(); + + // gather details for response + final ActivateControllerServicesEntity entity = new ActivateControllerServicesEntity(); + entity.setId(processGroupId); + entity.setState(state.name()); + return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); + } + }); + + return updatedComponent.getComponent(); + } + + + @Override public ControllerConfigurationEntity updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) { final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = updateComponent( revision, @@ -3062,7 +3208,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createProcessGroupEntity(processGroup); } - private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) { + private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds, final NiFiUser user) { final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(serviceNode); final ControllerServiceReference ref = serviceNode.getReferences(); @@ -3070,26 +3216,77 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents()); final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())); - final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode, user); final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(serviceNode.getIdentifier())); final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList()); return entityFactory.createControllerServiceEntity(dto, revision, permissions, bulletinEntities); } @Override - public Set<ControllerServiceEntity> getControllerServices(final String groupId) { - final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId); + public VariableRegistryEntity getVariableRegistry(final String groupId, final boolean includeAncestorGroups) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + if (processGroup == null) { + throw new ResourceNotFoundException("Could not find group with ID " + groupId); + } + + return createVariableRegistryEntity(processGroup, includeAncestorGroups); + } + + private VariableRegistryEntity createVariableRegistryEntity(final ProcessGroup processGroup, final boolean includeAncestorGroups) { + final VariableRegistryDTO registryDto = dtoFactory.createVariableRegistryDto(processGroup); + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); + + if (includeAncestorGroups) { + ProcessGroup parent = processGroup.getParent(); + while (parent != null) { + final PermissionsDTO parentPerms = dtoFactory.createPermissionsDto(processGroup); + if (Boolean.TRUE.equals(parentPerms.getCanRead())) { + final VariableRegistryDTO parentRegistryDto = dtoFactory.createVariableRegistryDto(parent); + final Set<VariableEntity> parentVariables = parentRegistryDto.getVariables(); + registryDto.getVariables().addAll(parentVariables); + } + + parent = parent.getParent(); + } + } + + return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions); + } + + @Override + public VariableRegistryEntity populateAffectedComponents(final VariableRegistryDTO variableRegistryDto) { + final String groupId = variableRegistryDto.getProcessGroupId(); + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + if (processGroup == null) { + throw new ResourceNotFoundException("Could not find group with ID " + groupId); + } + + final VariableRegistryDTO registryDto = dtoFactory.populateAffectedComponents(variableRegistryDto, processGroup); + final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processGroup.getIdentifier())); + final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processGroup); + return entityFactory.createVariableRegistryEntity(registryDto, revision, permissions); + } + + @Override + public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) { + return getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups, NiFiUserUtils.getNiFiUser()); + } + + @Override + public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups, final NiFiUser user) { + final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups); final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet()); return serviceNodes.stream() - .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds)) + .map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds, user)) .collect(Collectors.toSet()); } @Override public ControllerServiceEntity getControllerService(final String controllerServiceId) { final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId)); + return createControllerServiceEntity(controllerService, Sets.newHashSet(controllerServiceId), NiFiUserUtils.getNiFiUser()); } @Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 3d78741..1a50d04 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -608,6 +608,11 @@ public abstract class ApplicationResource { serviceFacade.authorizeAccess(authorizer); serviceFacade.verifyRevision(revision, user); + // verify if necessary + if (verifier != null) { + verifier.run(); + } + return action.apply(revision, entity); } } @@ -657,6 +662,11 @@ public abstract class ApplicationResource { serviceFacade.authorizeAccess(authorizer); serviceFacade.verifyRevisions(revisions, user); + // verify if necessary + if (verifier != null) { + verifier.run(); + } + return action.apply(revisions, entity); } } @@ -820,16 +830,16 @@ public abstract class ApplicationResource { } } - /** - * Replicates the request to the given node - * - * @param method the HTTP method - * @param nodeUuid the UUID of the node to replicate the request to - * @return the response from the node - * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster - */ - protected Response replicate(final String method, final String nodeUuid) { - return replicate(method, getRequestParameters(), nodeUuid); + + private void ensureFlowInitialized() { + if (!flowController.isInitialized()) { + throw new IllegalClusterStateException("Cluster is still in the process of voting on the appropriate Data Flow."); + } + } + + protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) { + final URI path = getAbsolutePath(); + return replicate(path, method, entity, nodeUuid, headersToOverride); } /** @@ -845,22 +855,16 @@ public abstract class ApplicationResource { return replicate(method, entity, nodeUuid, null); } - private void ensureFlowInitialized() { - if (!flowController.isInitialized()) { - throw new IllegalClusterStateException("Cluster is still in the process of voting on the appropriate Data Flow."); - } - } - /** * Replicates the request to the given node * - * @param method the HTTP method - * @param entity the Entity to replicate + * @param method the HTTP method + * @param entity the Entity to replicate * @param nodeUuid the UUID of the node to replicate the request to * @return the response from the node * @throws UnknownNodeException if the nodeUuid given does not map to any node in the cluster */ - protected Response replicate(final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) { + protected Response replicate(final URI path, final String method, final Object entity, final String nodeUuid, final Map<String, String> headersToOverride) { // since we're cluster we must specify the cluster node identifier if (nodeUuid == null) { throw new IllegalArgumentException("The cluster node identifier must be specified."); @@ -873,7 +877,6 @@ public abstract class ApplicationResource { ensureFlowInitialized(); - final URI path = getAbsolutePath(); try { final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride); @@ -996,6 +999,12 @@ public abstract class ApplicationResource { } } + + protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException { + final URI path = getAbsolutePath(); + return replicateNodeResponse(path, method, entity, headersToOverride); + } + /** * Replicates the request to all nodes in the cluster using the provided method and entity. The headers * used will be those provided by the {@link #getHeaders()} method. The URI that will be used will be @@ -1009,10 +1018,9 @@ public abstract class ApplicationResource { * @throws InterruptedException if interrupted while replicating the request * @see #replicate(String, Object, Map) */ - protected NodeResponse replicateNodeResponse(final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException { + protected NodeResponse replicateNodeResponse(final URI path, final String method, final Object entity, final Map<String, String> headersToOverride) throws InterruptedException { ensureFlowInitialized(); - final URI path = getAbsolutePath(); final Map<String, String> headers = headersToOverride == null ? getHeaders() : getHeaders(headersToOverride); // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 889676c..5d5a796 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -36,6 +36,8 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.nar.NarClassLoaders; import org.apache.nifi.util.NiFiProperties; @@ -61,6 +63,7 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.entity.AboutEntity; import org.apache.nifi.web.api.entity.ActionEntity; +import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.BannerEntity; import org.apache.nifi.web.api.entity.BulletinBoardEntity; import org.apache.nifi.web.api.entity.ClusteSummaryEntity; @@ -118,6 +121,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -388,7 +392,7 @@ public class FlowResource extends ApplicationResource { } // get all the controller services - final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null); + final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null, false, false); controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices); // create the response entity @@ -426,11 +430,10 @@ public class FlowResource extends ApplicationResource { } ) public Response getControllerServicesFromGroup( - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String groupId) throws InterruptedException { + @ApiParam(value = "The process group id.", required = true) @PathParam("id") String groupId, + @ApiParam("Whether or not to include parent/ancestory process groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean includeAncestorGroups, + @ApiParam("Whether or not to include descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups + ) throws InterruptedException { authorizeFlow(); @@ -439,7 +442,7 @@ public class FlowResource extends ApplicationResource { } // get all the controller services - final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId); + final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups); controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices); // create the response entity @@ -512,7 +515,7 @@ public class FlowResource extends ApplicationResource { @Produces(MediaType.APPLICATION_JSON) @Path("process-groups/{id}") @ApiOperation( - value = "Schedule or unschedule comopnents in the specified Process Group.", + value = "Schedule or unschedule components in the specified Process Group.", response = ScheduleComponentsEntity.class, authorizations = { @Authorization(value = "Read - /flow", type = ""), @@ -570,7 +573,7 @@ public class FlowResource extends ApplicationResource { // ensure authorized for each processor we will attempt to schedule group.findAllProcessors().stream() - .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS) + .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PROCESSORS : ProcessGroup.UNSCHEDULABLE_PROCESSORS) .filter(processor -> processor.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) .forEach(processor -> { componentIds.add(processor.getIdentifier()); @@ -578,7 +581,7 @@ public class FlowResource extends ApplicationResource { // ensure authorized for each input port we will attempt to schedule group.findAllInputPorts().stream() - .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) + .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) .filter(inputPort -> inputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) .forEach(inputPort -> { componentIds.add(inputPort.getIdentifier()); @@ -586,7 +589,7 @@ public class FlowResource extends ApplicationResource { // ensure authorized for each output port we will attempt to schedule group.findAllOutputPorts().stream() - .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) + .filter(ScheduledState.RUNNING.equals(state) ? ProcessGroup.SCHEDULABLE_PORTS : ProcessGroup.UNSCHEDULABLE_PORTS) .filter(outputPort -> outputPort.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) .forEach(outputPort -> { componentIds.add(outputPort.getIdentifier()); @@ -640,7 +643,129 @@ public class FlowResource extends ApplicationResource { componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); // update the process group - final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions); + final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions); + return generateOkResponse(entity).build(); + } + ); + } + + + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("process-groups/{id}/controller-services") + @ApiOperation(value = "Enable or disable Controller Services in the specified Process Group.", + response = ActivateControllerServicesEntity.class, + authorizations = { + @Authorization(value = "Read - /flow", type = ""), + @Authorization(value = "Write - /{component-type}/{uuid} - For every service being enabled/disabled", type = "") + }) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response activateControllerServices( + @Context HttpServletRequest httpServletRequest, + @ApiParam(value = "The process group id.", required = true) + @PathParam("id") String id, + @ApiParam(value = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.", required = true) + final ActivateControllerServicesEntity requestEntity) { + + // ensure the same id is being used + if (!id.equals(requestEntity.getId())) { + throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " + + "not equal the process group id of the requested resource (%s).", requestEntity.getId(), id)); + } + + final ControllerServiceState state; + if (requestEntity.getState() == null) { + throw new IllegalArgumentException("The controller service state must be specified."); + } else { + try { + state = ControllerServiceState.valueOf(requestEntity.getState()); + } catch (final IllegalArgumentException iae) { + throw new IllegalArgumentException(String.format("The controller service state must be one of [%s].", + StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.DISABLED), ", "))); + } + } + + // ensure its a supported scheduled state + if (ControllerServiceState.DISABLING.equals(state) || ControllerServiceState.ENABLING.equals(state)) { + throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", + StringUtils.join(EnumSet.of(ControllerServiceState.ENABLED, ControllerServiceState.DISABLED), ", "))); + } + + // if the components are not specified, gather all components and their current revision + if (requestEntity.getComponents() == null) { + // get the current revisions for the components being updated + final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> { + final Set<String> componentIds = new HashSet<>(); + + final Predicate<ControllerServiceNode> filter; + if (ControllerServiceState.ENABLED.equals(state)) { + filter = service -> !service.isActive() && service.isValid(); + } else { + filter = service -> service.isActive(); + } + + group.findAllControllerServices().stream() + .filter(filter) + .filter(service -> service.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())) + .forEach(service -> componentIds.add(service.getIdentifier())); + return componentIds; + }); + + // build the component mapping + final Map<String, RevisionDTO> componentsToSchedule = new HashMap<>(); + revisions.forEach(revision -> { + final RevisionDTO dto = new RevisionDTO(); + dto.setClientId(revision.getClientId()); + dto.setVersion(revision.getVersion()); + componentsToSchedule.put(revision.getComponentId(), dto); + }); + + // set the components and their current revision + requestEntity.setComponents(componentsToSchedule); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.PUT, requestEntity); + } + + final Map<String, RevisionDTO> requestComponentsToSchedule = requestEntity.getComponents(); + final Map<String, Revision> requestComponentRevisions = + requestComponentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); + final Set<Revision> requestRevisions = new HashSet<>(requestComponentRevisions.values()); + + return withWriteLock( + serviceFacade, + requestEntity, + requestRevisions, + lookup -> { + // ensure access to the flow + authorizeFlow(); + + // ensure access to every component being scheduled + requestComponentsToSchedule.keySet().forEach(componentId -> { + final Authorizable authorizable = lookup.getControllerService(componentId).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }); + }, + () -> serviceFacade.verifyActivateControllerServices(id, state, requestComponentRevisions.keySet()), + (revisions, scheduleComponentsEntity) -> { + final ControllerServiceState serviceState = ControllerServiceState.valueOf(scheduleComponentsEntity.getState()); + + final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents(); + final Map<String, Revision> componentRevisions = + componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); + + // update the controller services + final ActivateControllerServicesEntity entity = serviceFacade.activateControllerServices(id, serviceState, componentRevisions); return generateOkResponse(entity).build(); } );
