http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index a9167ae..0b634a1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -16,14 +16,54 @@ */ package org.apache.nifi.web.api; -import com.sun.jersey.api.core.ResourceContext; -import com.sun.jersey.multipart.FormDataParam; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriInfo; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.AuthorizeControllerServiceReference; @@ -38,26 +78,41 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.serialization.FlowEncodingVersion; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.framework.security.util.SslContextFactory; +import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; +import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; import org.apache.nifi.util.BundleUtils; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.dto.AffectedComponentDTO; import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.TemplateDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO; +import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionsEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity; import org.apache.nifi.web.api.entity.FlowEntity; @@ -70,47 +125,36 @@ import org.apache.nifi.web.api.entity.LabelsEntity; import org.apache.nifi.web.api.entity.OutputPortsEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; +import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessGroupsEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorsEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; +import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; import org.apache.nifi.web.api.entity.TemplateEntity; +import org.apache.nifi.web.api.entity.VariableRegistryEntity; +import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.LongParameter; +import org.apache.nifi.web.util.Pause; +import org.apache.nifi.web.util.WebUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriInfo; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import javax.xml.transform.stream.StreamSource; -import java.io.InputStream; -import java.net.URI; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.core.ResourceContext; +import com.sun.jersey.multipart.FormDataParam; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Group. @@ -139,6 +183,22 @@ public class ProcessGroupResource extends ApplicationResource { private ConnectionResource connectionResource; private TemplateResource templateResource; private ControllerServiceResource controllerServiceResource; + private DtoFactory dtoFactory; + + private final ConcurrentMap<String, VariableRegistryUpdateRequest> varRegistryUpdateRequests = new ConcurrentHashMap<>(); + private static final int MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS = 100; + private static final long VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION = TimeUnit.MINUTES.toMillis(1L); + private final ExecutorService variableRegistryThreadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS, + new ArrayBlockingQueue<Runnable>(MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS), + new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + final Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setName("Variable Registry Update Thread"); + thread.setDaemon(true); + return thread; + } + }); /** * Populates the remaining fields in the specified process groups. @@ -164,6 +224,7 @@ public class ProcessGroupResource extends ApplicationResource { return processGroupEntity; } + /** * Populates the remaining content of the specified snippet. */ @@ -238,6 +299,49 @@ public class ProcessGroupResource extends ApplicationResource { return generateOkResponse(entity).build(); } + + /** + * Retrieves the Variable Registry for the group with the given ID + * + * @param groupId the ID of the Process Group + * @return the Variable Registry for the group + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/variable-registry") + @ApiOperation(value = "Gets a process group's variable registry", + response = VariableRegistryEntity.class, + notes = NON_GUARANTEED_ENDPOINT, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response getVariableRegistry( + @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, + @ApiParam(value = "Whether or not to include ancestor groups", required = false) @QueryParam("includeAncestorGroups") @DefaultValue("true") final boolean includeAncestorGroups) { + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + // get this process group's variable registry + final VariableRegistryEntity entity = serviceFacade.getVariableRegistry(groupId, includeAncestorGroups); + return generateOkResponse(entity).build(); + } + /** * Updates the specified process group. * @@ -314,7 +418,7 @@ public class ProcessGroupResource extends ApplicationResource { Authorizable authorizable = lookup.getProcessGroup(id).getAuthorizable(); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, - null, + () -> serviceFacade.verifyUpdateProcessGroup(requestProcessGroupDTO), (revision, processGroupEntity) -> { // update the process group final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, processGroupEntity.getComponent()); @@ -325,6 +429,854 @@ public class ProcessGroupResource extends ApplicationResource { ); } + + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{groupId}/variable-registry/update-requests/{updateId}") + @ApiOperation(value = "Gets a process group's variable registry", + response = VariableRegistryUpdateRequestEntity.class, + notes = NON_GUARANTEED_ENDPOINT, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response getVariableRegistryUpdateRequest( + @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId, + @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) { + + if (groupId == null || updateId == null) { + throw new IllegalArgumentException("Group ID and Update ID must both be specified."); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }); + + final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.get(updateId); + if (request == null) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId); + } + + if (!groupId.equals(request.getProcessGroupId())) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); + } + + final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); + entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request)); + entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); + return generateOkResponse(entity).build(); + } + + + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("{groupId}/variable-registry/update-requests/{updateId}") + @ApiOperation(value = "Deletes an update request for a process group's variable registry. If the request is not yet complete, it will automatically be cancelled.", + response = VariableRegistryUpdateRequestEntity.class, + notes = NON_GUARANTEED_ENDPOINT, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response deleteVariableRegistryUpdateRequest( + @ApiParam(value = "The process group id.", required = true) @PathParam("groupId") final String groupId, + @ApiParam(value = "The ID of the Variable Registry Update Request", required = true) @PathParam("updateId") final String updateId) { + + if (groupId == null || updateId == null) { + throw new IllegalArgumentException("Group ID and Update ID must both be specified."); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.DELETE); + } + + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }); + + final VariableRegistryUpdateRequest request = varRegistryUpdateRequests.remove(updateId); + if (request == null) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId); + } + + if (!groupId.equals(request.getProcessGroupId())) { + throw new ResourceNotFoundException("Could not find a Variable Registry Update Request with identifier " + updateId + " for Process Group with identifier " + groupId); + } + + request.cancel(); + + final VariableRegistryUpdateRequestEntity entity = new VariableRegistryUpdateRequestEntity(); + entity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(request)); + entity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", updateId)); + return generateOkResponse(entity).build(); + } + + + @PUT + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/variable-registry") + @ApiOperation(value = "Updates the contents of a Process Group's variable Registry", response = VariableRegistryEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { + @Authorization(value = "Write - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response updateVariableRegistry( + @Context final HttpServletRequest httpServletRequest, + @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, + @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + + if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + throw new IllegalArgumentException("Variable Registry details must be specified."); + } + + if (requestEntity.getProcessGroupRevision() == null) { + throw new IllegalArgumentException("Process Group Revision must be specified."); + } + + // ensure the same id is being used + final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry(); + if (!groupId.equals(registryDto.getProcessGroupId())) { + throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " + + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId)); + } + + if (isReplicateRequest()) { + return replicate(HttpMethod.PUT, requestEntity); + } + + // handle expects request (usually from the cluster manager) + final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); + return withWriteLock( + serviceFacade, + requestEntity, + requestRevision, + lookup -> { + Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + (revision, processGroupEntity) -> { + // update the process group + final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto); + return generateOkResponse(entity).build(); + }); + } + + + /** + * Updates the variable registry for the specified process group. + * + * @param httpServletRequest request + * @param groupId The id of the process group. + * @param requestEntity the Variable Registry Entity + * @return A Variable Registry Entry. + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}/variable-registry/update-requests") + @ApiOperation(value = "Submits a request to update a process group's variable registry", + response = VariableRegistryUpdateRequestEntity.class, + notes = NON_GUARANTEED_ENDPOINT, + authorizations = { + @Authorization(value = "Write - /process-groups/{uuid}", type = "") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response submitUpdateVariableRegistryRequest( + @Context final HttpServletRequest httpServletRequest, + @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, + @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + + if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + throw new IllegalArgumentException("Variable Registry details must be specified."); + } + + if (requestEntity.getProcessGroupRevision() == null) { + throw new IllegalArgumentException("Process Group Revision must be specified."); + } + + // In order to update variables in a variable registry, we have to perform the following steps: + // 1. Determine Affected Components (this includes any Processors and Controller Services and any components that reference an affected Controller Service). + // 1a. Determine ID's of components + // 1b. Determine Revision's of associated components + // 2. Stop All Affected Processors + // 3. Disable All Affected Controller Services + // 4. Update the Variables + // 5. Re-Enable all Affected Controller Services (services only, not dependent components) + // 6. Re-Enable all Processors that Depended on the Controller Services + + // Determine the affected components (and their associated revisions) + final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry()); + final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry(); + if (computedRegistryDto == null) { + throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); + } + + final Set<AffectedComponentDTO> affectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); + + final Map<String, List<AffectedComponentDTO>> affectedComponentsByType = affectedComponents.stream() + .collect(Collectors.groupingBy(comp -> comp.getComponentType())); + + final List<AffectedComponentDTO> affectedProcessors = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); + final List<AffectedComponentDTO> affectedServices = affectedComponentsByType.get(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + + + if (isReplicateRequest()) { + // update the variable registry + final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId); + updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); + final URI originalUri = getAbsolutePath(); + + // Submit the task to be run in the background + final Runnable taskWrapper = () -> { + try { + updateVariableRegistryReplicated(groupId, originalUri, affectedProcessors, affectedServices, updateRequest, requestEntity); + } catch (final Exception e) { + logger.error("Failed to update variable registry", e); + updateRequest.setFailureReason("An unexpected error has occurred: " + e); + } + }; + + variableRegistryThreadPool.submit(taskWrapper); + + final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity(); + responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); + responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); + + final URI location = URI.create(responseEntity.getRequestDto().getUri()); + return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); + } + + + final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); + return withWriteLock( + serviceFacade, + requestEntity, + requestRevision, + lookup -> { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final Authorizable groupAuthorizable = lookup.getProcessGroup(groupId).getAuthorizable(); + groupAuthorizable.authorize(authorizer, RequestAction.WRITE, user); + + // For every component that is affected, the user must have READ permissions and WRITE permissions + // (because this action requires stopping the component). + if (affectedProcessors != null) { + for (final AffectedComponentDTO affectedComponent : affectedProcessors) { + final Authorizable authorizable = lookup.getProcessor(affectedComponent.getComponentId()).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.READ, user); + authorizable.authorize(authorizer, RequestAction.WRITE, user); + } + } + + if (affectedServices != null) { + for (final AffectedComponentDTO affectedComponent : affectedServices) { + final Authorizable authorizable = lookup.getControllerService(affectedComponent.getComponentId()).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.READ, user); + authorizable.authorize(authorizer, RequestAction.WRITE, user); + } + } + }, + null, + (revision, varRegistryEntity) -> { + return updateVariableRegistryLocal(groupId, affectedProcessors, affectedServices, requestEntity); + }); + } + + private Pause createPause(final VariableRegistryUpdateRequest updateRequest) { + return new Pause() { + @Override + public boolean pause() { + if (updateRequest.isComplete()) { + return false; + } + + try { + Thread.sleep(500); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return false; + } + + return !updateRequest.isComplete(); + } + }; + } + + private void updateVariableRegistryReplicated(final String groupId, final URI originalUri, final Collection<AffectedComponentDTO> affectedProcessors, + final Collection<AffectedComponentDTO> affectedServices, + final VariableRegistryUpdateRequest updateRequest, final VariableRegistryEntity requestEntity) { + + final NiFiProperties properties = getProperties(); + final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties)); + final int connectionTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeConnectionTimeout(), TimeUnit.MILLISECONDS); + final int readTimeout = (int) FormatUtils.getTimeDuration(properties.getClusterNodeReadTimeout(), TimeUnit.MILLISECONDS); + jerseyClient.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeout); + jerseyClient.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeout); + jerseyClient.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE); + + final Pause pause = createPause(updateRequest); + + // stop processors + if (affectedProcessors != null) { + logger.info("In order to update Variable Registry for Process Group with ID {}, " + + "replicating request to stop {} affected processors", groupId, affectedProcessors.size()); + + scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause, + affectedProcessors, ScheduledState.STOPPED, updateRequest.getStopProcessorsStep()); + } + + // disable controller services + if (affectedServices != null) { + logger.info("In order to update Variable Registry for Process Group with ID {}, " + + "replicating request to stop {} affected Controller Services", groupId, affectedServices.size()); + + activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause, + affectedServices, ControllerServiceState.DISABLED, updateRequest.getDisableServicesStep()); + } + + // apply updates + logger.info("In order to update Variable Registry for Process Group with ID {}, " + + "replicating request to apply updates to variable registry", groupId); + applyVariableRegistryUpdate(groupId, originalUri, jerseyClient, updateRequest, requestEntity); + + // re-enable controller services + if (affectedServices != null) { + logger.info("In order to update Variable Registry for Process Group with ID {}, " + + "replicating request to re-enable {} affected services", groupId, affectedServices.size()); + + activateControllerServices(groupId, originalUri, jerseyClient, updateRequest, pause, + affectedServices, ControllerServiceState.ENABLED, updateRequest.getEnableServicesStep()); + } + + // restart processors + if (affectedProcessors != null) { + logger.info("In order to update Variable Registry for Process Group with ID {}, " + + "replicating request to restart {} affected processors", groupId, affectedProcessors.size()); + + scheduleProcessors(groupId, originalUri, jerseyClient, updateRequest, pause, + affectedProcessors, ScheduledState.RUNNING, updateRequest.getStartProcessorsStep()); + } + + updateRequest.setComplete(true); + } + + /** + * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State. + * + * @param client the Jersey Client to use for making the request + * @param groupId the ID of the Process Group to poll + * @param processorIds the ID of all Processors whose state should be equal to the given desired state + * @param desiredState the desired state for all processors with the ID's given + * @param pause the Pause that can be used to wait between polling + * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state + */ + private boolean waitForProcessorStatus(final Client client, final URI originalUri, final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) { + URI groupUri; + try { + groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/status", "recursive=true", originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + boolean continuePolling = true; + while (continuePolling) { + final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class); + if (response.getStatus() != Status.OK.getStatusCode()) { + return false; + } + + final ProcessGroupStatusEntity statusEntity = response.getEntity(ProcessGroupStatusEntity.class); + final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus(); + final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot(); + + if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) { + logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState); + return true; + } + + // Not all of the processors are in the desired state. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + /** + * Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State. + * + * @param groupId the ID of the Process Group to poll + * @param processorIds the ID of all Processors whose state should be equal to the given desired state + * @param desiredState the desired state for all processors with the ID's given + * @param pause the Pause that can be used to wait between polling + * @return <code>true</code> if successful, <code>false</code> if unable to wait for processors to reach the desired state + */ + private boolean waitForLocalProcessorStatus(final String groupId, final Set<String> processorIds, final ScheduledState desiredState, final Pause pause) { + boolean continuePolling = true; + while (continuePolling) { + final ProcessGroupStatusEntity statusEntity = serviceFacade.getProcessGroupStatus(groupId, true); + final ProcessGroupStatusDTO statusDto = statusEntity.getProcessGroupStatus(); + final ProcessGroupStatusSnapshotDTO statusSnapshotDto = statusDto.getAggregateSnapshot(); + + if (isProcessorStatusEqual(statusSnapshotDto, processorIds, desiredState)) { + logger.debug("All {} processors of interest now have the desired state of {}", processorIds.size(), desiredState); + return true; + } + + // Not all of the processors are in the desired state. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + private boolean isProcessorStatusEqual(final ProcessGroupStatusSnapshotDTO statusSnapshot, final Set<String> processorIds, final ScheduledState desiredState) { + final String desiredStateName = desiredState.name(); + + final boolean allProcessorsMatch = statusSnapshot.getProcessorStatusSnapshots().stream() + .map(entity -> entity.getProcessorStatusSnapshot()) + .filter(status -> processorIds.contains(status.getId())) + .allMatch(status -> { + final String runStatus = status.getRunStatus(); + final boolean stateMatches = desiredStateName.equalsIgnoreCase(runStatus); + if (!stateMatches) { + return false; + } + + if (desiredState == ScheduledState.STOPPED && status.getActiveThreadCount() != 0) { + return false; + } + + return true; + }); + + if (!allProcessorsMatch) { + return false; + } + + for (final ProcessGroupStatusSnapshotEntity childGroupEntity : statusSnapshot.getProcessGroupStatusSnapshots()) { + final ProcessGroupStatusSnapshotDTO childGroupStatus = childGroupEntity.getProcessGroupStatusSnapshot(); + final boolean allMatchChildLevel = isProcessorStatusEqual(childGroupStatus, processorIds, desiredState); + if (!allMatchChildLevel) { + return false; + } + } + + return true; + } + + + + /** + * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. + * + * @param client the Jersey Client to use for making the HTTP Request + * @param groupId the ID of the Process Group to poll + * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state + * @param desiredState the desired state for all services with the ID's given + * @param pause the Pause that can be used to wait between polling + * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state + */ + private boolean waitForControllerServiceStatus(final Client client, final URI originalUri, final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState, + final Pause pause) { + URI groupUri; + try { + groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false,includeDescendantGroups=true", originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + boolean continuePolling = true; + while (continuePolling) { + final ClientResponse response = client.resource(groupUri).header("Content-Type", "application/json").get(ClientResponse.class); + if (response.getStatus() != Status.OK.getStatusCode()) { + return false; + } + + final ControllerServicesEntity controllerServicesEntity = response.getEntity(ControllerServicesEntity.class); + final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices(); + + final String desiredStateName = desiredState.name(); + final boolean allServicesMatch = serviceEntities.stream() + .map(entity -> entity.getComponent()) + .filter(service -> serviceIds.contains(service.getId())) + .map(service -> service.getState()) + .allMatch(state -> state.equals(desiredStateName)); + + if (allServicesMatch) { + logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState); + return true; + } + + // Not all of the processors are in the desired state. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + + /** + * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State. + * + * @param groupId the ID of the Process Group to poll + * @param serviceIds the ID of all Controller Services whose state should be equal to the given desired state + * @param desiredState the desired state for all services with the ID's given + * @param pause the Pause that can be used to wait between polling + * @param user the user that is retrieving the controller services + * @return <code>true</code> if successful, <code>false</code> if unable to wait for services to reach the desired state + */ + private boolean waitForLocalControllerServiceStatus(final String groupId, final Set<String> serviceIds, final ControllerServiceState desiredState, final Pause pause, final NiFiUser user) { + boolean continuePolling = true; + while (continuePolling) { + final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, user); + + final String desiredStateName = desiredState.name(); + final boolean allServicesMatch = serviceEntities.stream() + .map(entity -> entity.getComponent()) + .filter(service -> serviceIds.contains(service.getId())) + .map(service -> service.getState()) + .allMatch(state -> desiredStateName.equals(state)); + + if (allServicesMatch) { + logger.debug("All {} controller services of interest now have the desired state of {}", serviceIds.size(), desiredState); + return true; + } + + // Not all of the processors are in the desired state. Pause for a bit and poll again. + continuePolling = pause.pause(); + } + + return false; + } + + private VariableRegistryUpdateRequest createVariableRegistryUpdateRequest(final String groupId) { + final VariableRegistryUpdateRequest updateRequest = new VariableRegistryUpdateRequest(UUID.randomUUID().toString(), groupId); + + // before adding to the request map, purge any old requests. Must do this by creating a List of ID's + // and then removing those ID's one-at-a-time in order to avoid ConcurrentModificationException. + final Date oneMinuteAgo = new Date(System.currentTimeMillis() - VARIABLE_REGISTRY_UPDATE_REQUEST_EXPIRATION); + final List<String> completedRequestIds = varRegistryUpdateRequests.entrySet().stream() + .filter(entry -> entry.getValue().isComplete()) + .filter(entry -> entry.getValue().getLastUpdated().before(oneMinuteAgo)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + completedRequestIds.stream().forEach(id -> varRegistryUpdateRequests.remove(id)); + + final int requestCount = varRegistryUpdateRequests.size(); + if (requestCount > MAX_VARIABLE_REGISTRY_UPDATE_REQUESTS) { + throw new IllegalStateException("There are already " + requestCount + " update requests for variable registries. " + + "Cannot issue any more requests until the older ones are deleted or expire"); + } + + this.varRegistryUpdateRequests.put(updateRequest.getRequestId(), updateRequest); + return updateRequest; + } + + private Response updateVariableRegistryLocal(final String groupId, final List<AffectedComponentDTO> affectedProcessors, final List<AffectedComponentDTO> affectedServices, + final VariableRegistryEntity requestEntity) { + + final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream() + .map(component -> component.getComponentId()) + .collect(Collectors.toSet()); + Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds); + + final Set<String> affectedServiceIds = affectedServices == null ? Collections.emptySet() : affectedServices.stream() + .map(component -> component.getComponentId()) + .collect(Collectors.toSet()); + Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds); + + // update the variable registry + final VariableRegistryUpdateRequest updateRequest = createVariableRegistryUpdateRequest(groupId); + updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); + final Pause pause = createPause(updateRequest); + + final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final Runnable updateTask = new Runnable() { + @Override + public void run() { + try { + // Stop processors + performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStopProcessorsStep(), "Stopping Processors", + () -> stopProcessors(user, updateRequest, groupId, processorRevisionMap, pause)); + + // Update revision map because this will have modified the revisions of our components. + final Map<String, Revision> updatedProcessorRevisionMap = getRevisions(groupId, affectedProcessorIds); + + // Disable controller services + performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getDisableServicesStep(), "Disabling Controller Services", + () -> disableControllerServices(user, updateRequest, groupId, serviceRevisionMap, pause)); + + // Update revision map because this will have modified the revisions of our components. + final Map<String, Revision> updatedServiceRevisionMap = getRevisions(groupId, affectedServiceIds); + + // Apply the updates + performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getApplyUpdatesStep(), "Applying updates to Variable Registry", + () -> serviceFacade.updateVariableRegistry(user, requestRevision, requestEntity.getVariableRegistry())); + + // Re-enable the controller services + performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getEnableServicesStep(), "Re-enabling Controller Services", + () -> enableControllerServices(user, groupId, updatedServiceRevisionMap, pause)); + + // Restart processors + performUpdateVariableRegistryStep(groupId, updateRequest, updateRequest.getStartProcessorsStep(), "Restarting Processors", + () -> startProcessors(user, groupId, updatedProcessorRevisionMap, pause)); + + // Set complete + updateRequest.setComplete(true); + updateRequest.setLastUpdated(new Date()); + } catch (final Exception e) { + logger.error("Failed to update Variable Registry for Proces Group with ID " + groupId, e); + updateRequest.setFailureReason("An unexpected error has occurred: " + e); + } + } + }; + + // Submit the task to be run in the background + variableRegistryThreadPool.submit(updateTask); + + final VariableRegistryUpdateRequestEntity responseEntity = new VariableRegistryUpdateRequestEntity(); + responseEntity.setRequestDto(dtoFactory.createVariableRegistryUpdateRequestDto(updateRequest)); + responseEntity.getRequestDto().setUri(generateResourceUri("process-groups", groupId, "variable-registry", "update-requests", updateRequest.getRequestId())); + + final URI location = URI.create(responseEntity.getRequestDto().getUri()); + return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); + } + + private Map<String, Revision> getRevisions(final String groupId, final Set<String> componentIds) { + final Set<Revision> processorRevisions = serviceFacade.getRevisionsFromGroup(groupId, group -> componentIds); + return processorRevisions.stream().collect(Collectors.toMap(revision -> revision.getComponentId(), Function.identity())); + } + + private void performUpdateVariableRegistryStep(final String groupId, final VariableRegistryUpdateRequest request, final VariableRegistryUpdateStep step, + final String stepDescription, final Runnable action) { + + if (request.isComplete()) { + logger.info("In updating Variable Registry for Process Group with ID {}" + + ", skipping the following step because the request has completed already: {}", groupId, stepDescription); + return; + } + + try { + logger.info("In order to update Variable Registry for Process Group with ID {}, {}", groupId, stepDescription); + + action.run(); + step.setComplete(true); + } catch (final Exception e) { + request.setComplete(true); + logger.error("Failed to update variable registry for Process Group with ID {}", groupId, e); + + step.setComplete(true); + step.setFailureReason(e.getMessage()); + request.setFailureReason("Failed to update Variable Registry because failed while performing step: " + stepDescription); + } + + request.setLastUpdated(new Date()); + } + + private void stopProcessors(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId, + final Map<String, Revision> processorRevisions, final Pause pause) { + + if (processorRevisions.isEmpty()) { + return; + } + + serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.STOPPED, processorRevisions.keySet()); + serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.STOPPED, processorRevisions); + waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.STOPPED, pause); + } + + private void startProcessors(final NiFiUser user, final String processGroupId, final Map<String, Revision> processorRevisions, final Pause pause) { + if (processorRevisions.isEmpty()) { + return; + } + + serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, processorRevisions.keySet()); + serviceFacade.scheduleComponents(user, processGroupId, ScheduledState.RUNNING, processorRevisions); + waitForLocalProcessorStatus(processGroupId, processorRevisions.keySet(), ScheduledState.RUNNING, pause); + } + + private void disableControllerServices(final NiFiUser user, final VariableRegistryUpdateRequest updateRequest, final String processGroupId, + final Map<String, Revision> serviceRevisions, final Pause pause) { + + if (serviceRevisions.isEmpty()) { + return; + } + + serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.DISABLED, serviceRevisions.keySet()); + serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.DISABLED, serviceRevisions); + waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.DISABLED, pause, user); + } + + private void enableControllerServices(final NiFiUser user, final String processGroupId, final Map<String, Revision> serviceRevisions, final Pause pause) { + if (serviceRevisions.isEmpty()) { + return; + } + + serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions.keySet()); + serviceFacade.activateControllerServices(user, processGroupId, ControllerServiceState.ENABLED, serviceRevisions); + waitForLocalControllerServiceStatus(processGroupId, serviceRevisions.keySet(), ControllerServiceState.ENABLED, pause, user); + } + + + private void scheduleProcessors(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest, + final Pause pause, final Collection<AffectedComponentDTO> affectedProcessors, final ScheduledState desiredState, final VariableRegistryUpdateStep updateStep) { + final Set<String> affectedProcessorIds = affectedProcessors.stream() + .map(component -> component.getComponentId()) + .collect(Collectors.toSet()); + + final Map<String, Revision> processorRevisionMap = getRevisions(groupId, affectedProcessorIds); + final Map<String, RevisionDTO> processorRevisionDtoMap = processorRevisionMap.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue()))); + + final ScheduleComponentsEntity stopProcessorsEntity = new ScheduleComponentsEntity(); + stopProcessorsEntity.setComponents(processorRevisionDtoMap); + stopProcessorsEntity.setId(groupId); + stopProcessorsEntity.setState(desiredState.name()); + + URI stopProcessorUri; + try { + stopProcessorUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId, null, originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + final ClientResponse stopProcessorResponse = jerseyClient.resource(stopProcessorUri) + .header("Content-Type", "application/json") + .entity(stopProcessorsEntity) + .put(ClientResponse.class); + + final int stopProcessorStatus = stopProcessorResponse.getStatus(); + if (stopProcessorStatus != Status.OK.getStatusCode()) { + updateRequest.getStopProcessorsStep().setFailureReason("Failed while " + updateStep.getDescription()); + updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); + return; + } + + updateRequest.setLastUpdated(new Date()); + final boolean processorsTransitioned = waitForProcessorStatus(jerseyClient, originalUri, groupId, affectedProcessorIds, desiredState, pause); + if (processorsTransitioned) { + updateStep.setComplete(true); + } else { + updateStep.setFailureReason("Failed while " + updateStep.getDescription()); + updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); + return; + } + } + + private void activateControllerServices(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest, + final Pause pause, final Collection<AffectedComponentDTO> affectedServices, final ControllerServiceState desiredState, final VariableRegistryUpdateStep updateStep) { + + final Set<String> affectedServiceIds = affectedServices.stream() + .map(component -> component.getComponentId()) + .collect(Collectors.toSet()); + + final Map<String, Revision> serviceRevisionMap = getRevisions(groupId, affectedServiceIds); + final Map<String, RevisionDTO> serviceRevisionDtoMap = serviceRevisionMap.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> dtoFactory.createRevisionDTO(entry.getValue()))); + + final ActivateControllerServicesEntity disableServicesEntity = new ActivateControllerServicesEntity(); + disableServicesEntity.setComponents(serviceRevisionDtoMap); + disableServicesEntity.setId(groupId); + disableServicesEntity.setState(desiredState.name()); + + URI disableServicesUri; + try { + disableServicesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", null, originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + final ClientResponse disableServicesResponse = jerseyClient.resource(disableServicesUri) + .header("Content-Type", "application/json") + .entity(disableServicesEntity) + .put(ClientResponse.class); + + final int disableServicesStatus = disableServicesResponse.getStatus(); + if (disableServicesStatus != Status.OK.getStatusCode()) { + updateStep.setFailureReason("Failed while " + updateStep.getDescription()); + updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); + return; + } + + updateRequest.setLastUpdated(new Date()); + if (waitForControllerServiceStatus(jerseyClient, originalUri, groupId, affectedServiceIds, desiredState, pause)) { + updateStep.setComplete(true); + } else { + updateStep.setFailureReason("Failed while " + updateStep.getDescription()); + updateRequest.setFailureReason("Failed while " + updateStep.getDescription()); + return; + } + } + + + private void applyVariableRegistryUpdate(final String groupId, final URI originalUri, final Client jerseyClient, final VariableRegistryUpdateRequest updateRequest, + final VariableRegistryEntity updateEntity) { + + URI applyUpdatesUri; + try { + applyUpdatesUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), + originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/variable-registry", null, originalUri.getFragment()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + final ClientResponse applyUpdatesResponse = jerseyClient.resource(applyUpdatesUri) + .header("Content-Type", "application/json") + .entity(updateEntity) + .put(ClientResponse.class); + + final int applyUpdatesStatus = applyUpdatesResponse.getStatus(); + updateRequest.setLastUpdated(new Date()); + if (applyUpdatesStatus != Status.OK.getStatusCode()) { + updateRequest.getApplyUpdatesStep().setFailureReason("Failed to apply updates to the Variable Registry"); + updateRequest.setFailureReason("Failed to apply updates to the Variable Registry"); + return; + } + } + /** * Removes the specified process group reference. * @@ -2426,4 +3378,8 @@ public class ProcessGroupResource extends ApplicationResource { public void setAuthorizer(Authorizer authorizer) { this.authorizer = authorizer; } + + public void setDtoFactory(DtoFactory dtoFactory) { + this.dtoFactory = dtoFactory; + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 4332a0c..a445e49 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,33 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -47,6 +74,7 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.User; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ComponentAuthorizable; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; @@ -110,6 +138,9 @@ import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; import org.apache.nifi.provenance.lineage.LineageEdge; import org.apache.nifi.provenance.lineage.LineageNode; import org.apache.nifi.provenance.lineage.ProvenanceEventLineageNode; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.registry.variable.VariableRegistryUpdateRequest; +import org.apache.nifi.registry.variable.VariableRegistryUpdateStep; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; @@ -161,35 +192,10 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.TenantEntity; +import org.apache.nifi.web.api.entity.VariableEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.revision.RevisionManager; -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - public final class DtoFactory { @SuppressWarnings("rawtypes") @@ -1712,9 +1718,34 @@ public final class DtoFactory { * @return dto */ public PermissionsDTO createPermissionsDto(final Authorizable authorizable) { + return createPermissionsDto(authorizable, NiFiUserUtils.getNiFiUser()); + } + + /** + * Creates the PermissionsDTO based on the specified Authorizable for the given user + * + * @param authorizable authorizable + * @param user the NiFi User for which the Permissions are being created + * @return dto + */ + public PermissionsDTO createPermissionsDto(final Authorizable authorizable, final NiFiUser user) { final PermissionsDTO dto = new PermissionsDTO(); - dto.setCanRead(authorizable.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())); - dto.setCanWrite(authorizable.isAuthorized(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser())); + dto.setCanRead(authorizable.isAuthorized(authorizer, RequestAction.READ, user)); + dto.setCanWrite(authorizable.isAuthorized(authorizer, RequestAction.WRITE, user)); + return dto; + } + + public AffectedComponentDTO createAffectedComponentDto(final ConfiguredComponent component) { + final AffectedComponentDTO dto = new AffectedComponentDTO(); + dto.setComponentId(component.getIdentifier()); + dto.setParentGroupId(component.getProcessGroupIdentifier()); + + if (component instanceof ProcessorNode) { + dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); + } else if (component instanceof ControllerServiceNode) { + dto.setComponentType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE); + } + return dto; } @@ -1998,6 +2029,10 @@ public final class DtoFactory { dto.setComments(group.getComments()); dto.setName(group.getName()); + final Map<String, String> variables = group.getVariableRegistry().getVariableMap().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue())); + group.setVariables(variables); + final ProcessGroup parentGroup = group.getParent(); if (parentGroup != null) { dto.setParentGroupId(parentGroup.getIdentifier()); @@ -2079,6 +2114,128 @@ public final class DtoFactory { return deprecationNotice == null ? null : deprecationNotice.reason(); } + + public VariableRegistryDTO createVariableRegistryDto(final ProcessGroup processGroup) { + final ComponentVariableRegistry variableRegistry = processGroup.getVariableRegistry(); + + final List<String> variableNames = variableRegistry.getVariableMap().keySet().stream() + .map(descriptor -> descriptor.getName()) + .collect(Collectors.toList()); + + final Set<VariableEntity> variableEntities = new LinkedHashSet<>(); + + for (final String variableName : variableNames) { + final VariableDTO variableDto = new VariableDTO(); + variableDto.setName(variableName); + variableDto.setValue(variableRegistry.getVariableValue(variableName)); + variableDto.setProcessGroupId(processGroup.getIdentifier()); + + final Set<ConfiguredComponent> affectedComponents = processGroup.getComponentsAffectedByVariable(variableName); + final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream() + .map(component -> createAffectedComponentDto(component)) + .collect(Collectors.toSet()); + + boolean canWrite = true; + for (final ConfiguredComponent component : affectedComponents) { + final PermissionsDTO permissions = createPermissionsDto(component); + if (!permissions.getCanRead() || !permissions.getCanWrite()) { + canWrite = false; + break; + } + } + + variableDto.setAffectedComponents(affectedComponentDtos); + + final VariableEntity variableEntity = new VariableEntity(); + variableEntity.setVariable(variableDto); + variableEntity.setCanWrite(canWrite); + + variableEntities.add(variableEntity); + } + + final VariableRegistryDTO registryDto = new VariableRegistryDTO(); + registryDto.setProcessGroupId(processGroup.getIdentifier()); + registryDto.setVariables(variableEntities); + + return registryDto; + } + + public VariableRegistryUpdateRequestDTO createVariableRegistryUpdateRequestDto(final VariableRegistryUpdateRequest request) { + final VariableRegistryUpdateRequestDTO dto = new VariableRegistryUpdateRequestDTO(); + dto.setComplete(request.isComplete()); + dto.setFailureReason(request.getFailureReason()); + dto.setLastUpdated(request.getLastUpdated()); + dto.setProcessGroupId(request.getProcessGroupId()); + dto.setRequestId(request.getRequestId()); + dto.setSubmissionTime(request.getSubmissionTime()); + + final List<VariableRegistryUpdateStepDTO> updateSteps = new ArrayList<>(); + updateSteps.add(createVariableRegistryUpdateStepDto(request.getIdentifyRelevantComponentsStep())); + updateSteps.add(createVariableRegistryUpdateStepDto(request.getStopProcessorsStep())); + updateSteps.add(createVariableRegistryUpdateStepDto(request.getDisableServicesStep())); + updateSteps.add(createVariableRegistryUpdateStepDto(request.getApplyUpdatesStep())); + updateSteps.add(createVariableRegistryUpdateStepDto(request.getEnableServicesStep())); + updateSteps.add(createVariableRegistryUpdateStepDto(request.getStartProcessorsStep())); + dto.setUpdateSteps(updateSteps); + + return dto; + } + + public VariableRegistryUpdateStepDTO createVariableRegistryUpdateStepDto(final VariableRegistryUpdateStep step) { + final VariableRegistryUpdateStepDTO dto = new VariableRegistryUpdateStepDTO(); + dto.setComplete(step.isComplete()); + dto.setDescription(step.getDescription()); + dto.setFailureReason(step.getFailureReason()); + return dto; + } + + + public VariableRegistryDTO populateAffectedComponents(final VariableRegistryDTO variableRegistry, final ProcessGroup group) { + if (!group.getIdentifier().equals(variableRegistry.getProcessGroupId())) { + throw new IllegalArgumentException("Variable Registry does not have the same Group ID as the given Process Group"); + } + + final Set<VariableEntity> variableEntities = new LinkedHashSet<>(); + + for (final VariableEntity inputEntity : variableRegistry.getVariables()) { + final VariableEntity entity = new VariableEntity(); + + final VariableDTO inputDto = inputEntity.getVariable(); + final VariableDTO variableDto = new VariableDTO(); + variableDto.setName(inputDto.getName()); + variableDto.setValue(inputDto.getValue()); + variableDto.setProcessGroupId(group.getIdentifier()); + + final Set<ConfiguredComponent> affectedComponents = group.getComponentsAffectedByVariable(variableDto.getName()); + final Set<AffectedComponentDTO> affectedComponentDtos = affectedComponents.stream() + .map(component -> createAffectedComponentDto(component)) + .collect(Collectors.toSet()); + + boolean canWrite = true; + for (final ConfiguredComponent component : affectedComponents) { + final PermissionsDTO permissions = createPermissionsDto(component); + if (!permissions.getCanRead() || !permissions.getCanWrite()) { + canWrite = false; + break; + } + } + + variableDto.setAffectedComponents(affectedComponentDtos); + + entity.setCanWrite(canWrite); + entity.setVariable(inputDto); + + variableEntities.add(entity); + } + + final VariableRegistryDTO registryDto = new VariableRegistryDTO(); + registryDto.setProcessGroupId(group.getIdentifier()); + registryDto.setVariables(variableEntities); + + return registryDto; + } + + /** * Gets the capability description from the specified class. */ @@ -3016,6 +3173,10 @@ public final class DtoFactory { copy.setActiveRemotePortCount(original.getActiveRemotePortCount()); copy.setInactiveRemotePortCount(original.getInactiveRemotePortCount()); + if (original.getVariables() != null) { + copy.setVariables(new HashMap<>(original.getVariables())); + } + return copy; } http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java index 41249ba..a7f370a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java @@ -65,6 +65,7 @@ import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.entity.TenantEntity; import org.apache.nifi.web.api.entity.UserEntity; import org.apache.nifi.web.api.entity.UserGroupEntity; +import org.apache.nifi.web.api.entity.VariableRegistryEntity; import java.util.Date; import java.util.List; @@ -431,6 +432,18 @@ public final class EntityFactory { return entity; } + public VariableRegistryEntity createVariableRegistryEntity(final VariableRegistryDTO dto, final RevisionDTO revision, final PermissionsDTO permissions) { + final VariableRegistryEntity entity = new VariableRegistryEntity(); + entity.setProcessGroupRevision(revision); + if (dto != null) { + if (permissions != null && permissions.getCanRead()) { + entity.setVariableRegistry(dto); + } + } + + return entity; + } + public ControllerServiceEntity createControllerServiceEntity(final ControllerServiceDTO dto, final RevisionDTO revision, final PermissionsDTO permissions, final List<BulletinEntity> bulletins) { final ControllerServiceEntity entity = new ControllerServiceEntity(); entity.setRevision(revision); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java index fa92425..0409e95 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java @@ -63,7 +63,7 @@ public interface ControllerServiceDAO { * * @return The controller services */ - Set<ControllerServiceNode> getControllerServices(String groupId); + Set<ControllerServiceNode> getControllerServices(String groupId, boolean includeAncestorGroups, boolean includeDescendantGroups); /** * Updates the specified controller service. http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java index 155b36e..d7ca806 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java @@ -16,11 +16,14 @@ */ package org.apache.nifi.web.dao; +import java.util.Set; +import java.util.concurrent.Future; + import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.api.dto.ProcessGroupDTO; - -import java.util.Set; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; public interface ProcessGroupDAO { @@ -65,12 +68,32 @@ public interface ProcessGroupDAO { void verifyScheduleComponents(String groupId, ScheduledState state, Set<String> componentIds); /** + * Verifies the specified controller services can be modified + * + * @param groupId the ID of the process group + * @param state the desired state + * @param serviceIds the ID's of the controller services + */ + void verifyActivateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds); + + /** * Schedules the components in the specified process group. * * @param groupId id * @param state scheduled state + * + * @return a Future that can be used to wait for the services to finish starting or stopping */ - void scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds); + Future<Void> scheduleComponents(String groupId, ScheduledState state, Set<String> componentIds); + + /** + * Enables or disables the controller services in the specified process group + * + * @param groupId the id of the group + * @param state the desired state + * @param serviceIds the ID's of the services to enable or disable + */ + Future<Void> activateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds); /** * Updates the specified process group. @@ -81,6 +104,21 @@ public interface ProcessGroupDAO { ProcessGroup updateProcessGroup(ProcessGroupDTO processGroup); /** + * Updates the specified variable registry + * + * @param variableRegistry the Variable Registry + * @return the Process Group that was updated + */ + ProcessGroup updateVariableRegistry(VariableRegistryDTO variableRegistry); + + /** + * Verifies that the specified updates to a current Process Group can be applied at this time + * + * @param processGroup the DTO That describes the changes to occur + */ + void verifyUpdate(ProcessGroupDTO processGroup); + + /** * Verifies the specified process group can be removed. * * @param groupId id http://git-wip-us.apache.org/repos/asf/nifi/blob/5cd8e93b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java index 36cf85b..0f9ec7a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java @@ -124,7 +124,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro } @Override - public Set<ControllerServiceNode> getControllerServices(final String groupId) { + public Set<ControllerServiceNode> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) { if (groupId == null) { return flowController.getRootControllerServices(); } else { @@ -134,7 +134,12 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId); } - return procGroup.getControllerServices(true); + final Set<ControllerServiceNode> serviceNodes = procGroup.getControllerServices(includeAncestorGroups); + if (includeDescendantGroups) { + serviceNodes.addAll(procGroup.findAllControllerServices()); + } + + return serviceNodes; } }
