Repository: nifi Updated Branches: refs/heads/master f7f001eb9 -> 7a8dbb8b1
NIFI-4733: - Resolving logic issue in two phase commit when updating variable registry. This closes #2370 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a8dbb8b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a8dbb8b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a8dbb8b Branch: refs/heads/master Commit: 7a8dbb8b1512d7bd7a26e1b996a7ef859ccfcd4e Parents: f7f001e Author: Matt Gilman <[email protected]> Authored: Wed Jan 3 11:00:22 2018 -0500 Committer: Matt Gilman <[email protected]> Committed: Thu Jan 4 13:03:11 2018 -0500 ---------------------------------------------------------------------- .../nifi/web/api/ProcessGroupResource.java | 87 ++++++++++++++------ 1 file changed, 62 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7a8dbb8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 7ff7885..b866677 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -74,6 +74,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.CopySnippetRequestEntity; import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity; +import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.FunnelsEntity; @@ -552,41 +553,43 @@ public class ProcessGroupResource extends ApplicationResource { public Response updateVariableRegistry( @Context final HttpServletRequest httpServletRequest, @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, - @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) { - if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) { throw new IllegalArgumentException("Variable Registry details must be specified."); } - if (requestEntity.getProcessGroupRevision() == null) { + if (requestVariableRegistryEntity.getProcessGroupRevision() == null) { throw new IllegalArgumentException("Process Group Revision must be specified."); } // ensure the same id is being used - final VariableRegistryDTO registryDto = requestEntity.getVariableRegistry(); - if (!groupId.equals(registryDto.getProcessGroupId())) { + final VariableRegistryDTO requestRegistryDto = requestVariableRegistryEntity.getVariableRegistry(); + if (!groupId.equals(requestRegistryDto.getProcessGroupId())) { throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " - + "not equal the process group id of the requested resource (%s).", registryDto.getProcessGroupId(), groupId)); + + "not equal the process group id of the requested resource (%s).", requestRegistryDto.getProcessGroupId(), groupId)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, requestEntity); + return replicate(HttpMethod.PUT, requestVariableRegistryEntity); } // handle expects request (usually from the cluster manager) - final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); + final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, - requestEntity, + requestVariableRegistryEntity, requestRevision, lookup -> { Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable(); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, null, - (revision, processGroupEntity) -> { + (revision, variableRegistryEntity) -> { + final VariableRegistryDTO variableRegistry = variableRegistryEntity.getVariableRegistry(); + // update the process group - final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, registryDto); + final VariableRegistryEntity entity = serviceFacade.updateVariableRegistry(revision, variableRegistry); return generateOkResponse(entity).build(); }); } @@ -597,7 +600,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The id of the process group. - * @param requestEntity the Variable Registry Entity + * @param requestVariableRegistryEntity the Variable Registry Entity * @return A Variable Registry Entry. */ @POST @@ -620,13 +623,13 @@ public class ProcessGroupResource extends ApplicationResource { public Response submitUpdateVariableRegistryRequest( @Context final HttpServletRequest httpServletRequest, @ApiParam(value = "The process group id.", required = true) @PathParam("id") final String groupId, - @ApiParam(value = "The process group configuration details.", required = true) final VariableRegistryEntity requestEntity) { + @ApiParam(value = "The variable registry configuration details.", required = true) final VariableRegistryEntity requestVariableRegistryEntity) { - if (requestEntity == null || requestEntity.getVariableRegistry() == null) { + if (requestVariableRegistryEntity == null || requestVariableRegistryEntity.getVariableRegistry() == null) { throw new IllegalArgumentException("Variable Registry details must be specified."); } - if (requestEntity.getProcessGroupRevision() == null) { + if (requestVariableRegistryEntity.getProcessGroupRevision() == null) { throw new IllegalArgumentException("Process Group Revision must be specified."); } @@ -641,14 +644,14 @@ public class ProcessGroupResource extends ApplicationResource { // 6. Re-Enable all previously Active Processors that Depended on the Controller Services // Determine the affected components (and their associated revisions) - final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry()); + final VariableRegistryEntity computedEntity = serviceFacade.populateAffectedComponents(requestVariableRegistryEntity.getVariableRegistry()); final VariableRegistryDTO computedRegistryDto = computedEntity.getVariableRegistry(); if (computedRegistryDto == null) { throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId)); } - final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); - final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry()); + final Set<AffectedComponentEntity> allAffectedComponents = serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry()); + final Set<AffectedComponentDTO> activeAffectedComponents = serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry()); final Map<String, List<AffectedComponentDTO>> activeAffectedComponentsByType = activeAffectedComponents.stream() .collect(Collectors.groupingBy(comp -> comp.getReferenceType())); @@ -698,7 +701,7 @@ public class ProcessGroupResource extends ApplicationResource { final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(user)); SecurityContextHolder.getContext().setAuthentication(authentication); - updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity); + updateVariableRegistryReplicated(groupId, originalUri, activeAffectedProcessors, activeAffectedServices, updateRequest, requestVariableRegistryEntity); } catch (final Exception e) { logger.error("Failed to update variable registry", e); @@ -721,15 +724,19 @@ public class ProcessGroupResource extends ApplicationResource { return Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build(); } + final UpdateVariableRegistryRequestWrapper requestWrapper = + new UpdateVariableRegistryRequestWrapper(allAffectedComponents, activeAffectedProcessors, activeAffectedServices, requestVariableRegistryEntity); - final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); + final Revision requestRevision = getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, - requestEntity, + requestWrapper, requestRevision, authorizeAccess, null, - (revision, varRegistryEntity) -> updateVariableRegistryLocal(groupId, allAffectedComponents, activeAffectedProcessors, activeAffectedServices, user, requestEntity) + (revision, wrapper) -> + updateVariableRegistryLocal(groupId, wrapper.getAllAffectedComponents(), wrapper.getActiveAffectedProcessors(), + wrapper.getActiveAffectedServices(), user, revision, wrapper.getVariableRegistryEntity()) ); } @@ -1088,7 +1095,7 @@ public class ProcessGroupResource extends ApplicationResource { } private Response updateVariableRegistryLocal(final String groupId, final Set<AffectedComponentEntity> affectedComponents, final List<AffectedComponentDTO> affectedProcessors, - final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final VariableRegistryEntity requestEntity) { + final List<AffectedComponentDTO> affectedServices, final NiFiUser user, final Revision requestRevision, final VariableRegistryEntity requestEntity) { final Set<String> affectedProcessorIds = affectedProcessors == null ? Collections.emptySet() : affectedProcessors.stream() .map(component -> component.getId()) @@ -1105,8 +1112,6 @@ public class ProcessGroupResource extends ApplicationResource { updateRequest.getIdentifyRelevantComponentsStep().setComplete(true); final Pause pause = createPause(updateRequest); - final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); - final Runnable updateTask = new Runnable() { @Override public void run() { @@ -3474,6 +3479,38 @@ public class ProcessGroupResource extends ApplicationResource { ); } + private static class UpdateVariableRegistryRequestWrapper extends Entity { + private final Set<AffectedComponentEntity> allAffectedComponents; + private final List<AffectedComponentDTO> activeAffectedProcessors; + private final List<AffectedComponentDTO> activeAffectedServices; + private final VariableRegistryEntity variableRegistryEntity; + + public UpdateVariableRegistryRequestWrapper(final Set<AffectedComponentEntity> allAffectedComponents, final List<AffectedComponentDTO> activeAffectedProcessors, + final List<AffectedComponentDTO> activeAffectedServices, VariableRegistryEntity variableRegistryEntity) { + + this.allAffectedComponents = allAffectedComponents; + this.activeAffectedProcessors = activeAffectedProcessors; + this.activeAffectedServices = activeAffectedServices; + this.variableRegistryEntity = variableRegistryEntity; + } + + public Set<AffectedComponentEntity> getAllAffectedComponents() { + return allAffectedComponents; + } + + public List<AffectedComponentDTO> getActiveAffectedProcessors() { + return activeAffectedProcessors; + } + + public List<AffectedComponentDTO> getActiveAffectedServices() { + return activeAffectedServices; + } + + public VariableRegistryEntity getVariableRegistryEntity() { + return variableRegistryEntity; + } + } + // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) {
