http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 3ae7201..a957544 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -234,7 +234,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the process group. - * @param processGroupEntity A processGroupEntity. + * @param requestProcessGroupEntity A processGroupEntity. * @return A processGroupEntity. */ @PUT @@ -267,41 +267,42 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The process group configuration details.", required = true - ) final ProcessGroupEntity processGroupEntity) { + ) final ProcessGroupEntity requestProcessGroupEntity) { - if (processGroupEntity == null || processGroupEntity.getComponent() == null) { + if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) { throw new IllegalArgumentException("Process group details must be specified."); } - if (processGroupEntity.getRevision() == null) { + if (requestProcessGroupEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the same id is being used - final ProcessGroupDTO requestProcessGroupDTO = processGroupEntity.getComponent(); + final ProcessGroupDTO requestProcessGroupDTO = requestProcessGroupEntity.getComponent(); if (!id.equals(requestProcessGroupDTO.getId())) { throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " + "not equal the process group id of the requested resource (%s).", requestProcessGroupDTO.getId(), id)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, processGroupEntity); + return replicate(HttpMethod.PUT, requestProcessGroupEntity); } // handle expects request (usually from the cluster manager) - final Revision revision = getRevision(processGroupEntity, id); + final Revision requestRevision = getRevision(requestProcessGroupEntity, id); return withWriteLock( - serviceFacade, - revision, - lookup -> { - Authorizable authorizable = lookup.getProcessGroup(id).getAuthorizable(); - authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }, - null, - () -> { - // update the process group - final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, requestProcessGroupDTO); - populateRemainingProcessGroupEntityContent(entity); + serviceFacade, + requestProcessGroupEntity, + requestRevision, + lookup -> { + Authorizable authorizable = lookup.getProcessGroup(id).getAuthorizable(); + authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + (revision, processGroupEntity) -> { + // update the process group + final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, processGroupEntity.getComponent()); + populateRemainingProcessGroupEntityContent(entity); return clusterContext(generateOkResponse(entity)).build(); } @@ -361,33 +362,37 @@ public class ProcessGroupResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } + final ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity(); + requestProcessGroupEntity.setId(id); + // handle expects request (usually from the cluster manager) - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); return withWriteLock( - serviceFacade, - revision, - lookup -> { - final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final ProcessGroupAuthorizable processGroupAuthorizable = lookup.getProcessGroup(id); - - // ensure write to the process group - final Authorizable processGroup = processGroupAuthorizable.getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, user); - - // ensure write to all encapsulated components - processGroupAuthorizable.getEncapsulatedAuthorizables().forEach(encaupsulatedAuthorizable -> { - encaupsulatedAuthorizable.authorize(authorizer, RequestAction.WRITE, user); - }); - }, - () -> serviceFacade.verifyDeleteProcessGroup(id), - () -> { - // delete the process group - final ProcessGroupEntity entity = serviceFacade.deleteProcessGroup(revision, id); - - // create the response - return clusterContext(generateOkResponse(entity)).build(); - } - ); + serviceFacade, + requestProcessGroupEntity, + requestRevision, + lookup -> { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final ProcessGroupAuthorizable processGroupAuthorizable = lookup.getProcessGroup(id); + + // ensure write to the process group + final Authorizable processGroup = processGroupAuthorizable.getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, user); + + // ensure write to all encapsulated components + processGroupAuthorizable.getEncapsulatedAuthorizables().forEach(encaupsulatedAuthorizable -> { + encaupsulatedAuthorizable.authorize(authorizer, RequestAction.WRITE, user); + }); + }, + () -> serviceFacade.verifyDeleteProcessGroup(id), + (revision, processGroupEntity) -> { + // delete the process group + final ProcessGroupEntity entity = serviceFacade.deleteProcessGroup(revision, processGroupEntity.getId()); + + // create the response + return clusterContext(generateOkResponse(entity)).build(); + } + ); } /** @@ -395,7 +400,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param processGroupEntity A processGroupEntity + * @param requestProcessGroupEntity A processGroupEntity * @return A processGroupEntity */ @POST @@ -428,54 +433,52 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The process group configuration details.", required = true - ) final ProcessGroupEntity processGroupEntity) { + ) final ProcessGroupEntity requestProcessGroupEntity) { - if (processGroupEntity == null || processGroupEntity.getComponent() == null) { + if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) { throw new IllegalArgumentException("Process group details must be specified."); } - if (processGroupEntity.getRevision() == null || (processGroupEntity.getRevision().getVersion() == null || processGroupEntity.getRevision().getVersion() != 0)) { + if (requestProcessGroupEntity.getRevision() == null || (requestProcessGroupEntity.getRevision().getVersion() == null || requestProcessGroupEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Process group."); } - if (processGroupEntity.getComponent().getId() != null) { + if (requestProcessGroupEntity.getComponent().getId() != null) { throw new IllegalArgumentException("Process group ID cannot be specified."); } - if (processGroupEntity.getComponent().getParentGroupId() != null && !groupId.equals(processGroupEntity.getComponent().getParentGroupId())) { + if (requestProcessGroupEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestProcessGroupEntity.getComponent().getParentGroupId())) { throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", - processGroupEntity.getComponent().getParentGroupId(), groupId)); + requestProcessGroupEntity.getComponent().getParentGroupId(), groupId)); } - processGroupEntity.getComponent().setParentGroupId(groupId); + requestProcessGroupEntity.getComponent().setParentGroupId(groupId); if (isReplicateRequest()) { - return replicate(HttpMethod.POST, processGroupEntity); - } - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); + return replicate(HttpMethod.POST, requestProcessGroupEntity); } - // set the processor id as appropriate - processGroupEntity.getComponent().setId(generateUuid()); - - // create the process group contents - final Revision revision = getRevision(processGroupEntity, processGroupEntity.getComponent().getId()); - final ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupEntity.getComponent()); - populateRemainingProcessGroupEntityContent(entity); - - // generate a 201 created response - String uri = entity.getUri(); - return clusterContext(generateCreatedResponse(URI.create(uri), entity)).build(); + return withWriteLock( + serviceFacade, + requestProcessGroupEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + processGroupGroupEntity -> { + // set the processor id as appropriate + processGroupGroupEntity.getComponent().setId(generateUuid()); + + // create the process group contents + final Revision revision = getRevision(processGroupGroupEntity, processGroupGroupEntity.getComponent().getId()); + final ProcessGroupEntity entity = serviceFacade.createProcessGroup(revision, groupId, processGroupGroupEntity.getComponent()); + populateRemainingProcessGroupEntityContent(entity); + + // generate a 201 created response + String uri = entity.getUri(); + return clusterContext(generateCreatedResponse(URI.create(uri), entity)).build(); + } + ); } /** @@ -610,36 +613,34 @@ public class ProcessGroupResource extends ApplicationResource { return replicate(HttpMethod.POST, processorEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - - final ProcessorConfigDTO config = requestProcessor.getConfig(); - if (config != null && config.getProperties() != null) { - final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getProcessorByType(requestProcessor.getType()); - AuthorizeControllerServiceReference.authorizeControllerServiceReferences(config.getProperties(), authorizable, authorizer, lookup); + return withWriteLock( + serviceFacade, + processorEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + + final ProcessorConfigDTO config = requestProcessor.getConfig(); + if (config != null && config.getProperties() != null) { + final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getProcessorByType(requestProcessor.getType()); + AuthorizeControllerServiceReference.authorizeControllerServiceReferences(config.getProperties(), authorizable, authorizer, lookup); + } + }, + null, + procEntity -> { + // set the processor id as appropriate + requestProcessor.setId(generateUuid()); + + // create the new processor + final Revision revision = getRevision(processorEntity, requestProcessor.getId()); + final ProcessorEntity entity = serviceFacade.createProcessor(revision, groupId, requestProcessor); + processorResource.populateRemainingProcessorEntityContent(entity); + + // generate a 201 created response + String uri = entity.getUri(); + return clusterContext(generateCreatedResponse(URI.create(uri), entity)).build(); } - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // set the processor id as appropriate - requestProcessor.setId(generateUuid()); - - // create the new processor - final Revision revision = getRevision(processorEntity, requestProcessor.getId()); - final ProcessorEntity entity = serviceFacade.createProcessor(revision, groupId, requestProcessor); - processorResource.populateRemainingProcessorEntityContent(entity); - - // generate a 201 created response - String uri = entity.getUri(); - return clusterContext(generateCreatedResponse(URI.create(uri), entity)).build(); + ); } /** @@ -705,7 +706,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param portEntity A inputPortEntity. + * @param requestPortEntity A inputPortEntity. * @return A inputPortEntity. */ @POST @@ -738,53 +739,51 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The input port configuration details.", required = true - ) final PortEntity portEntity) { + ) final PortEntity requestPortEntity) { - if (portEntity == null || portEntity.getComponent() == null) { + if (requestPortEntity == null || requestPortEntity.getComponent() == null) { throw new IllegalArgumentException("Port details must be specified."); } - if (portEntity.getRevision() == null || (portEntity.getRevision().getVersion() == null || portEntity.getRevision().getVersion() != 0)) { + if (requestPortEntity.getRevision() == null || (requestPortEntity.getRevision().getVersion() == null || requestPortEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Input port."); } - if (portEntity.getComponent().getId() != null) { + if (requestPortEntity.getComponent().getId() != null) { throw new IllegalArgumentException("Input port ID cannot be specified."); } - if (portEntity.getComponent().getParentGroupId() != null && !groupId.equals(portEntity.getComponent().getParentGroupId())) { + if (requestPortEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestPortEntity.getComponent().getParentGroupId())) { throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", - portEntity.getComponent().getParentGroupId(), groupId)); + requestPortEntity.getComponent().getParentGroupId(), groupId)); } - portEntity.getComponent().setParentGroupId(groupId); + requestPortEntity.getComponent().setParentGroupId(groupId); if (isReplicateRequest()) { - return replicate(HttpMethod.POST, portEntity); - } - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); + return replicate(HttpMethod.POST, requestPortEntity); } - // set the processor id as appropriate - portEntity.getComponent().setId(generateUuid()); - - // create the input port and generate the json - final Revision revision = getRevision(portEntity, portEntity.getComponent().getId()); - final PortEntity entity = serviceFacade.createInputPort(revision, groupId, portEntity.getComponent()); - inputPortResource.populateRemainingInputPortEntityContent(entity); - - // build the response - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + return withWriteLock( + serviceFacade, + requestPortEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + portEntity -> { + // set the processor id as appropriate + portEntity.getComponent().setId(generateUuid()); + + // create the input port and generate the json + final Revision revision = getRevision(portEntity, portEntity.getComponent().getId()); + final PortEntity entity = serviceFacade.createInputPort(revision, groupId, portEntity.getComponent()); + inputPortResource.populateRemainingInputPortEntityContent(entity); + + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } + ); } /** @@ -848,7 +847,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param portEntity A outputPortEntity. + * @param requestPortEntity A outputPortEntity. * @return A outputPortEntity. */ @POST @@ -881,53 +880,51 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The output port configuration.", required = true - ) final PortEntity portEntity) { + ) final PortEntity requestPortEntity) { - if (portEntity == null || portEntity.getComponent() == null) { + if (requestPortEntity == null || requestPortEntity.getComponent() == null) { throw new IllegalArgumentException("Port details must be specified."); } - if (portEntity.getRevision() == null || (portEntity.getRevision().getVersion() == null || portEntity.getRevision().getVersion() != 0)) { + if (requestPortEntity.getRevision() == null || (requestPortEntity.getRevision().getVersion() == null || requestPortEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Output port."); } - if (portEntity.getComponent().getId() != null) { + if (requestPortEntity.getComponent().getId() != null) { throw new IllegalArgumentException("Output port ID cannot be specified."); } - if (portEntity.getComponent().getParentGroupId() != null && !groupId.equals(portEntity.getComponent().getParentGroupId())) { + if (requestPortEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestPortEntity.getComponent().getParentGroupId())) { throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", - portEntity.getComponent().getParentGroupId(), groupId)); + requestPortEntity.getComponent().getParentGroupId(), groupId)); } - portEntity.getComponent().setParentGroupId(groupId); + requestPortEntity.getComponent().setParentGroupId(groupId); if (isReplicateRequest()) { - return replicate(HttpMethod.POST, portEntity); + return replicate(HttpMethod.POST, requestPortEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // set the processor id as appropriate - portEntity.getComponent().setId(generateUuid()); - - // create the output port and generate the json - final Revision revision = getRevision(portEntity, portEntity.getComponent().getId()); - final PortEntity entity = serviceFacade.createOutputPort(revision, groupId, portEntity.getComponent()); - outputPortResource.populateRemainingOutputPortEntityContent(entity); - - // build the response - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + return withWriteLock( + serviceFacade, + requestPortEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + portEntity -> { + // set the processor id as appropriate + portEntity.getComponent().setId(generateUuid()); + + // create the output port and generate the json + final Revision revision = getRevision(portEntity, portEntity.getComponent().getId()); + final PortEntity entity = serviceFacade.createOutputPort(revision, groupId, portEntity.getComponent()); + outputPortResource.populateRemainingOutputPortEntityContent(entity); + + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } + ); } /** @@ -992,7 +989,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param funnelEntity A funnelEntity. + * @param requestFunnelEntity A funnelEntity. * @return A funnelEntity. */ @POST @@ -1025,53 +1022,51 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The funnel configuration details.", required = true - ) final FunnelEntity funnelEntity) { + ) final FunnelEntity requestFunnelEntity) { - if (funnelEntity == null || funnelEntity.getComponent() == null) { + if (requestFunnelEntity == null || requestFunnelEntity.getComponent() == null) { throw new IllegalArgumentException("Funnel details must be specified."); } - if (funnelEntity.getRevision() == null || (funnelEntity.getRevision().getVersion() == null || funnelEntity.getRevision().getVersion() != 0)) { + if (requestFunnelEntity.getRevision() == null || (requestFunnelEntity.getRevision().getVersion() == null || requestFunnelEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Funnel."); } - if (funnelEntity.getComponent().getId() != null) { + if (requestFunnelEntity.getComponent().getId() != null) { throw new IllegalArgumentException("Funnel ID cannot be specified."); } - if (funnelEntity.getComponent().getParentGroupId() != null && !groupId.equals(funnelEntity.getComponent().getParentGroupId())) { + if (requestFunnelEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestFunnelEntity.getComponent().getParentGroupId())) { throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", - funnelEntity.getComponent().getParentGroupId(), groupId)); + requestFunnelEntity.getComponent().getParentGroupId(), groupId)); } - funnelEntity.getComponent().setParentGroupId(groupId); + requestFunnelEntity.getComponent().setParentGroupId(groupId); if (isReplicateRequest()) { - return replicate(HttpMethod.POST, funnelEntity); - } - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); + return replicate(HttpMethod.POST, requestFunnelEntity); } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // set the processor id as appropriate - funnelEntity.getComponent().setId(generateUuid()); - - // create the funnel and generate the json - final Revision revision = getRevision(funnelEntity, funnelEntity.getComponent().getId()); - final FunnelEntity entity = serviceFacade.createFunnel(revision, groupId, funnelEntity.getComponent()); - funnelResource.populateRemainingFunnelEntityContent(entity); - // build the response - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + return withWriteLock( + serviceFacade, + requestFunnelEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + funnelEntity -> { + // set the processor id as appropriate + funnelEntity.getComponent().setId(generateUuid()); + + // create the funnel and generate the json + final Revision revision = getRevision(funnelEntity, funnelEntity.getComponent().getId()); + final FunnelEntity entity = serviceFacade.createFunnel(revision, groupId, funnelEntity.getComponent()); + funnelResource.populateRemainingFunnelEntityContent(entity); + + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } + ); } /** @@ -1136,7 +1131,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param labelEntity A labelEntity. + * @param requestLabelEntity A labelEntity. * @return A labelEntity. */ @POST @@ -1169,53 +1164,51 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The label configuration details.", required = true - ) final LabelEntity labelEntity) { + ) final LabelEntity requestLabelEntity) { - if (labelEntity == null || labelEntity.getComponent() == null) { + if (requestLabelEntity == null || requestLabelEntity.getComponent() == null) { throw new IllegalArgumentException("Label details must be specified."); } - if (labelEntity.getRevision() == null || (labelEntity.getRevision().getVersion() == null || labelEntity.getRevision().getVersion() != 0)) { + if (requestLabelEntity.getRevision() == null || (requestLabelEntity.getRevision().getVersion() == null || requestLabelEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Label."); } - if (labelEntity.getComponent().getId() != null) { + if (requestLabelEntity.getComponent().getId() != null) { throw new IllegalArgumentException("Label ID cannot be specified."); } - if (labelEntity.getComponent().getParentGroupId() != null && !groupId.equals(labelEntity.getComponent().getParentGroupId())) { + if (requestLabelEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestLabelEntity.getComponent().getParentGroupId())) { throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", - labelEntity.getComponent().getParentGroupId(), groupId)); + requestLabelEntity.getComponent().getParentGroupId(), groupId)); } - labelEntity.getComponent().setParentGroupId(groupId); + requestLabelEntity.getComponent().setParentGroupId(groupId); if (isReplicateRequest()) { - return replicate(HttpMethod.POST, labelEntity); - } - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); + return replicate(HttpMethod.POST, requestLabelEntity); } - // set the processor id as appropriate - labelEntity.getComponent().setId(generateUuid()); - - // create the label and generate the json - final Revision revision = getRevision(labelEntity, labelEntity.getComponent().getId()); - final LabelEntity entity = serviceFacade.createLabel(revision, groupId, labelEntity.getComponent()); - labelResource.populateRemainingLabelEntityContent(entity); - - // build the response - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + return withWriteLock( + serviceFacade, + requestLabelEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + labelEntity -> { + // set the processor id as appropriate + labelEntity.getComponent().setId(generateUuid()); + + // create the label and generate the json + final Revision revision = getRevision(labelEntity, labelEntity.getComponent().getId()); + final LabelEntity entity = serviceFacade.createLabel(revision, groupId, labelEntity.getComponent()); + labelResource.populateRemainingLabelEntityContent(entity); + + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } + ); } /** @@ -1280,7 +1273,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param remoteProcessGroupEntity A remoteProcessGroupEntity. + * @param requestRemoteProcessGroupEntity A remoteProcessGroupEntity. * @return A remoteProcessGroupEntity. */ @POST @@ -1313,84 +1306,84 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The remote process group configuration details.", required = true - ) final RemoteProcessGroupEntity remoteProcessGroupEntity) { + ) final RemoteProcessGroupEntity requestRemoteProcessGroupEntity) { - if (remoteProcessGroupEntity == null || remoteProcessGroupEntity.getComponent() == null) { + if (requestRemoteProcessGroupEntity == null || requestRemoteProcessGroupEntity.getComponent() == null) { throw new IllegalArgumentException("Remote process group details must be specified."); } - if (remoteProcessGroupEntity.getRevision() == null || (remoteProcessGroupEntity.getRevision().getVersion() == null || remoteProcessGroupEntity.getRevision().getVersion() != 0)) { + if (requestRemoteProcessGroupEntity.getRevision() == null || (requestRemoteProcessGroupEntity.getRevision().getVersion() == null || requestRemoteProcessGroupEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Remote process group."); } - final RemoteProcessGroupDTO requestProcessGroupDTO = remoteProcessGroupEntity.getComponent(); + final RemoteProcessGroupDTO requestRemoteProcessGroupDTO = requestRemoteProcessGroupEntity.getComponent(); - if (requestProcessGroupDTO.getId() != null) { + if (requestRemoteProcessGroupDTO.getId() != null) { throw new IllegalArgumentException("Remote process group ID cannot be specified."); } - if (requestProcessGroupDTO.getTargetUri() == null) { + if (requestRemoteProcessGroupDTO.getTargetUri() == null) { throw new IllegalArgumentException("The URI of the process group must be specified."); } - if (requestProcessGroupDTO.getParentGroupId() != null && !groupId.equals(requestProcessGroupDTO.getParentGroupId())) { + if (requestRemoteProcessGroupDTO.getParentGroupId() != null && !groupId.equals(requestRemoteProcessGroupDTO.getParentGroupId())) { throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", - requestProcessGroupDTO.getParentGroupId(), groupId)); + requestRemoteProcessGroupDTO.getParentGroupId(), groupId)); } - requestProcessGroupDTO.setParentGroupId(groupId); + requestRemoteProcessGroupDTO.setParentGroupId(groupId); if (isReplicateRequest()) { - return replicate(HttpMethod.POST, remoteProcessGroupEntity); - } - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); + return replicate(HttpMethod.POST, requestRemoteProcessGroupEntity); } - // set the processor id as appropriate - requestProcessGroupDTO.setId(generateUuid()); - - // parse the uri - final URI uri; - try { - uri = URI.create(requestProcessGroupDTO.getTargetUri()); - } catch (final IllegalArgumentException e) { - throw new IllegalArgumentException("The specified remote process group URL is malformed: " + requestProcessGroupDTO.getTargetUri()); - } - - // validate each part of the uri - if (uri.getScheme() == null || uri.getHost() == null) { - throw new IllegalArgumentException("The specified remote process group URL is malformed: " + requestProcessGroupDTO.getTargetUri()); - } - - if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { - throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + requestProcessGroupDTO.getTargetUri()); - } - - // normalize the uri to the other controller - String controllerUri = uri.toString(); - if (controllerUri.endsWith("/")) { - controllerUri = StringUtils.substringBeforeLast(controllerUri, "/"); - } - - // since the uri is valid, use the normalized version - requestProcessGroupDTO.setTargetUri(controllerUri); - - // create the remote process group - final Revision revision = getRevision(remoteProcessGroupEntity, requestProcessGroupDTO.getId()); - final RemoteProcessGroupEntity entity = serviceFacade.createRemoteProcessGroup(revision, groupId, requestProcessGroupDTO); - remoteProcessGroupResource.populateRemainingRemoteProcessGroupEntityContent(entity); - - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + return withWriteLock( + serviceFacade, + requestRemoteProcessGroupEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + remoteProcessGroupEntity -> { + final RemoteProcessGroupDTO remoteProcessGroupDTO = remoteProcessGroupEntity.getComponent(); + + // set the processor id as appropriate + remoteProcessGroupDTO.setId(generateUuid()); + + // parse the uri + final URI uri; + try { + uri = URI.create(remoteProcessGroupDTO.getTargetUri()); + } catch (final IllegalArgumentException e) { + throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroupDTO.getTargetUri()); + } + + // validate each part of the uri + if (uri.getScheme() == null || uri.getHost() == null) { + throw new IllegalArgumentException("The specified remote process group URL is malformed: " + remoteProcessGroupDTO.getTargetUri()); + } + + if (!(uri.getScheme().equalsIgnoreCase("http") || uri.getScheme().equalsIgnoreCase("https"))) { + throw new IllegalArgumentException("The specified remote process group URL is invalid because it is not http or https: " + remoteProcessGroupDTO.getTargetUri()); + } + + // normalize the uri to the other controller + String controllerUri = uri.toString(); + if (controllerUri.endsWith("/")) { + controllerUri = StringUtils.substringBeforeLast(controllerUri, "/"); + } + + // since the uri is valid, use the normalized version + remoteProcessGroupDTO.setTargetUri(controllerUri); + + // create the remote process group + final Revision revision = getRevision(remoteProcessGroupEntity, remoteProcessGroupDTO.getId()); + final RemoteProcessGroupEntity entity = serviceFacade.createRemoteProcessGroup(revision, groupId, remoteProcessGroupDTO); + remoteProcessGroupResource.populateRemainingRemoteProcessGroupEntityContent(entity); + + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } + ); } /** @@ -1462,7 +1455,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param connectionEntity A connectionEntity. + * @param requestConnectionEntity A connectionEntity. * @return A connectionEntity. */ @POST @@ -1497,82 +1490,81 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The connection configuration details.", required = true - ) final ConnectionEntity connectionEntity) { + ) final ConnectionEntity requestConnectionEntity) { - if (connectionEntity == null || connectionEntity.getComponent() == null) { + if (requestConnectionEntity == null || requestConnectionEntity.getComponent() == null) { throw new IllegalArgumentException("Connection details must be specified."); } - if (connectionEntity.getRevision() == null || (connectionEntity.getRevision().getVersion() == null || connectionEntity.getRevision().getVersion() != 0)) { + if (requestConnectionEntity.getRevision() == null || (requestConnectionEntity.getRevision().getVersion() == null || requestConnectionEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Connection."); } - if (connectionEntity.getComponent().getId() != null) { + if (requestConnectionEntity.getComponent().getId() != null) { throw new IllegalArgumentException("Connection ID cannot be specified."); } - if (connectionEntity.getComponent().getParentGroupId() != null && !groupId.equals(connectionEntity.getComponent().getParentGroupId())) { + if (requestConnectionEntity.getComponent().getParentGroupId() != null && !groupId.equals(requestConnectionEntity.getComponent().getParentGroupId())) { throw new IllegalArgumentException(String.format("If specified, the parent process group id %s must be the same as specified in the URI %s", - connectionEntity.getComponent().getParentGroupId(), groupId)); + requestConnectionEntity.getComponent().getParentGroupId(), groupId)); } - connectionEntity.getComponent().setParentGroupId(groupId); + requestConnectionEntity.getComponent().setParentGroupId(groupId); // get the connection - final ConnectionDTO connection = connectionEntity.getComponent(); + final ConnectionDTO requestConnection = requestConnectionEntity.getComponent(); - if (connection.getSource() == null || connection.getSource().getId() == null) { + if (requestConnection.getSource() == null || requestConnection.getSource().getId() == null) { throw new IllegalArgumentException("The source of the connection must be specified."); } - if (connection.getDestination() == null || connection.getDestination().getId() == null) { + if (requestConnection.getDestination() == null || requestConnection.getDestination().getId() == null) { throw new IllegalArgumentException("The destination of the connection must be specified."); } if (isReplicateRequest()) { - return replicate(HttpMethod.POST, connectionEntity); + return replicate(HttpMethod.POST, requestConnectionEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - // ensure write access to the group - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - - // ensure write access to the source - final Authorizable source = lookup.getConnectable(connection.getSource().getId()); - if (source == null) { - throw new ResourceNotFoundException("Cannot find source component with ID [" + connection.getSource().getId() + "]"); - } - source.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - - // ensure write access to the destination - final Authorizable destination = lookup.getConnectable(connection.getDestination().getId()); - if (destination == null) { - throw new ResourceNotFoundException("Cannot find destination component with ID [" + connection.getDestination().getId() + "]"); + return withWriteLock( + serviceFacade, + requestConnectionEntity, + lookup -> { + // ensure write access to the group + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + + // ensure write access to the source + final Authorizable source = lookup.getConnectable(requestConnection.getSource().getId()); + if (source == null) { + throw new ResourceNotFoundException("Cannot find source component with ID [" + requestConnection.getSource().getId() + "]"); + } + source.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + + // ensure write access to the destination + final Authorizable destination = lookup.getConnectable(requestConnection.getDestination().getId()); + if (destination == null) { + throw new ResourceNotFoundException("Cannot find destination component with ID [" + requestConnection.getDestination().getId() + "]"); + } + + destination.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + () -> serviceFacade.verifyCreateConnection(groupId, requestConnection), + connectionEntity -> { + final ConnectionDTO connection = connectionEntity.getComponent(); + + // set the processor id as appropriate + connection.setId(generateUuid()); + + // create the new relationship target + final Revision revision = getRevision(connectionEntity, connection.getId()); + final ConnectionEntity entity = serviceFacade.createConnection(revision, groupId, connection); + connectionResource.populateRemainingConnectionEntityContent(entity); + + // extract the href and build the response + String uri = entity.getUri(); + return clusterContext(generateCreatedResponse(URI.create(uri), entity)).build(); } - - destination.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - serviceFacade.verifyCreateConnection(groupId, connection); - return generateContinueResponse().build(); - } - - // set the processor id as appropriate - connection.setId(generateUuid()); - - // create the new relationship target - final Revision revision = getRevision(connectionEntity, connection.getId()); - final ConnectionEntity entity = serviceFacade.createConnection(revision, groupId, connection); - connectionResource.populateRemainingConnectionEntityContent(entity); - - // extract the href and build the response - String uri = entity.getUri(); - return clusterContext(generateCreatedResponse(URI.create(uri), entity)).build(); + ); } /** @@ -1640,7 +1632,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param copySnippetEntity The copy snippet request + * @param requestCopySnippetEntity The copy snippet request * @return A flowSnippetEntity. */ @POST @@ -1674,50 +1666,48 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The copy snippet request.", required = true - ) CopySnippetRequestEntity copySnippetEntity) { + ) CopySnippetRequestEntity requestCopySnippetEntity) { // ensure the position has been specified - if (copySnippetEntity == null || copySnippetEntity.getOriginX() == null || copySnippetEntity.getOriginY() == null) { + if (requestCopySnippetEntity == null || requestCopySnippetEntity.getOriginX() == null || requestCopySnippetEntity.getOriginY() == null) { throw new IllegalArgumentException("The origin position (x, y) must be specified"); } - if (copySnippetEntity.getSnippetId() == null) { + if (requestCopySnippetEntity.getSnippetId() == null) { throw new IllegalArgumentException("The snippet id must be specified."); } if (isReplicateRequest()) { - return replicate(HttpMethod.POST, copySnippetEntity); - } - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - authorizeSnippetUsage(lookup, groupId, copySnippetEntity.getSnippetId()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // copy the specified snippet - final FlowEntity flowEntity = serviceFacade.copySnippet( - groupId, copySnippetEntity.getSnippetId(), copySnippetEntity.getOriginX(), copySnippetEntity.getOriginY(), getIdGenerationSeed().orElse(null)); - - // get the snippet - final FlowDTO flow = flowEntity.getFlow(); - - // prune response as necessary - for (ProcessGroupEntity childGroupEntity : flow.getProcessGroups()) { - childGroupEntity.getComponent().setContents(null); + return replicate(HttpMethod.POST, requestCopySnippetEntity); } - // create the response entity - populateRemainingSnippetContent(flow); - - // generate the response - return clusterContext(generateCreatedResponse(getAbsolutePath(), flowEntity)).build(); + return withWriteLock( + serviceFacade, + requestCopySnippetEntity, + lookup -> { + authorizeSnippetUsage(lookup, groupId, requestCopySnippetEntity.getSnippetId()); + }, + null, + copySnippetRequestEntity -> { + // copy the specified snippet + final FlowEntity flowEntity = serviceFacade.copySnippet( + groupId, copySnippetRequestEntity.getSnippetId(), copySnippetRequestEntity.getOriginX(), copySnippetRequestEntity.getOriginY(), getIdGenerationSeed().orElse(null)); + + // get the snippet + final FlowDTO flow = flowEntity.getFlow(); + + // prune response as necessary + for (ProcessGroupEntity childGroupEntity : flow.getProcessGroups()) { + childGroupEntity.getComponent().setContents(null); + } + + // create the response entity + populateRemainingSnippetContent(flow); + + // generate the response + return clusterContext(generateCreatedResponse(getAbsolutePath(), flowEntity)).build(); + } + ); } // ----------------- @@ -1732,7 +1722,7 @@ public class ProcessGroupResource extends ApplicationResource { * * @param httpServletRequest request * @param groupId The group id - * @param instantiateTemplateRequestEntity The instantiate template request + * @param requestInstantiateTemplateRequestEntity The instantiate template request * @return A flowEntity. */ @POST @@ -1766,49 +1756,47 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The instantiate template request.", required = true - ) InstantiateTemplateRequestEntity instantiateTemplateRequestEntity) { + ) InstantiateTemplateRequestEntity requestInstantiateTemplateRequestEntity) { // ensure the position has been specified - if (instantiateTemplateRequestEntity == null || instantiateTemplateRequestEntity.getOriginX() == null || instantiateTemplateRequestEntity.getOriginY() == null) { + if (requestInstantiateTemplateRequestEntity == null || requestInstantiateTemplateRequestEntity.getOriginX() == null || requestInstantiateTemplateRequestEntity.getOriginY() == null) { throw new IllegalArgumentException("The origin position (x, y) must be specified"); } if (isReplicateRequest()) { - return replicate(HttpMethod.POST, instantiateTemplateRequestEntity); + return replicate(HttpMethod.POST, requestInstantiateTemplateRequestEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - - final Authorizable template = lookup.getTemplate(instantiateTemplateRequestEntity.getTemplateId()); - template.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // create the template and generate the json - final FlowEntity entity = serviceFacade.createTemplateInstance(groupId, instantiateTemplateRequestEntity.getOriginX(), - instantiateTemplateRequestEntity.getOriginY(), instantiateTemplateRequestEntity.getTemplateId(), getIdGenerationSeed().orElse(null)); - - final FlowDTO flowSnippet = entity.getFlow(); - - // prune response as necessary - for (ProcessGroupEntity childGroupEntity : flowSnippet.getProcessGroups()) { - childGroupEntity.getComponent().setContents(null); - } - - // create the response entity - populateRemainingSnippetContent(flowSnippet); - - // generate the response - return clusterContext(generateCreatedResponse(getAbsolutePath(), entity)).build(); + return withWriteLock( + serviceFacade, + requestInstantiateTemplateRequestEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + + final Authorizable template = lookup.getTemplate(requestInstantiateTemplateRequestEntity.getTemplateId()); + template.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }, + null, + instantiateTemplateRequestEntity -> { + // create the template and generate the json + final FlowEntity entity = serviceFacade.createTemplateInstance(groupId, instantiateTemplateRequestEntity.getOriginX(), + instantiateTemplateRequestEntity.getOriginY(), instantiateTemplateRequestEntity.getTemplateId(), getIdGenerationSeed().orElse(null)); + + final FlowDTO flowSnippet = entity.getFlow(); + + // prune response as necessary + for (ProcessGroupEntity childGroupEntity : flowSnippet.getProcessGroups()) { + childGroupEntity.getComponent().setContents(null); + } + + // create the response entity + populateRemainingSnippetContent(flowSnippet); + + // generate the response + return clusterContext(generateCreatedResponse(getAbsolutePath(), entity)).build(); + } + ); } // --------- @@ -1828,7 +1816,7 @@ public class ProcessGroupResource extends ApplicationResource { * Creates a new template based off of the specified template. * * @param httpServletRequest request - * @param createTemplateRequestEntity request to create the template + * @param requestCreateTemplateRequestEntity request to create the template * @return A templateEntity */ @POST @@ -1862,40 +1850,37 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The create template request.", required = true - ) final CreateTemplateRequestEntity createTemplateRequestEntity) { + ) final CreateTemplateRequestEntity requestCreateTemplateRequestEntity) { - if (createTemplateRequestEntity.getSnippetId() == null) { + if (requestCreateTemplateRequestEntity.getSnippetId() == null) { throw new IllegalArgumentException("The snippet identifier must be specified."); } if (isReplicateRequest()) { - return replicate(HttpMethod.POST, createTemplateRequestEntity); + return replicate(HttpMethod.POST, requestCreateTemplateRequestEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - authorizeSnippetUsage(lookup, groupId, createTemplateRequestEntity.getSnippetId()); - }); - } - if (validationPhase) { - serviceFacade.verifyCanAddTemplate(groupId, createTemplateRequestEntity.getName()); - return generateContinueResponse().build(); - } - - // create the template and generate the json - final TemplateDTO template = serviceFacade.createTemplate(createTemplateRequestEntity.getName(), createTemplateRequestEntity.getDescription(), - createTemplateRequestEntity.getSnippetId(), groupId, getIdGenerationSeed()); - templateResource.populateRemainingTemplateContent(template); - - // build the response entity - final TemplateEntity entity = new TemplateEntity(); - entity.setTemplate(template); - - // build the response - return clusterContext(generateCreatedResponse(URI.create(template.getUri()), entity)).build(); + return withWriteLock( + serviceFacade, + requestCreateTemplateRequestEntity, + lookup -> { + authorizeSnippetUsage(lookup, groupId, requestCreateTemplateRequestEntity.getSnippetId()); + }, + () -> serviceFacade.verifyCanAddTemplate(groupId, requestCreateTemplateRequestEntity.getName()), + createTemplateRequestEntity -> { + // create the template and generate the json + final TemplateDTO template = serviceFacade.createTemplate(createTemplateRequestEntity.getName(), createTemplateRequestEntity.getDescription(), + createTemplateRequestEntity.getSnippetId(), groupId, getIdGenerationSeed()); + templateResource.populateRemainingTemplateContent(template); + + // build the response entity + final TemplateEntity entity = new TemplateEntity(); + entity.setTemplate(template); + + // build the response + return clusterContext(generateCreatedResponse(URI.create(template.getUri()), entity)).build(); + } + ); } /** @@ -1991,7 +1976,7 @@ public class ProcessGroupResource extends ApplicationResource { * Imports the specified template. * * @param httpServletRequest request - * @param templateEntity A templateEntity. + * @param requestTemplateEntity A templateEntity. * @return A templateEntity. */ @POST @@ -2020,53 +2005,52 @@ public class ProcessGroupResource extends ApplicationResource { required = true ) @PathParam("id") final String groupId, - final TemplateEntity templateEntity) { + final TemplateEntity requestTemplateEntity) { // verify the template was specified - if (templateEntity == null || templateEntity.getTemplate() == null || templateEntity.getTemplate().getSnippet() == null) { + if (requestTemplateEntity == null || requestTemplateEntity.getTemplate() == null || requestTemplateEntity.getTemplate().getSnippet() == null) { throw new IllegalArgumentException("Template details must be specified."); } if (isReplicateRequest()) { - return replicate(HttpMethod.POST, templateEntity); + return replicate(HttpMethod.POST, requestTemplateEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - serviceFacade.verifyCanAddTemplate(groupId, templateEntity.getTemplate().getName()); - serviceFacade.verifyComponentTypes(templateEntity.getTemplate().getSnippet()); - return generateContinueResponse().build(); - } - - try { - // import the template - final TemplateDTO template = serviceFacade.importTemplate(templateEntity.getTemplate(), groupId, getIdGenerationSeed()); - templateResource.populateRemainingTemplateContent(template); - - // build the response entity - TemplateEntity entity = new TemplateEntity(); - entity.setTemplate(template); - - // build the response - return clusterContext(generateCreatedResponse(URI.create(template.getUri()), entity)).build(); - } catch (IllegalArgumentException | IllegalStateException e) { - logger.info("Unable to import template: " + e); - String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"%s\"/>", Response.Status.BAD_REQUEST.getStatusCode(), e.getMessage()); - return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); - } catch (Exception e) { - logger.warn("An error occurred while importing a template.", e); - String responseXml - = String.format("<errorResponse status=\"%s\" statusText=\"Unable to import the specified template: %s\"/>", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()); - return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); - } + return withWriteLock( + serviceFacade, + requestTemplateEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + () -> { + serviceFacade.verifyCanAddTemplate(groupId, requestTemplateEntity.getTemplate().getName()); + serviceFacade.verifyComponentTypes(requestTemplateEntity.getTemplate().getSnippet()); + }, + templateEntity -> { + try { + // import the template + final TemplateDTO template = serviceFacade.importTemplate(templateEntity.getTemplate(), groupId, getIdGenerationSeed()); + templateResource.populateRemainingTemplateContent(template); + + // build the response entity + TemplateEntity entity = new TemplateEntity(); + entity.setTemplate(template); + + // build the response + return clusterContext(generateCreatedResponse(URI.create(template.getUri()), entity)).build(); + } catch (IllegalArgumentException | IllegalStateException e) { + logger.info("Unable to import template: " + e); + String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"%s\"/>", Response.Status.BAD_REQUEST.getStatusCode(), e.getMessage()); + return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); + } catch (Exception e) { + logger.warn("An error occurred while importing a template.", e); + String responseXml = String.format("<errorResponse status=\"%s\" statusText=\"Unable to import the specified template: %s\"/>", + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()); + return Response.status(Response.Status.OK).entity(responseXml).type("application/xml").build(); + } + } + ); } // ------------------- @@ -2077,7 +2061,7 @@ public class ProcessGroupResource extends ApplicationResource { * Creates a new Controller Service. * * @param httpServletRequest request - * @param controllerServiceEntity A controllerServiceEntity. + * @param requestControllerServiceEntity A controllerServiceEntity. * @return A controllerServiceEntity. */ @POST @@ -2110,17 +2094,17 @@ public class ProcessGroupResource extends ApplicationResource { @ApiParam( value = "The controller service configuration details.", required = true - ) final ControllerServiceEntity controllerServiceEntity) { + ) final ControllerServiceEntity requestControllerServiceEntity) { - if (controllerServiceEntity == null || controllerServiceEntity.getComponent() == null) { + if (requestControllerServiceEntity == null || requestControllerServiceEntity.getComponent() == null) { throw new IllegalArgumentException("Controller service details must be specified."); } - if (controllerServiceEntity.getRevision() == null || (controllerServiceEntity.getRevision().getVersion() == null || controllerServiceEntity.getRevision().getVersion() != 0)) { + if (requestControllerServiceEntity.getRevision() == null || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service."); } - final ControllerServiceDTO requestControllerService = controllerServiceEntity.getComponent(); + final ControllerServiceDTO requestControllerService = requestControllerServiceEntity.getComponent(); if (requestControllerService.getId() != null) { throw new IllegalArgumentException("Controller service ID cannot be specified."); } @@ -2136,37 +2120,37 @@ public class ProcessGroupResource extends ApplicationResource { requestControllerService.setParentGroupId(groupId); if (isReplicateRequest()) { - return replicate(HttpMethod.POST, controllerServiceEntity); + return replicate(HttpMethod.POST, requestControllerServiceEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); - processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - - if (requestControllerService.getProperties() != null) { - final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getControllerServiceByType(requestControllerService.getType()); - AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(), authorizable, authorizer, lookup); + return withWriteLock( + serviceFacade, + requestControllerServiceEntity, + lookup -> { + final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + + if (requestControllerService.getProperties() != null) { + final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getControllerServiceByType(requestControllerService.getType()); + AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(), authorizable, authorizer, lookup); + } + }, + null, + controllerServiceEntity -> { + final ControllerServiceDTO controllerService = controllerServiceEntity.getComponent(); + + // set the processor id as appropriate + controllerService.setId(generateUuid()); + + // create the controller service and generate the json + final Revision revision = getRevision(controllerServiceEntity, controllerService.getId()); + final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, groupId, controllerService); + controllerServiceResource.populateRemainingControllerServiceEntityContent(entity); + + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); } - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // set the processor id as appropriate - requestControllerService.setId(generateUuid()); - - // create the controller service and generate the json - final Revision revision = getRevision(controllerServiceEntity, requestControllerService.getId()); - final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, groupId, requestControllerService); - controllerServiceResource.populateRemainingControllerServiceEntityContent(entity); - - // build the response - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + ); } // setters
http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java index 246ba70..cc16d26 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java @@ -354,27 +354,28 @@ public class ProcessorResource extends ApplicationResource { return replicate(HttpMethod.POST); } - final boolean isValidationPhase = isValidationPhase(httpServletRequest); - if (isValidationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable processor = lookup.getProcessor(id).getAuthorizable(); - processor.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (isValidationPhase) { - serviceFacade.verifyCanClearProcessorState(id); - return generateContinueResponse().build(); - } + final ProcessorEntity requestProcessorEntity = new ProcessorEntity(); + requestProcessorEntity.setId(id); - // get the component state - serviceFacade.clearProcessorState(id); + return withWriteLock( + serviceFacade, + requestProcessorEntity, + lookup -> { + final Authorizable processor = lookup.getProcessor(id).getAuthorizable(); + processor.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + () -> serviceFacade.verifyCanClearProcessorState(id), + (processorEntity) -> { + // get the component state + serviceFacade.clearProcessorState(processorEntity.getId()); - // generate the response entity - final ComponentStateEntity entity = new ComponentStateEntity(); + // generate the response entity + final ComponentStateEntity entity = new ComponentStateEntity(); - // generate the response - return clusterContext(generateOkResponse(entity)).build(); + // generate the response + return clusterContext(generateOkResponse(entity)).build(); + } + ); } /** @@ -382,7 +383,7 @@ public class ProcessorResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the processor to update. - * @param processorEntity A processorEntity. + * @param requestProcessorEntity A processorEntity. * @return A processorEntity. * @throws InterruptedException if interrupted */ @@ -417,32 +418,33 @@ public class ProcessorResource extends ApplicationResource { @ApiParam( value = "The processor configuration details.", required = true - ) final ProcessorEntity processorEntity) throws InterruptedException { + ) final ProcessorEntity requestProcessorEntity) throws InterruptedException { - if (processorEntity == null || processorEntity.getComponent() == null) { + if (requestProcessorEntity == null || requestProcessorEntity.getComponent() == null) { throw new IllegalArgumentException("Processor details must be specified."); } - if (processorEntity.getRevision() == null) { + if (requestProcessorEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the same id is being used - final ProcessorDTO requestProcessorDTO = processorEntity.getComponent(); + final ProcessorDTO requestProcessorDTO = requestProcessorEntity.getComponent(); if (!id.equals(requestProcessorDTO.getId())) { throw new IllegalArgumentException(String.format("The processor id (%s) in the request body does " + "not equal the processor id of the requested resource (%s).", requestProcessorDTO.getId(), id)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, processorEntity); + return replicate(HttpMethod.PUT, requestProcessorEntity); } // handle expects request (usually from the cluster manager) - final Revision revision = getRevision(processorEntity, id); + final Revision requestRevision = getRevision(requestProcessorEntity, id); return withWriteLock( serviceFacade, - revision, + requestProcessorEntity, + requestRevision, lookup -> { final NiFiUser user = NiFiUserUtils.getNiFiUser(); @@ -455,9 +457,11 @@ public class ProcessorResource extends ApplicationResource { } }, () -> serviceFacade.verifyUpdateProcessor(requestProcessorDTO), - () -> { + (revision, processorEntity) -> { + final ProcessorDTO processorDTO = processorEntity.getComponent(); + // update the processor - final ProcessorEntity entity = serviceFacade.updateProcessor(revision, requestProcessorDTO); + final ProcessorEntity entity = serviceFacade.updateProcessor(revision, processorDTO); populateRemainingProcessorEntityContent(entity); return clusterContext(generateOkResponse(entity)).build(); @@ -517,18 +521,22 @@ public class ProcessorResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final ProcessorEntity requestProcessorEntity = new ProcessorEntity(); + requestProcessorEntity.setId(id); + + final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); return withWriteLock( serviceFacade, - revision, + requestProcessorEntity, + requestRevision, lookup -> { final Authorizable processor = lookup.getProcessor(id).getAuthorizable(); processor.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> serviceFacade.verifyDeleteProcessor(id), - () -> { + (revision, processorEntity) -> { // delete the processor - final ProcessorEntity entity = serviceFacade.deleteProcessor(revision, id); + final ProcessorEntity entity = serviceFacade.deleteProcessor(revision, processorEntity.getId()); // generate the response return clusterContext(generateOkResponse(entity)).build();
