Repository: nifi
Updated Branches:
  refs/heads/master f7f001eb9 -> 7a8dbb8b1


NIFI-4733:
- Resolving logic issue in two phase commit when updating variable registry. 
This closes #2370


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a8dbb8b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a8dbb8b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a8dbb8b

Branch: refs/heads/master
Commit: 7a8dbb8b1512d7bd7a26e1b996a7ef859ccfcd4e
Parents: f7f001e
Author: Matt Gilman <[email protected]>
Authored: Wed Jan 3 11:00:22 2018 -0500
Committer: Matt Gilman <[email protected]>
Committed: Thu Jan 4 13:03:11 2018 -0500

----------------------------------------------------------------------
 .../nifi/web/api/ProcessGroupResource.java      | 87 ++++++++++++++------
 1 file changed, 62 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7a8dbb8b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index 7ff7885..b866677 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -74,6 +74,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.CopySnippetRequestEntity;
 import org.apache.nifi.web.api.entity.CreateTemplateRequestEntity;
+import org.apache.nifi.web.api.entity.Entity;
 import org.apache.nifi.web.api.entity.FlowEntity;
 import org.apache.nifi.web.api.entity.FunnelEntity;
 import org.apache.nifi.web.api.entity.FunnelsEntity;
@@ -552,41 +553,43 @@ public class ProcessGroupResource extends 
ApplicationResource {
     public Response updateVariableRegistry(
         @Context final HttpServletRequest httpServletRequest,
         @ApiParam(value = "The process group id.", required = true) 
@PathParam("id") final String groupId,
-        @ApiParam(value = "The process group configuration details.", required 
= true) final VariableRegistryEntity requestEntity) {
+        @ApiParam(value = "The variable registry configuration details.", 
required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
 
-        if (requestEntity == null || requestEntity.getVariableRegistry() == 
null) {
+        if (requestVariableRegistryEntity == null || 
requestVariableRegistryEntity.getVariableRegistry() == null) {
             throw new IllegalArgumentException("Variable Registry details must 
be specified.");
         }
 
-        if (requestEntity.getProcessGroupRevision() == null) {
+        if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
             throw new IllegalArgumentException("Process Group Revision must be 
specified.");
         }
 
         // ensure the same id is being used
-        final VariableRegistryDTO registryDto = 
requestEntity.getVariableRegistry();
-        if (!groupId.equals(registryDto.getProcessGroupId())) {
+        final VariableRegistryDTO requestRegistryDto = 
requestVariableRegistryEntity.getVariableRegistry();
+        if (!groupId.equals(requestRegistryDto.getProcessGroupId())) {
             throw new IllegalArgumentException(String.format("The process 
group id (%s) in the request body does "
-                + "not equal the process group id of the requested resource 
(%s).", registryDto.getProcessGroupId(), groupId));
+                + "not equal the process group id of the requested resource 
(%s).", requestRegistryDto.getProcessGroupId(), groupId));
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.PUT, requestEntity);
+            return replicate(HttpMethod.PUT, requestVariableRegistryEntity);
         }
 
         // handle expects request (usually from the cluster manager)
-        final Revision requestRevision = 
getRevision(requestEntity.getProcessGroupRevision(), groupId);
+        final Revision requestRevision = 
getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
         return withWriteLock(
             serviceFacade,
-            requestEntity,
+            requestVariableRegistryEntity,
             requestRevision,
             lookup -> {
                 Authorizable authorizable = 
lookup.getProcessGroup(groupId).getAuthorizable();
                 authorizable.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
             },
             null,
-            (revision, processGroupEntity) -> {
+            (revision, variableRegistryEntity) -> {
+                final VariableRegistryDTO variableRegistry = 
variableRegistryEntity.getVariableRegistry();
+
                 // update the process group
-                final VariableRegistryEntity entity = 
serviceFacade.updateVariableRegistry(revision, registryDto);
+                final VariableRegistryEntity entity = 
serviceFacade.updateVariableRegistry(revision, variableRegistry);
                 return generateOkResponse(entity).build();
             });
     }
@@ -597,7 +600,7 @@ public class ProcessGroupResource extends 
ApplicationResource {
      *
      * @param httpServletRequest request
      * @param groupId The id of the process group.
-     * @param requestEntity the Variable Registry Entity
+     * @param requestVariableRegistryEntity the Variable Registry Entity
      * @return A Variable Registry Entry.
      */
     @POST
@@ -620,13 +623,13 @@ public class ProcessGroupResource extends 
ApplicationResource {
     public Response submitUpdateVariableRegistryRequest(
         @Context final HttpServletRequest httpServletRequest,
         @ApiParam(value = "The process group id.", required = true) 
@PathParam("id") final String groupId,
-        @ApiParam(value = "The process group configuration details.", required 
= true) final VariableRegistryEntity requestEntity) {
+        @ApiParam(value = "The variable registry configuration details.", 
required = true) final VariableRegistryEntity requestVariableRegistryEntity) {
 
-        if (requestEntity == null || requestEntity.getVariableRegistry() == 
null) {
+        if (requestVariableRegistryEntity == null || 
requestVariableRegistryEntity.getVariableRegistry() == null) {
             throw new IllegalArgumentException("Variable Registry details must 
be specified.");
         }
 
-        if (requestEntity.getProcessGroupRevision() == null) {
+        if (requestVariableRegistryEntity.getProcessGroupRevision() == null) {
             throw new IllegalArgumentException("Process Group Revision must be 
specified.");
         }
 
@@ -641,14 +644,14 @@ public class ProcessGroupResource extends 
ApplicationResource {
         // 6. Re-Enable all previously Active Processors that Depended on the 
Controller Services
 
         // Determine the affected components (and their associated revisions)
-        final VariableRegistryEntity computedEntity = 
serviceFacade.populateAffectedComponents(requestEntity.getVariableRegistry());
+        final VariableRegistryEntity computedEntity = 
serviceFacade.populateAffectedComponents(requestVariableRegistryEntity.getVariableRegistry());
         final VariableRegistryDTO computedRegistryDto = 
computedEntity.getVariableRegistry();
         if (computedRegistryDto == null) {
             throw new ResourceNotFoundException(String.format("Unable to 
locate group with id '%s'.", groupId));
         }
 
-        final Set<AffectedComponentEntity> allAffectedComponents = 
serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
-        final Set<AffectedComponentDTO> activeAffectedComponents = 
serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestEntity.getVariableRegistry());
+        final Set<AffectedComponentEntity> allAffectedComponents = 
serviceFacade.getComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
+        final Set<AffectedComponentDTO> activeAffectedComponents = 
serviceFacade.getActiveComponentsAffectedByVariableRegistryUpdate(requestVariableRegistryEntity.getVariableRegistry());
 
         final Map<String, List<AffectedComponentDTO>> 
activeAffectedComponentsByType = activeAffectedComponents.stream()
             .collect(Collectors.groupingBy(comp -> comp.getReferenceType()));
@@ -698,7 +701,7 @@ public class ProcessGroupResource extends 
ApplicationResource {
                     final Authentication authentication = new 
NiFiAuthenticationToken(new NiFiUserDetails(user));
                     
SecurityContextHolder.getContext().setAuthentication(authentication);
 
-                    updateVariableRegistryReplicated(groupId, originalUri, 
activeAffectedProcessors, activeAffectedServices, updateRequest, requestEntity);
+                    updateVariableRegistryReplicated(groupId, originalUri, 
activeAffectedProcessors, activeAffectedServices, updateRequest, 
requestVariableRegistryEntity);
                 } catch (final Exception e) {
                     logger.error("Failed to update variable registry", e);
 
@@ -721,15 +724,19 @@ public class ProcessGroupResource extends 
ApplicationResource {
             return 
Response.status(Status.ACCEPTED).location(location).entity(responseEntity).build();
         }
 
+        final UpdateVariableRegistryRequestWrapper requestWrapper =
+                new 
UpdateVariableRegistryRequestWrapper(allAffectedComponents, 
activeAffectedProcessors, activeAffectedServices, 
requestVariableRegistryEntity);
 
-        final Revision requestRevision = 
getRevision(requestEntity.getProcessGroupRevision(), groupId);
+        final Revision requestRevision = 
getRevision(requestVariableRegistryEntity.getProcessGroupRevision(), groupId);
         return withWriteLock(
             serviceFacade,
-            requestEntity,
+            requestWrapper,
             requestRevision,
             authorizeAccess,
             null,
-            (revision, varRegistryEntity) -> 
updateVariableRegistryLocal(groupId, allAffectedComponents, 
activeAffectedProcessors, activeAffectedServices, user, requestEntity)
+            (revision, wrapper) ->
+                    updateVariableRegistryLocal(groupId, 
wrapper.getAllAffectedComponents(), wrapper.getActiveAffectedProcessors(),
+                        wrapper.getActiveAffectedServices(), user, revision, 
wrapper.getVariableRegistryEntity())
         );
     }
 
@@ -1088,7 +1095,7 @@ public class ProcessGroupResource extends 
ApplicationResource {
     }
 
     private Response updateVariableRegistryLocal(final String groupId, final 
Set<AffectedComponentEntity> affectedComponents, final 
List<AffectedComponentDTO> affectedProcessors,
-                                                 final 
List<AffectedComponentDTO> affectedServices, final NiFiUser user, final 
VariableRegistryEntity requestEntity) {
+                                                 final 
List<AffectedComponentDTO> affectedServices, final NiFiUser user, final 
Revision requestRevision, final VariableRegistryEntity requestEntity) {
 
         final Set<String> affectedProcessorIds = affectedProcessors == null ? 
Collections.emptySet() : affectedProcessors.stream()
             .map(component -> component.getId())
@@ -1105,8 +1112,6 @@ public class ProcessGroupResource extends 
ApplicationResource {
         updateRequest.getIdentifyRelevantComponentsStep().setComplete(true);
         final Pause pause = createPause(updateRequest);
 
-        final Revision requestRevision = 
getRevision(requestEntity.getProcessGroupRevision(), groupId);
-
         final Runnable updateTask = new Runnable() {
             @Override
             public void run() {
@@ -3474,6 +3479,38 @@ public class ProcessGroupResource extends 
ApplicationResource {
         );
     }
 
+    private static class UpdateVariableRegistryRequestWrapper extends Entity {
+        private final Set<AffectedComponentEntity> allAffectedComponents;
+        private final List<AffectedComponentDTO> activeAffectedProcessors;
+        private final List<AffectedComponentDTO> activeAffectedServices;
+        private final VariableRegistryEntity variableRegistryEntity;
+
+        public UpdateVariableRegistryRequestWrapper(final 
Set<AffectedComponentEntity> allAffectedComponents, final 
List<AffectedComponentDTO> activeAffectedProcessors,
+                                                    final 
List<AffectedComponentDTO> activeAffectedServices, VariableRegistryEntity 
variableRegistryEntity) {
+
+            this.allAffectedComponents = allAffectedComponents;
+            this.activeAffectedProcessors = activeAffectedProcessors;
+            this.activeAffectedServices = activeAffectedServices;
+            this.variableRegistryEntity = variableRegistryEntity;
+        }
+
+        public Set<AffectedComponentEntity> getAllAffectedComponents() {
+            return allAffectedComponents;
+        }
+
+        public List<AffectedComponentDTO> getActiveAffectedProcessors() {
+            return activeAffectedProcessors;
+        }
+
+        public List<AffectedComponentDTO> getActiveAffectedServices() {
+            return activeAffectedServices;
+        }
+
+        public VariableRegistryEntity getVariableRegistryEntity() {
+            return variableRegistryEntity;
+        }
+    }
+
     // setters
 
     public void setServiceFacade(NiFiServiceFacade serviceFacade) {

Reply via email to