http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java index 5180cb6..5e82674 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/CountersResource.java @@ -40,6 +40,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersDTO; +import org.apache.nifi.web.api.entity.ComponentEntity; import org.apache.nifi.web.api.entity.CounterEntity; import org.apache.nifi.web.api.entity.CountersEntity; @@ -233,27 +234,28 @@ public class CountersResource extends ApplicationResource { return replicate(HttpMethod.PUT); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - authorizeCounters(RequestAction.WRITE); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } + final ComponentEntity requestComponentEntity = new ComponentEntity(); + requestComponentEntity.setId(id); - // reset the specified counter - final CounterDTO counter = serviceFacade.updateCounter(id); + return withWriteLock( + 
serviceFacade, + requestComponentEntity, + lookup -> { + authorizeCounters(RequestAction.WRITE); + }, + null, + (componentEntity) -> { + // reset the specified counter + final CounterDTO counter = serviceFacade.updateCounter(requestComponentEntity.getId()); - // create the response entity - final CounterEntity entity = new CounterEntity(); - entity.setCounter(counter); + // create the response entity + final CounterEntity entity = new CounterEntity(); + entity.setCounter(counter); - // generate the response - return clusterContext(generateOkResponse(entity)).build(); + // generate the response + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters
http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java index 69b2567..fbf4c55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java @@ -37,7 +37,9 @@ import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.FlowFileDTO; import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; +import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.FlowFileEntity; import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.request.ClientIdParameter; @@ -321,35 +323,35 @@ public class FlowFileQueueResource extends ApplicationResource { return replicate(HttpMethod.POST); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final ConnectionAuthorizable connAuth = lookup.getConnection(id); - final Authorizable dataAuthorizable = 
lookup.getData(connAuth.getSource().getIdentifier()); - dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - serviceFacade.verifyListQueue(id); - return generateContinueResponse().build(); - } - - // ensure the id is the same across the cluster - final String listingRequestId = generateUuid(); - - // submit the listing request - final ListingRequestDTO listingRequest = serviceFacade.createFlowFileListingRequest(id, listingRequestId); - populateRemainingFlowFileListingContent(id, listingRequest); - - // create the response entity - final ListingRequestEntity entity = new ListingRequestEntity(); - entity.setListingRequest(listingRequest); - - // generate the URI where the response will be - final URI location = URI.create(listingRequest.getUri()); - return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); + final ConnectionEntity requestConnectionEntity = new ConnectionEntity(); + requestConnectionEntity.setId(id); + + return withWriteLock( + serviceFacade, + requestConnectionEntity, + lookup -> { + final ConnectionAuthorizable connAuth = lookup.getConnection(id); + final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier()); + dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }, + () -> serviceFacade.verifyListQueue(id), + (connectionEntity) -> { + // ensure the id is the same across the cluster + final String listingRequestId = generateUuid(); + + // submit the listing request + final ListingRequestDTO listingRequest = serviceFacade.createFlowFileListingRequest(connectionEntity.getId(), listingRequestId); + populateRemainingFlowFileListingContent(connectionEntity.getId(), listingRequest); + + // create the response entity + final ListingRequestEntity entity = new ListingRequestEntity(); + entity.setListingRequest(listingRequest); + + // generate the URI where the response will be + final URI 
location = URI.create(listingRequest.getUri()); + return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); + } + ); } /** @@ -458,34 +460,50 @@ public class FlowFileQueueResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId); - final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier()); - dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // delete the listing request - final ListingRequestDTO listingRequest = serviceFacade.deleteFlowFileListingRequest(connectionId, listingRequestId); + return withWriteLock( + serviceFacade, + new ListingEntity(connectionId, listingRequestId), + lookup -> { + final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId); + final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier()); + dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + }, + null, + (listingEntity) -> { + // delete the listing request + final ListingRequestDTO listingRequest = serviceFacade.deleteFlowFileListingRequest(listingEntity.getConnectionId(), listingEntity.getListingRequestId()); + + // prune the results as they were already received when the listing completed + listingRequest.setFlowFileSummaries(null); + + // populate remaining content + populateRemainingFlowFileListingContent(listingEntity.getConnectionId(), listingRequest); + + // create the response entity + final ListingRequestEntity entity = new ListingRequestEntity(); + 
entity.setListingRequest(listingRequest); + + return generateOkResponse(entity).build(); + } + ); + } - // prune the results as they were already received when the listing completed - listingRequest.setFlowFileSummaries(null); + private static class ListingEntity extends Entity { + final String connectionId; + final String listingRequestId; - // populate remaining content - populateRemainingFlowFileListingContent(connectionId, listingRequest); + public ListingEntity(String connectionId, String listingRequestId) { + this.connectionId = connectionId; + this.listingRequestId = listingRequestId; + } - // create the response entity - final ListingRequestEntity entity = new ListingRequestEntity(); - entity.setListingRequest(listingRequest); + public String getConnectionId() { + return connectionId; + } - return generateOkResponse(entity).build(); + public String getListingRequestId() { + return listingRequestId; + } } /** @@ -528,34 +546,35 @@ public class FlowFileQueueResource extends ApplicationResource { return replicate(HttpMethod.POST); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final ConnectionAuthorizable connAuth = lookup.getConnection(id); - final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier()); - dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } - - // ensure the id is the same across the cluster - final String dropRequestId = generateUuid(); - - // submit the drop request - final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(id, dropRequestId); - dropRequest.setUri(generateResourceUri("flowfile-queues", id, "drop-requests", dropRequest.getId())); - - // 
create the response entity - final DropRequestEntity entity = new DropRequestEntity(); - entity.setDropRequest(dropRequest); - - // generate the URI where the response will be - final URI location = URI.create(dropRequest.getUri()); - return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); + final ConnectionEntity requestConnectionEntity = new ConnectionEntity(); + requestConnectionEntity.setId(id); + + return withWriteLock( + serviceFacade, + requestConnectionEntity, + lookup -> { + final ConnectionAuthorizable connAuth = lookup.getConnection(id); + final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier()); + dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + (connectionEntity) -> { + // ensure the id is the same across the cluster + final String dropRequestId = generateUuid(); + + // submit the drop request + final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(connectionEntity.getId(), dropRequestId); + dropRequest.setUri(generateResourceUri("flowfile-queues", connectionEntity.getId(), "drop-requests", dropRequest.getId())); + + // create the response entity + final DropRequestEntity entity = new DropRequestEntity(); + entity.setDropRequest(dropRequest); + + // generate the URI where the response will be + final URI location = URI.create(dropRequest.getUri()); + return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); + } + ); } /** @@ -664,29 +683,45 @@ public class FlowFileQueueResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId); - 
final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier()); - dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } + return withWriteLock( + serviceFacade, + new DropEntity(connectionId, dropRequestId), + lookup -> { + final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId); + final Authorizable dataAuthorizable = lookup.getData(connAuth.getSource().getIdentifier()); + dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + (dropEntity) -> { + // delete the drop request + final DropRequestDTO dropRequest = serviceFacade.deleteFlowFileDropRequest(dropEntity.getConnectionId(), dropEntity.getDropRequestId()); + dropRequest.setUri(generateResourceUri("flowfile-queues", dropEntity.getConnectionId(), "drop-requests", dropEntity.getDropRequestId())); + + // create the response entity + final DropRequestEntity entity = new DropRequestEntity(); + entity.setDropRequest(dropRequest); + + return generateOkResponse(entity).build(); + } + ); + } - // delete the drop request - final DropRequestDTO dropRequest = serviceFacade.deleteFlowFileDropRequest(connectionId, dropRequestId); - dropRequest.setUri(generateResourceUri("flowfile-queues", connectionId, "drop-requests", dropRequestId)); + private static class DropEntity extends Entity { + final String connectionId; + final String dropRequestId; - // create the response entity - final DropRequestEntity entity = new DropRequestEntity(); - entity.setDropRequest(dropRequest); + public DropEntity(String connectionId, String dropRequestId) { + this.connectionId = connectionId; + this.dropRequestId = dropRequestId; + } - return generateOkResponse(entity).build(); + public String getConnectionId() { + return connectionId; + } + + public String getDropRequestId() { + return dropRequestId; + } } // setters 
http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 06470c4..dd5a220 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -527,7 +527,7 @@ public class FlowResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the process group. - * @param scheduleComponentsEntity A scheduleComponentsEntity. + * @param requestScheduleComponentsEntity A scheduleComponentsEntity. * @return A processGroupEntity. */ @PUT @@ -559,20 +559,23 @@ public class FlowResource extends ApplicationResource { required = true ) @PathParam("id") String id, - ScheduleComponentsEntity scheduleComponentsEntity) { + @ApiParam( + value = "The request to schedule or unschedule. 
If the components in the request are not specified, all authorized components will be considered.", + required = true + ) final ScheduleComponentsEntity requestScheduleComponentsEntity) { // ensure the same id is being used - if (!id.equals(scheduleComponentsEntity.getId())) { + if (!id.equals(requestScheduleComponentsEntity.getId())) { throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does " - + "not equal the process group id of the requested resource (%s).", scheduleComponentsEntity.getId(), id)); + + "not equal the process group id of the requested resource (%s).", requestScheduleComponentsEntity.getId(), id)); } final ScheduledState state; - if (scheduleComponentsEntity.getState() == null) { + if (requestScheduleComponentsEntity.getState() == null) { throw new IllegalArgumentException("The scheduled state must be specified."); } else { try { - state = ScheduledState.valueOf(scheduleComponentsEntity.getState()); + state = ScheduledState.valueOf(requestScheduleComponentsEntity.getState()); } catch (final IllegalArgumentException iae) { throw new IllegalArgumentException(String.format("The scheduled must be one of [%s].", StringUtils.join(EnumSet.of(ScheduledState.RUNNING, ScheduledState.STOPPED), ", "))); } @@ -584,7 +587,7 @@ public class FlowResource extends ApplicationResource { } // if the components are not specified, gather all components and their current revision - if (scheduleComponentsEntity.getComponents() == null) { + if (requestScheduleComponentsEntity.getComponents() == null) { // get the current revisions for the components being updated final Set<Revision> revisions = serviceFacade.getRevisionsFromGroup(id, group -> { final Set<String> componentIds = new HashSet<>(); @@ -626,34 +629,42 @@ public class FlowResource extends ApplicationResource { }); // set the components and their current revision - scheduleComponentsEntity.setComponents(componentsToSchedule); + 
requestScheduleComponentsEntity.setComponents(componentsToSchedule); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, scheduleComponentsEntity); + return replicate(HttpMethod.PUT, requestScheduleComponentsEntity); } - final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents(); - final Map<String, Revision> componentRevisions = componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); - final Set<Revision> revisions = new HashSet<>(componentRevisions.values()); + final Map<String, RevisionDTO> requestComponentsToSchedule = requestScheduleComponentsEntity.getComponents(); + final Map<String, Revision> requestComponentRevisions = + requestComponentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); + final Set<Revision> requestRevisions = new HashSet<>(requestComponentRevisions.values()); return withWriteLock( serviceFacade, - revisions, + requestScheduleComponentsEntity, + requestRevisions, lookup -> { // ensure access to the flow authorizeFlow(); // ensure access to every component being scheduled - componentsToSchedule.keySet().forEach(componentId -> { + requestComponentsToSchedule.keySet().forEach(componentId -> { final Authorizable connectable = lookup.getConnectable(componentId); connectable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }); }, - () -> serviceFacade.verifyScheduleComponents(id, state, componentRevisions.keySet()), - () -> { + () -> serviceFacade.verifyScheduleComponents(id, state, requestComponentRevisions.keySet()), + (revisions, scheduleComponentsEntity) -> { + final ScheduledState scheduledState = ScheduledState.valueOf(scheduleComponentsEntity.getState()); + + final Map<String, RevisionDTO> componentsToSchedule = scheduleComponentsEntity.getComponents(); + final Map<String, Revision> componentRevisions = + 
componentsToSchedule.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getRevision(e.getValue(), e.getKey()))); + // update the process group - final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, state, componentRevisions); + final ScheduleComponentsEntity entity = serviceFacade.scheduleComponents(id, scheduledState, componentRevisions); return clusterContext(generateOkResponse(entity)).build(); } ); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java index c23b1b9..a6747d8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java @@ -142,7 +142,7 @@ public class FunnelResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the funnel to update. - * @param funnelEntity A funnelEntity. + * @param requestFunnelEntity A funnelEntity. * @return A funnelEntity. 
*/ @PUT @@ -175,40 +175,41 @@ public class FunnelResource extends ApplicationResource { @ApiParam( value = "The funnel configuration details.", required = true - ) final FunnelEntity funnelEntity) { + ) final FunnelEntity requestFunnelEntity) { - if (funnelEntity == null || funnelEntity.getComponent() == null) { + if (requestFunnelEntity == null || requestFunnelEntity.getComponent() == null) { throw new IllegalArgumentException("Funnel details must be specified."); } - if (funnelEntity.getRevision() == null) { + if (requestFunnelEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the ids are the same - final FunnelDTO requestFunnelDTO = funnelEntity.getComponent(); + final FunnelDTO requestFunnelDTO = requestFunnelEntity.getComponent(); if (!id.equals(requestFunnelDTO.getId())) { throw new IllegalArgumentException(String.format("The funnel id (%s) in the request body does not equal the " + "funnel id of the requested resource (%s).", requestFunnelDTO.getId(), id)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, funnelEntity); + return replicate(HttpMethod.PUT, requestFunnelEntity); } // Extract the revision - final Revision revision = getRevision(funnelEntity, id); + final Revision requestRevision = getRevision(requestFunnelEntity, id); return withWriteLock( serviceFacade, - revision, + requestFunnelEntity, + requestRevision, lookup -> { Authorizable authorizable = lookup.getFunnel(id); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, null, - () -> { + (revision, funnelEntity) -> { // update the funnel - final FunnelEntity entity = serviceFacade.updateFunnel(revision, requestFunnelDTO); + final FunnelEntity entity = serviceFacade.updateFunnel(revision, funnelEntity.getComponent()); populateRemainingFunnelEntityContent(entity); return clusterContext(generateOkResponse(entity)).build(); @@ -270,19 +271,23 @@ public class FunnelResource extends 
ApplicationResource { return replicate(HttpMethod.DELETE); } + final FunnelEntity requestFunnelEntity = new FunnelEntity(); + requestFunnelEntity.setId(id); + // handle expects request (usually from the cluster manager) - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); return withWriteLock( serviceFacade, - revision, + requestFunnelEntity, + requestRevision, lookup -> { final Authorizable funnel = lookup.getFunnel(id); funnel.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> serviceFacade.verifyDeleteFunnel(id), - () -> { + (revision, funnelEntity) -> { // delete the specified funnel - final FunnelEntity entity = serviceFacade.deleteFunnel(revision, id); + final FunnelEntity entity = serviceFacade.deleteFunnel(revision, funnelEntity.getId()); return clusterContext(generateOkResponse(entity)).build(); } ); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java index a57e8aa..a295fc6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java @@ -142,7 +142,7 @@ public class InputPortResource extends ApplicationResource { * * @param 
httpServletRequest request * @param id The id of the input port to update. - * @param portEntity A inputPortEntity. + * @param requestPortEntity A inputPortEntity. * @return A inputPortEntity. */ @PUT @@ -175,40 +175,43 @@ public class InputPortResource extends ApplicationResource { @ApiParam( value = "The input port configuration details.", required = true - ) final PortEntity portEntity) { + ) final PortEntity requestPortEntity) { - if (portEntity == null || portEntity.getComponent() == null) { + if (requestPortEntity == null || requestPortEntity.getComponent() == null) { throw new IllegalArgumentException("Input port details must be specified."); } - if (portEntity.getRevision() == null) { + if (requestPortEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the ids are the same - final PortDTO requestPortDTO = portEntity.getComponent(); + final PortDTO requestPortDTO = requestPortEntity.getComponent(); if (!id.equals(requestPortDTO.getId())) { throw new IllegalArgumentException(String.format("The input port id (%s) in the request body does not equal the " + "input port id of the requested resource (%s).", requestPortDTO.getId(), id)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, portEntity); + return replicate(HttpMethod.PUT, requestPortEntity); } // handle expects request (usually from the cluster manager) - final Revision revision = getRevision(portEntity, id); + final Revision requestRevision = getRevision(requestPortEntity, id); return withWriteLock( serviceFacade, - revision, + requestPortEntity, + requestRevision, lookup -> { Authorizable authorizable = lookup.getInputPort(id); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> serviceFacade.verifyUpdateInputPort(requestPortDTO), - () -> { + (revision, portEntity) -> { + final PortDTO portDTO = portEntity.getComponent(); + // update the input port - final PortEntity entity = 
serviceFacade.updateInputPort(revision, requestPortDTO); + final PortEntity entity = serviceFacade.updateInputPort(revision, portDTO); populateRemainingInputPortEntityContent(entity); return clusterContext(generateOkResponse(entity)).build(); @@ -267,19 +270,23 @@ public class InputPortResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } + final PortEntity requestPortEntity = new PortEntity(); + requestPortEntity.setId(id); + // handle expects request (usually from the cluster manager) - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); return withWriteLock( serviceFacade, - revision, + requestPortEntity, + requestRevision, lookup -> { final Authorizable inputPort = lookup.getInputPort(id); inputPort.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> serviceFacade.verifyDeleteInputPort(id), - () -> { + (revision, portEntity) -> { // delete the specified input port - final PortEntity entity = serviceFacade.deleteInputPort(revision, id); + final PortEntity entity = serviceFacade.deleteInputPort(revision, portEntity.getId()); return clusterContext(generateOkResponse(entity)).build(); } ); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java index ddde515..64ddde3 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java @@ -142,7 +142,7 @@ public class LabelResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the label to update. - * @param labelEntity A labelEntity. + * @param requestLabelEntity A labelEntity. * @return A labelEntity. */ @PUT @@ -175,40 +175,43 @@ public class LabelResource extends ApplicationResource { @ApiParam( value = "The label configuraiton details.", required = true - ) final LabelEntity labelEntity) { + ) final LabelEntity requestLabelEntity) { - if (labelEntity == null || labelEntity.getComponent() == null) { + if (requestLabelEntity == null || requestLabelEntity.getComponent() == null) { throw new IllegalArgumentException("Label details must be specified."); } - if (labelEntity.getRevision() == null) { + if (requestLabelEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the ids are the same - final LabelDTO requestLabelDTO = labelEntity.getComponent(); + final LabelDTO requestLabelDTO = requestLabelEntity.getComponent(); if (!id.equals(requestLabelDTO.getId())) { throw new IllegalArgumentException(String.format("The label id (%s) in the request body does not equal the " + "label id of the requested resource (%s).", requestLabelDTO.getId(), id)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, labelEntity); + return replicate(HttpMethod.PUT, requestLabelEntity); } // handle expects request (usually from the cluster manager) - final Revision revision = getRevision(labelEntity, id); + final Revision requestRevision = getRevision(requestLabelEntity, id); return withWriteLock( serviceFacade, - revision, + requestLabelEntity, + requestRevision, lookup -> { Authorizable 
authorizable = lookup.getLabel(id); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, null, - () -> { + (revision, labelEntity) -> { + final LabelDTO labelDTO = labelEntity.getComponent(); + // update the label - final LabelEntity entity = serviceFacade.updateLabel(revision, requestLabelDTO); + final LabelEntity entity = serviceFacade.updateLabel(revision, labelDTO); populateRemainingLabelEntityContent(entity); return clusterContext(generateOkResponse(entity)).build(); @@ -267,19 +270,23 @@ public class LabelResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } + final LabelEntity requestLabelEntity = new LabelEntity(); + requestLabelEntity.setId(id); + // handle expects request (usually from the cluster manager) - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); return withWriteLock( serviceFacade, - revision, + requestLabelEntity, + requestRevision, lookup -> { final Authorizable label = lookup.getLabel(id); label.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, null, - () -> { + (revision, labelEntity) -> { // delete the specified label - final LabelEntity entity = serviceFacade.deleteLabel(revision, id); + final LabelEntity entity = serviceFacade.deleteLabel(revision, labelEntity.getId()); return clusterContext(generateOkResponse(entity)).build(); } ); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java index 70a9e2d..4070415 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java @@ -142,7 +142,7 @@ public class OutputPortResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the output port to update. - * @param portEntity A outputPortEntity. + * @param requestPortEntity A outputPortEntity. * @return A outputPortEntity. */ @PUT @@ -175,40 +175,43 @@ public class OutputPortResource extends ApplicationResource { @ApiParam( value = "The output port configuration details.", required = true - ) final PortEntity portEntity) { + ) final PortEntity requestPortEntity) { - if (portEntity == null || portEntity.getComponent() == null) { + if (requestPortEntity == null || requestPortEntity.getComponent() == null) { throw new IllegalArgumentException("Output port details must be specified."); } - if (portEntity.getRevision() == null) { + if (requestPortEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the ids are the same - PortDTO requestPortDTO = portEntity.getComponent(); + PortDTO requestPortDTO = requestPortEntity.getComponent(); if (!id.equals(requestPortDTO.getId())) { throw new IllegalArgumentException(String.format("The output port id (%s) in the request body does not equal the " + "output port id of the requested resource (%s).", requestPortDTO.getId(), id)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, portEntity); + return replicate(HttpMethod.PUT, requestPortEntity); } // handle expects request (usually from the cluster manager) - final Revision revision = 
getRevision(portEntity, id); + final Revision requestRevision = getRevision(requestPortEntity, id); return withWriteLock( serviceFacade, - revision, + requestPortEntity, + requestRevision, lookup -> { Authorizable authorizable = lookup.getOutputPort(id); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> serviceFacade.verifyUpdateOutputPort(requestPortDTO), - () -> { + (revision, portEntity) -> { + final PortDTO portDTO = portEntity.getComponent(); + // update the output port - final PortEntity entity = serviceFacade.updateOutputPort(revision, requestPortDTO); + final PortEntity entity = serviceFacade.updateOutputPort(revision, portDTO); populateRemainingOutputPortEntityContent(entity); return clusterContext(generateOkResponse(entity)).build(); @@ -267,19 +270,23 @@ public class OutputPortResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } + final PortEntity requestPortEntity = new PortEntity(); + requestPortEntity.setId(id); + // handle expects request (usually from the cluster manager) - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); return withWriteLock( serviceFacade, - revision, + requestPortEntity, + requestRevision, lookup -> { final Authorizable outputPort = lookup.getOutputPort(id); outputPort.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> serviceFacade.verifyDeleteOutputPort(id), - () -> { + (revision, portEntity) -> { // delete the specified output port - final PortEntity entity = serviceFacade.deleteOutputPort(revision, id); + final PortEntity entity = serviceFacade.deleteOutputPort(revision, portEntity.getId()); return clusterContext(generateOkResponse(entity)).build(); } );
