http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 0600cb6..cb44834 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,30 +16,6 @@ */ package org.apache.nifi.web; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; - -import javax.ws.rs.WebApplicationException; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; @@ -64,7 +40,6 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; -import 
org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -139,7 +114,6 @@ import org.apache.nifi.web.api.dto.action.ActionDTO; import org.apache.nifi.web.api.dto.action.HistoryDTO; import org.apache.nifi.web.api.dto.action.HistoryQueryDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; -import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; @@ -161,10 +135,12 @@ import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.LabelEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; import org.apache.nifi.web.api.entity.SnippetEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.ConnectionDAO; @@ -191,6 +167,29 @@ import org.apache.nifi.web.util.SnippetUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.WebApplicationException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import 
java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + /** * Implementation of NiFiServiceFacade that performs revision checking. */ @@ -232,12 +231,154 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private Authorizer authorizer; + private final AuthorizableLookup authorizableLookup = new AuthorizableLookup() { + @Override + public Authorizable getProcessor(final String id) { + return processorDAO.getProcessor(id); + } + + @Override + public Authorizable getInputPort(final String id) { + return inputPortDAO.getPort(id); + } + + @Override + public Authorizable getOutputPort(final String id) { + return outputPortDAO.getPort(id); + } + + @Override + public Authorizable getConnection(final String id) { + return connectionDAO.getConnection(id); + } + + @Override + public Authorizable getProcessGroup(final String id) { + return processGroupDAO.getProcessGroup(id); + } + + @Override + public Authorizable getRemoteProcessGroup(final String id) { + return remoteProcessGroupDAO.getRemoteProcessGroup(id); + } + + @Override + public Authorizable getRemoteProcessGroupInputPort(final String remoteProcessGroupId, final String id) { + final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); + return remoteProcessGroup.getInputPort(id); + } + + @Override + public Authorizable getRemoteProcessGroupOutputPort(final String remoteProcessGroupId, final String id) { + final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); + return remoteProcessGroup.getOutputPort(id); + } + + @Override + public Authorizable getLabel(final String id) { + return labelDAO.getLabel(id); + } + + @Override + public Authorizable getFunnel(final String id) { + return 
funnelDAO.getFunnel(id); + } + + @Override + public Authorizable getControllerService(final String id) { + return controllerServiceDAO.getControllerService(id); + } + + @Override + public Authorizable getControllerServiceReferencingComponent(String controllerSeriveId, String id) { + final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerSeriveId); + final ControllerServiceReference referencingComponents = controllerService.getReferences(); + + ConfiguredComponent reference = null; + for (final ConfiguredComponent component : referencingComponents.getReferencingComponents()) { + if (component.getIdentifier().equals(id)) { + reference = component; + break; + } + } + + if (reference == null) { + throw new ResourceNotFoundException("Unable to find referencing component with id " + id); + } + + return reference; + } + + @Override + public Authorizable getReportingTask(final String id) { + return reportingTaskDAO.getReportingTask(id); + } + + @Override + public Snippet getSnippet(final String id) { + return snippetDAO.getSnippet(id); + } + + @Override + public Authorizable getTemplate(final String id) { + return templateDAO.getTemplate(id); + } + + @Override + public Authorizable getConnectable(String id) { + final ProcessGroup group = processGroupDAO.getProcessGroup(controllerFacade.getRootGroupId()); + return group.findConnectable(id); + } + }; + // ----------------------------------------- // Synchronization methods // ----------------------------------------- + + public void authorizeAccess(AuthorizeAccess authorizeAccess) { + authorizeAccess.authorize(authorizableLookup); + } + + @Override + public void claimRevision(Revision revision, NiFiUser user) { + revisionManager.requestClaim(revision, user); + } + + @Override + public void claimRevisions(Set<Revision> revisions, NiFiUser user) { + revisionManager.requestClaim(revisions, user); + } + + @Override + public void cancelRevision(Revision revision) { + 
revisionManager.cancelClaim(revision); + } + + @Override + public void cancelRevisions(Set<Revision> revisions) { + revisionManager.cancelClaims(revisions); + } + @Override - public void claimRevision(Revision revision) { - revisionManager.requestClaim(revision, NiFiUserUtils.getNiFiUser()); + public Set<Revision> getRevisionsFromGroup(String groupId, Function<ProcessGroup, Set<String>> getComponents) { + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + final Set<String> componentIds = revisionManager.get(group.getIdentifier(), rev -> getComponents.apply(group)); + return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); + } + + @Override + public Set<Revision> getRevisionsFromSnippet(String snippetId) { + final Snippet snippet = snippetDAO.getSnippet(snippetId); + final Set<String> componentIds = new HashSet<>(); + componentIds.addAll(snippet.getProcessors().keySet()); + componentIds.addAll(snippet.getFunnels().keySet()); + componentIds.addAll(snippet.getLabels().keySet()); + componentIds.addAll(snippet.getConnections().keySet()); + componentIds.addAll(snippet.getInputPorts().keySet()); + componentIds.addAll(snippet.getOutputPorts().keySet()); + componentIds.addAll(snippet.getProcessGroups().keySet()); + componentIds.addAll(snippet.getRemoteProcessGroups().keySet()); + return componentIds.stream().map(id -> revisionManager.getRevision(id)).collect(Collectors.toSet()); } // ----------------------------------------- @@ -338,15 +479,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyUpdateProcessGroup(ProcessGroupDTO processGroupDTO) { + public void verifyScheduleComponents(String groupId, ScheduledState state, Set<String> componentIds) { try { - // if group does not exist, then the update request is likely creating it - // so we don't verify since it will fail - if (processGroupDAO.hasProcessGroup(processGroupDTO.getId())) { - 
processGroupDAO.verifyUpdate(processGroupDTO); - } + processGroupDAO.verifyScheduleComponents(groupId, state, componentIds); } catch (final Exception e) { - revisionManager.cancelClaim(processGroupDTO.getId()); + componentIds.forEach(id -> revisionManager.cancelClaim(id)); throw e; } } @@ -531,7 +668,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { * @param <D> the DTO Type of the updated component * @param <C> the Component Type of the updated component * - * @return A ConfigurationSnapshot that represents the new configuration + * @return A RevisionUpdate that represents the new configuration */ private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); @@ -540,12 +677,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() { @Override public RevisionUpdate<D> update() { - // ensure write access to the flow - authorizable.authorize(authorizer, RequestAction.WRITE); - - // also ensure read access to the flow as the component must be read in order to generate a response - authorizable.authorize(authorizer, RequestAction.READ); - // get the updated component final C component = daoUpdate.get(); @@ -568,164 +699,50 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override - public void verifyUpdateSnippet(SnippetDTO snippetDto) { + public void verifyUpdateSnippet(SnippetDTO snippetDto, final Set<String> affectedComponentIds) { try { // if snippet does not exist, then the update request is likely creating it // so we don't verify since it will fail if (snippetDAO.hasSnippet(snippetDto.getId())) { - snippetDAO.verifyUpdate(snippetDto); + snippetDAO.verifyUpdateSnippetComponent(snippetDto); } } 
catch (final Exception e) { - revisionManager.cancelClaim(snippetDto.getId()); + /* verification failed: cancel the claim on each affected component id */ affectedComponentIds.forEach(id -> revisionManager.cancelClaim(id)); throw e; } } - private Set<Revision> getRevisionsForGroup(final String groupId) { - final Set<Revision> revisions = new HashSet<>(); - - revisions.add(revisionManager.getRevision(groupId)); - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - if (processGroup == null) { - throw new IllegalArgumentException("Snippet contains a reference to Process Group with ID " + groupId + " but no Process Group exists with that ID"); - } - - processGroup.getConnections().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); - processGroup.getFunnels().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); - processGroup.getInputPorts().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); - processGroup.getOutputPorts().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); - processGroup.getLabels().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); - processGroup.getProcessors().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); - processGroup.getRemoteProcessGroups().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); - processGroup.getProcessGroups().stream().map(c -> c.getIdentifier()).forEach(id -> revisions.addAll(getRevisionsForGroup(id))); - - return revisions; - } - - private Set<Revision> getRevisionsForSnippet(final SnippetDTO snippetDto) { - final Set<Revision> requiredRevisions = new HashSet<>(); - 
requiredRevisions.add(revisionManager.getRevision(snippetDto.getId())); - snippetDto.getConnections().entrySet().stream() - .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) - .forEach(rev -> requiredRevisions.add(rev)); - - snippetDto.getFunnels().entrySet().stream() - .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) - .forEach(rev -> requiredRevisions.add(rev)); - - snippetDto.getInputPorts().entrySet().stream() - .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) - .forEach(rev -> requiredRevisions.add(rev)); - - snippetDto.getOutputPorts().entrySet().stream() - .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) - .forEach(rev -> requiredRevisions.add(rev)); - - snippetDto.getLabels().entrySet().stream() - .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) - .forEach(rev -> requiredRevisions.add(rev)); - - snippetDto.getProcessors().entrySet().stream() - .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) - .forEach(rev -> requiredRevisions.add(rev)); - - snippetDto.getRemoteProcessGroups().entrySet().stream() - .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) - .forEach(rev -> requiredRevisions.add(rev)); - - for (final String groupId : snippetDto.getProcessGroups().keySet()) { - requiredRevisions.addAll(getRevisionsForGroup(groupId)); - } - - return requiredRevisions; - } - - private ProcessGroup getGroup(final String groupId) { - return revisionManager.get(groupId, rev -> processGroupDAO.getProcessGroup(groupId)); - } - @Override - public UpdateResult<SnippetEntity> updateSnippet(final Revision revision, final SnippetDTO snippetDto) { - // if label does not exist, then 
create new label - if (snippetDAO.hasSnippet(snippetDto.getId()) == false) { - return new UpdateResult<>(createSnippet(snippetDto), true); - } - - final Set<Revision> requiredRevisions = getRevisionsForSnippet(snippetDto); - - // if the parent group is specified in the request, ensure write access to it as it could be moving the components in the snippet - final String requestProcessGroupIdentifier = snippetDto.getParentGroupId(); - if (requestProcessGroupIdentifier != null) { - final ProcessGroup requestProcessGroup = processGroupDAO.getProcessGroup(requestProcessGroupIdentifier); - requestProcessGroup.authorize(authorizer, RequestAction.WRITE); - } - + public SnippetEntity updateSnippet(final Set<Revision> revisions, final SnippetDTO snippetDto) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final String modifier = user.getUserName(); - final RevisionClaim revisionClaim = new StandardRevisionClaim(requiredRevisions); + final RevisionClaim revisionClaim = new StandardRevisionClaim(revisions); - RevisionUpdate<SnippetDTO> versionedSnippet; + RevisionUpdate<SnippetDTO> snapshot; try { - versionedSnippet = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() { + snapshot = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() { @Override public RevisionUpdate<SnippetDTO> update() { // get the updated component - final Snippet snippet = snippetDAO.updateSnippet(snippetDto); + final Snippet snippet = snippetDAO.updateSnippetComponents(snippetDto); - // ensure write access to the flow - final ProcessGroup processGroup = getGroup(snippet.getParentGroupId()); - processGroup.authorize(authorizer, RequestAction.WRITE); + // drop the snippet + snippetDAO.dropSnippet(snippet.getId()); // save updated controller controllerFacade.save(); - final SnippetDTO snippetDto = dtoFactory.createSnippetDto(snippet); - - // Update each of the revisions that were required and - // build new SnippetDTO that 
contains all of the updated revisions - final SnippetDTO updatedSnippet = new SnippetDTO(); - updatedSnippet.setId(snippetDto.getId()); - updatedSnippet.setParentGroupId(snippetDto.getParentGroupId()); - updatedSnippet.setUri(snippetDto.getUri()); - updatedSnippet.setLinked(snippetDto.isLinked()); - - updatedSnippet.setConnections(updateRevisions(snippetDto.getConnections(), modifier)); - updatedSnippet.setFunnels(updateRevisions(snippetDto.getFunnels(), modifier)); - updatedSnippet.setInputPorts(updateRevisions(snippetDto.getInputPorts(), modifier)); - updatedSnippet.setLabels(updateRevisions(snippetDto.getLabels(), modifier)); - updatedSnippet.setOutputPorts(updateRevisions(snippetDto.getOutputPorts(), modifier)); - updatedSnippet.setProcessGroups(updateRevisions(snippetDto.getProcessGroups(), modifier)); - updatedSnippet.setProcessors(updateRevisions(snippetDto.getProcessors(), modifier)); - updatedSnippet.setRemoteProcessGroups(updateRevisions(snippetDto.getRemoteProcessGroups(), modifier)); - - final Revision updatedSnippetRevision = incrementRevision(revision); - final FlowModification lastModification = new FlowModification(updatedSnippetRevision, modifier); - return new StandardRevisionUpdate<>(updatedSnippet, lastModification); + // increment the revisions + final Set<Revision> updatedRevisions = revisions.stream().map(revision -> incrementRevision(revision)).collect(Collectors.toSet()); + + final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); + return new StandardRevisionUpdate<>(dto, null, updatedRevisions); } }); } catch (ExpiredRevisionClaimException e) { throw new InvalidRevisionException("Failed to update Snippet", e); } - final ProcessGroup parentGroup = processGroupDAO.getProcessGroup(versionedSnippet.getComponent().getParentGroupId()); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(parentGroup); - return new UpdateResult<>(entityFactory.createSnippetEntity(versionedSnippet.getComponent(), 
dtoFactory.createRevisionDTO(versionedSnippet.getLastModification()), accessPolicy), false); - } - - private Map<String, RevisionDTO> updateRevisions(final Map<String, RevisionDTO> originalDtos, final String modifier) { - final Map<String, RevisionDTO> updatedComponents = new HashMap<>(originalDtos.size()); - for (final Map.Entry<String, RevisionDTO> entry : originalDtos.entrySet()) { - final String id = entry.getKey(); - final RevisionDTO revisionDto = entry.getValue(); - - final RevisionDTO updatedDto = new RevisionDTO(); - updatedDto.setClientId(revisionDto.getClientId()); - updatedDto.setLastModifier(modifier); - updatedDto.setVersion(revisionDto.getVersion() + 1); - updatedComponents.put(id, updatedDto); - } - - return updatedComponents; + return entityFactory.createSnippetEntity(snapshot.getComponent()); } @Override @@ -840,6 +857,35 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override + public ScheduleComponentsEntity scheduleComponents(final String processGroupId, final ScheduledState state, final Map<String, Revision> componentRevisions) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionUpdate<ScheduleComponentsEntity> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(componentRevisions.values()), user, new + UpdateRevisionTask<ScheduleComponentsEntity>() { + @Override + public RevisionUpdate<ScheduleComponentsEntity> update() { + // schedule the components + processGroupDAO.scheduleComponents(processGroupId, state, componentRevisions.keySet()); + + // update the revisions + final Map<String, Revision> updatedRevisions = new HashMap<>(); + for (final Revision revision : componentRevisions.values()) { + final Revision currentRevision = revisionManager.getRevision(revision.getComponentId()); + final Revision updatedRevision = incrementRevision(currentRevision); + updatedRevisions.put(revision.getComponentId(), updatedRevision); + } + + // gather details for response + final 
ScheduleComponentsEntity entity = new ScheduleComponentsEntity(); + entity.setId(processGroupId); + entity.setState(state.name()); + return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); + } + }); + + return updatedComponent.getComponent(); + } + + @Override public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) { final Supplier<ControllerConfigurationDTO> daoUpdate = () -> { // update the controller configuration through the proxy @@ -1032,7 +1078,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { * * @param revision the current revision * @param deleteAction the action that deletes the component via the appropriate DAO object - * @return a ConfigurationSnapshot that represents the new configuration + * @return a dto that represents the new configuration */ private <D, C> D deleteComponent(final Revision revision, final Authorizable authorizable, final Runnable deleteAction, final D dto) { final RevisionClaim claim = new StandardRevisionClaim(revision); @@ -1043,17 +1089,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public D performTask() { logger.debug("Attempting to delete component {} with claim {}", authorizable, claim); - // ensure access to the component - authorizable.authorize(authorizer, RequestAction.WRITE); - - // If the component has outgoing connections, ensure that we can delete them all. 
- if (authorizable instanceof Connectable) { - final Connectable connectable = (Connectable) authorizable; - for (final Connection connection : connectable.getConnections()) { - connection.authorize(authorizer, RequestAction.WRITE); - } - } - deleteAction.run(); // save the flow @@ -1066,24 +1101,39 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void verifyDeleteSnippet(String id) { - snippetDAO.verifyDelete(id); + public void verifyDeleteSnippet(String snippetId, Set<String> affectedComponentIds) { + try { + snippetDAO.verifyDeleteSnippetComponents(snippetId); + } catch (final Exception e) { + affectedComponentIds.forEach(id -> revisionManager.cancelClaim(id)); + throw e; + } } @Override - public SnippetEntity deleteSnippet(final Revision revision, final String snippetId) { + public SnippetEntity deleteSnippet(final Set<Revision> revisions, final String snippetId) { final Snippet snippet = snippetDAO.getSnippet(snippetId); - final ProcessGroup processGroup = getGroup(snippet.getParentGroupId()); - // ensure access to process group - processGroup.authorize(authorizer, RequestAction.WRITE); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final RevisionClaim claim = new StandardRevisionClaim(revisions); + final SnippetDTO dto = revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<SnippetDTO>() { + @Override + public SnippetDTO performTask() { + // delete the components in the snippet + snippetDAO.deleteSnippetComponents(snippetId); + + // drop the snippet + snippetDAO.dropSnippet(snippetId); - final SnippetDTO snapshot = deleteComponent(revision, - processGroup, - () -> snippetDAO.deleteSnippet(snippetId), - dtoFactory.createSnippetDto(snippet)); + // save + controllerFacade.save(); - return entityFactory.createSnippetEntity(snapshot, null, null); + // create the dto for the snippet that was just removed + return dtoFactory.createSnippetDto(snippet); + } + }); + + return 
entityFactory.createSnippetEntity(dto); } @Override @@ -1228,10 +1278,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final String groupId = componentDto.getParentGroupId(); return revisionManager.get(groupId, rev -> { - // ensure access to process group - final ProcessGroup parent = processGroupDAO.getProcessGroup(groupId); - parent.authorize(authorizer, RequestAction.WRITE); - // add the component final C component = daoCreation.get(); @@ -1319,21 +1365,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) { final FlowDTO flowDto = revisionManager.get(groupId, rev -> { - // ensure access to process group - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - processGroup.authorize(authorizer, RequestAction.WRITE); - // create the new snippet final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed); - // TODO - READ access to all components in snippet - // validate the new snippet validateSnippetContents(snippet); // save the flow controllerFacade.save(); + // drop the snippet + snippetDAO.dropSnippet(snippetId); + // identify all components added final Set<String> identifiers = new HashSet<>(); snippet.getProcessors().stream() @@ -1363,6 +1406,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { .map(remoteOutputPort -> remoteOutputPort.getId()) .forEach(id -> identifiers.add(id)); + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); return revisionManager.get(identifiers, () -> { final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); @@ -1381,21 +1425,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public SnippetEntity createSnippet(final SnippetDTO snippetDTO) { - final 
String modifier = NiFiUserUtils.getNiFiUserName(); - - // ensure id is set - if (StringUtils.isBlank(snippetDTO.getId())) { - snippetDTO.setId(UUID.randomUUID().toString()); - } - final String groupId = snippetDTO.getParentGroupId(); final RevisionUpdate<SnippetDTO> snapshot = revisionManager.get(groupId, rev -> { - // ensure access to process group - final ProcessGroup parent = processGroupDAO.getProcessGroup(groupId); - parent.authorize(authorizer, RequestAction.WRITE); - - // TODO - READ access to all components in snippet - // add the component final Snippet snippet = snippetDAO.createSnippet(snippetDTO); @@ -1403,13 +1434,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); final SnippetDTO dto = dtoFactory.createSnippetDto(snippet); - final FlowModification lastMod = new FlowModification(new Revision(0L, rev.getClientId(), snippetDTO.getId()), modifier); - return new StandardRevisionUpdate<SnippetDTO>(dto, lastMod); + return new StandardRevisionUpdate<SnippetDTO>(dto, null); }); - final ProcessGroup parentGroup = processGroupDAO.getProcessGroup(snapshot.getComponent().getParentGroupId()); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(parentGroup); - return entityFactory.createSnippetEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); + return entityFactory.createSnippetEntity(snapshot.getComponent()); } @Override @@ -1483,6 +1511,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // create the template Template template = templateDAO.createTemplate(templateDTO, groupId); + // drop the snippet + snippetDAO.dropSnippet(snippetId); + return dtoFactory.createTemplateDTO(template); } @@ -1505,16 +1536,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String 
templateId, final String idGenerationSeed) { final FlowDTO flowDto = revisionManager.get(groupId, rev -> { - // ensure access to process group - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - processGroup.authorize(authorizer, RequestAction.WRITE); - // instantiate the template - there is no need to make another copy of the flow snippet since the actual template // was copied and this dto is only used to instantiate it's components (which as already completed) final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed); - // TODO - READ access to all components in snippet - // validate the new snippet validateSnippetContents(snippet); @@ -1673,9 +1698,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences(); final Map<String, Revision> updatedRevisions = new HashMap<>(); - for (final Revision refRevision : referenceRevisions.values()) { - updatedRevisions.put(refRevision.getComponentId(), refRevision); - } + for (final Revision refRevision : referenceRevisions.values()) { + updatedRevisions.put(refRevision.getComponentId(), refRevision); + } for (final ConfiguredComponent component : updated) { final Revision currentRevision = revisionManager.getRevision(component.getIdentifier()); @@ -2087,8 +2112,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public FlowFileDTO getFlowFile(String connectionId, String flowFileUuid) { - final Connection connection = connectionDAO.getConnection(connectionId); - connection.authorize(authorizer, RequestAction.WRITE); return dtoFactory.createFlowFileDTO(connectionDAO.getFlowFile(connectionId, flowFileUuid)); } @@ -2437,16 +2460,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public SnippetEntity getSnippet(String 
snippetId) { - return revisionManager.get(snippetId, rev -> { - final Snippet snippet = snippetDAO.getSnippet(snippetId); - final ProcessGroup parentGroup = processGroupDAO.getProcessGroup(snippet.getParentGroupId()); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(parentGroup); - return entityFactory.createSnippetEntity(dtoFactory.createSnippetDto(snippet), dtoFactory.createRevisionDTO(rev), accessPolicy); - }); - } - - @Override public Set<PortEntity> getInputPorts(String groupId) { final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); group.authorize(authorizer, RequestAction.READ); @@ -2584,12 +2597,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ConfigurationSnapshot<ProcessGroupFlowDTO> getProcessGroupFlow(String groupId, boolean recurse) { + public ProcessGroupFlowEntity getProcessGroupFlow(String groupId, boolean recurse) { return revisionManager.get(groupId, rev -> { // get all identifiers for every child component final Set<String> identifiers = new HashSet<>(); - ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); processGroup.getProcessors().stream() .map(proc -> proc.getIdentifier()) .forEach(id -> identifiers.add(id)); @@ -2616,17 +2629,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { .flatMap(remoteGroup -> remoteGroup.getOutputPorts().stream()) .map(remoteOutputPort -> remoteOutputPort.getIdentifier()) .forEach(id -> identifiers.add(id)); - processGroup.getControllerServices(false).stream() - .map(controllerService -> controllerService.getIdentifier()) - .forEach(id -> identifiers.add(id)); // read lock on every component being accessed in the dto conversion return revisionManager.get(identifiers, () -> { final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId); - ConfigurationSnapshot<ProcessGroupFlowDTO> 
response = new ConfigurationSnapshot<>(rev.getVersion(), - dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager)); - return response; + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processGroup); + return entityFactory.createProcessGroupFlowEntity(dtoFactory.createProcessGroupFlowDto(processGroup, groupStatus, revisionManager), accessPolicy); }); }); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index cee84eb..c7ea7b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -16,33 +16,25 @@ */ package org.apache.nifi.web.api; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.TreeMap; -import java.util.UUID; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.core.CacheControl; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.ResponseBuilder; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriBuilderException; -import javax.ws.rs.core.UriInfo; - +import com.sun.jersey.api.core.HttpContext; +import com.sun.jersey.api.representation.Form; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import 
org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserDetails; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.controller.Snippet; +import org.apache.nifi.web.AuthorizableLookup; +import org.apache.nifi.web.AuthorizeAccess; +import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.SnippetDTO; import org.apache.nifi.web.api.entity.ComponentEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; @@ -52,10 +44,29 @@ import org.slf4j.LoggerFactory; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; -import com.sun.jersey.api.core.HttpContext; -import com.sun.jersey.api.representation.Form; -import com.sun.jersey.core.util.MultivaluedMapImpl; -import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.CacheControl; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriBuilderException; +import javax.ws.rs.core.UriInfo; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import 
java.util.function.Consumer; +import java.util.function.Supplier; /** * Base class for controllers. @@ -369,4 +380,128 @@ public abstract class ApplicationResource { protected Revision getRevision(ComponentEntity entity, String componentId) { return getRevision(entity.getRevision(), componentId); } + + /** + * Authorizes the specified Snippet with the specified request action. + * + * @param authorizer authorizer + * @param lookup lookup + * @param action action + */ + protected void authorizeSnippet(final Snippet snippet, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action) { + final Consumer<Authorizable> authorize = authorizable -> authorizable.authorize(authorizer, action); + + snippet.getProcessGroups().keySet().stream().map(id -> lookup.getProcessGroup(id)).forEach(authorize); + snippet.getRemoteProcessGroups().keySet().stream().map(id -> lookup.getRemoteProcessGroup(id)).forEach(authorize); + snippet.getProcessors().keySet().stream().map(id -> lookup.getProcessor(id)).forEach(authorize); + snippet.getInputPorts().keySet().stream().map(id -> lookup.getInputPort(id)).forEach(authorize); + snippet.getOutputPorts().keySet().stream().map(id -> lookup.getOutputPort(id)).forEach(authorize); + snippet.getConnections().keySet().stream().map(id -> lookup.getConnection(id)).forEach(authorize); + snippet.getConnections().keySet().stream().map(id -> lookup.getConnection(id)).forEach(authorize); + snippet.getFunnels().keySet().stream().map(id -> lookup.getFunnel(id)).forEach(authorize); + } + + /** + * Authorizes the specified Snippet with the specified request action. 
+ * + * @param authorizer authorizer + * @param lookup lookup + * @param action action + */ + protected void authorizeSnippet(final SnippetDTO snippet, final Authorizer authorizer, final AuthorizableLookup lookup, final RequestAction action) { + final Consumer<Authorizable> authorize = authorizable -> authorizable.authorize(authorizer, action); + + snippet.getProcessGroups().keySet().stream().map(id -> lookup.getProcessGroup(id)).forEach(authorize); + snippet.getRemoteProcessGroups().keySet().stream().map(id -> lookup.getRemoteProcessGroup(id)).forEach(authorize); + snippet.getProcessors().keySet().stream().map(id -> lookup.getProcessor(id)).forEach(authorize); + snippet.getInputPorts().keySet().stream().map(id -> lookup.getInputPort(id)).forEach(authorize); + snippet.getOutputPorts().keySet().stream().map(id -> lookup.getOutputPort(id)).forEach(authorize); + snippet.getConnections().keySet().stream().map(id -> lookup.getConnection(id)).forEach(authorize); + snippet.getConnections().keySet().stream().map(id -> lookup.getConnection(id)).forEach(authorize); + snippet.getFunnels().keySet().stream().map(id -> lookup.getFunnel(id)).forEach(authorize); + } + + /** + * Executes an action through the service facade using the specified revision. 
+ * + * @param serviceFacade service facade + * @param revision revision + * @param authorizer authorizer + * @param verifier verifier + * @param action executor + * @return the response + */ + protected Response withWriteLock( + final NiFiServiceFacade serviceFacade, final Revision revision, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action) { + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(authorizer); + serviceFacade.claimRevision(revision, user); + } + + try { + if (validationPhase) { + if (verifier != null) { + verifier.run(); + } + return generateContinueResponse().build(); + } + } catch (final Exception e) { + serviceFacade.cancelRevision(revision); + throw e; + } + + try { + // delete the specified output port + return action.get(); + } finally { + serviceFacade.cancelRevision(revision); + } + } + + /** + * Executes an action through the service facade using the specified revision. 
+ * + * @param serviceFacade service facade + * @param revisions revisions + * @param authorizer authorizer + * @param verifier verifier + * @param action executor + * @return the response + */ + protected Response withWriteLock( + final NiFiServiceFacade serviceFacade, final Set<Revision> revisions, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action) { + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(authorizer); + serviceFacade.claimRevisions(revisions, user); + } + + try { + if (validationPhase) { + if (verifier != null) { + verifier.run(); + } + return generateContinueResponse().build(); + } + } catch (final Exception e) { + serviceFacade.cancelRevisions(revisions); + throw e; + } + + try { + // delete the specified output port + return action.get(); + } finally { + serviceFacade.cancelRevisions(revisions); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 3c9887a..3c44683 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -16,10 +16,27 @@ */ package 
org.apache.nifi.web.api; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -35,26 +52,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; -import org.apache.nifi.web.api.dto.ListingRequestDTO; -import org.apache.nifi.web.api.entity.ConnectionEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; 
-import org.apache.nifi.web.api.request.LongParameter; - -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.net.URI; +import java.util.Set; /** * RESTful endpoint for managing a Connection. @@ -69,6 +68,7 @@ public class ConnectionResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; /** * Populate the URIs for the specified connections. @@ -187,13 +187,19 @@ public class ConnectionResource extends ApplicationResource { value = "The connection id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable conn = lookup.getConnection(id); + conn.authorize(authorizer, RequestAction.READ); + }); + // get the specified relationship ConnectionEntity entity = serviceFacade.getConnection(id); populateRemainingConnectionEntityContent(entity); @@ -237,11 +243,11 @@ public class ConnectionResource extends ApplicationResource { value = "The connection id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The connection configuration details.", required = true - ) ConnectionEntity connectionEntity) { + ) final ConnectionEntity connectionEntity) { if (connectionEntity == null || connectionEntity.getComponent() == null) { throw new IllegalArgumentException("Connection details must be specified."); @@ -261,38 
+267,32 @@ public class ConnectionResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - // change content type to JSON for serializing entity - final Map<String, String> headersToOverride = new HashMap<>(); - headersToOverride.put("content-type", MediaType.APPLICATION_JSON); - - // replicate the request - return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), connectionEntity, getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), connectionEntity, getHeaders()).getResponse(); } - // handle expects request final Revision revision = getRevision(connectionEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - - if (validationPhase) { - serviceFacade.verifyUpdateConnection(connection); - return generateContinueResponse().build(); - } - - // update the relationship target - final UpdateResult<ConnectionEntity> updateResult = serviceFacade.updateConnection(revision, connection); - - final ConnectionEntity entity = updateResult.getResult(); - populateRemainingConnectionEntityContent(entity); - - // generate the response - if (updateResult.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable conn = lookup.getConnection(id); + conn.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateConnection(connection), + () -> { + // update the relationship target + final UpdateResult<ConnectionEntity> updateResult = serviceFacade.updateConnection(revision, connection); + + final ConnectionEntity entity = updateResult.getResult(); + 
populateRemainingConnectionEntityContent(entity); + + // generate the response + if (updateResult.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + }); } /** @@ -331,17 +331,17 @@ public class ConnectionResource extends ApplicationResource { value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The connection id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -352,22 +352,23 @@ public class ConnectionResource extends ApplicationResource { final Long clientVersion = version == null ? 
null : version.getLong(); final Revision revision = new Revision(clientVersion, clientId.getClientId(), id); - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - - if (validationPhase) { - serviceFacade.verifyDeleteConnection(id); - return generateContinueResponse().build(); - } - - // delete the connection - final ConnectionEntity entity = serviceFacade.deleteConnection(revision, id); - - // generate the response - return clusterContext(generateOkResponse(entity)).build(); + // get the current user + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable conn = lookup.getConnection(id); + conn.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteConnection(id), + () -> { + // delete the connection + final ConnectionEntity entity = serviceFacade.deleteConnection(revision, id); + + // generate the response + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters @@ -382,4 +383,8 @@ public class ConnectionResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + this.authorizer = authorizer; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4dd50c80/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java 
index 705d0d5..90fbc85 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java @@ -23,6 +23,9 @@ import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; @@ -64,6 +67,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.net.URI; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -84,6 +88,7 @@ public class ControllerServiceResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; + private Authorizer authorizer; @Context private ServletContext servletContext; @@ -184,13 +189,19 @@ public class ControllerServiceResource extends ApplicationResource { value = "The controller service id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable controllerService = lookup.getControllerService(id); + 
controllerService.authorize(authorizer, RequestAction.READ); + }); + // get the controller service final ControllerServiceEntity entity = serviceFacade.getControllerService(id); populateRemainingControllerServiceEntityContent(entity); @@ -233,12 +244,12 @@ public class ControllerServiceResource extends ApplicationResource { value = "The controller service id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The property name to return the descriptor for.", required = true ) - @QueryParam("propertyName") String propertyName) { + @QueryParam("propertyName") final String propertyName) { // ensure the property name is specified if (propertyName == null) { @@ -250,6 +261,12 @@ public class ControllerServiceResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable controllerService = lookup.getControllerService(id); + controllerService.authorize(authorizer, RequestAction.READ); + }); + // get the property descriptor final PropertyDescriptorDTO descriptor = serviceFacade.getControllerServicePropertyDescriptor(id, propertyName); @@ -293,13 +310,19 @@ public class ControllerServiceResource extends ApplicationResource { value = "The controller service id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable controllerService = lookup.getControllerService(id); + controllerService.authorize(authorizer, RequestAction.WRITE); + }); + // get the component state final ComponentStateDTO state = 
serviceFacade.getControllerServiceState(id); @@ -344,12 +367,12 @@ public class ControllerServiceResource extends ApplicationResource { @ApiParam( value = "The revision used to verify the client is working with the latest version of the flow.", required = true - ) ComponentStateEntity revisionEntity, + ) final ComponentStateEntity revisionEntity, @ApiParam( value = "The controller service id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -359,6 +382,11 @@ public class ControllerServiceResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final boolean validationPhase = isValidationPhase(httpServletRequest); if (validationPhase) { + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable controllerService = lookup.getControllerService(id); + controllerService.authorize(authorizer, RequestAction.WRITE); + }); serviceFacade.verifyCanClearControllerServiceState(id); return generateContinueResponse().build(); } @@ -407,13 +435,19 @@ public class ControllerServiceResource extends ApplicationResource { value = "The controller service id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // authorize access + serviceFacade.authorizeAccess(lookup -> { + final Authorizable controllerService = lookup.getControllerService(id); + controllerService.authorize(authorizer, RequestAction.READ); + }); + // get the controller service final ControllerServiceReferencingComponentsEntity entity = serviceFacade.getControllerServiceReferencingComponents(id); @@ -449,11 +483,16 @@ public class ControllerServiceResource extends ApplicationResource { } ) public Response 
updateControllerServiceReferences( - @Context HttpServletRequest httpServletRequest, + @Context final HttpServletRequest httpServletRequest, + @ApiParam( + value = "The controller service id.", + required = true + ) + @PathParam("id") final String id, @ApiParam( value = "The controller service request update request.", required = true - ) UpdateControllerServiceReferenceRequestEntity updateReferenceRequest) { + ) final UpdateControllerServiceReferenceRequestEntity updateReferenceRequest) { if (updateReferenceRequest.getId() == null) { throw new IllegalArgumentException("The controller service identifier must be specified."); @@ -467,29 +506,31 @@ public class ControllerServiceResource extends ApplicationResource { // need to consider controller service state first as it shares a state with // scheduled state (disabled) which is applicable for referencing services // but not referencing schedulable components - ControllerServiceState controllerServiceState = null; + ControllerServiceState requestControllerServiceState = null; try { - controllerServiceState = ControllerServiceState.valueOf(updateReferenceRequest.getState()); + requestControllerServiceState = ControllerServiceState.valueOf(updateReferenceRequest.getState()); } catch (final IllegalArgumentException iae) { // ignore } - ScheduledState scheduledState = null; + ScheduledState requestScheduledState = null; try { - scheduledState = ScheduledState.valueOf(updateReferenceRequest.getState()); + requestScheduledState = ScheduledState.valueOf(updateReferenceRequest.getState()); } catch (final IllegalArgumentException iae) { // ignore } // ensure an action has been specified - if (scheduledState == null && controllerServiceState == null) { + if (requestScheduledState == null && requestControllerServiceState == null) { throw new IllegalArgumentException("Must specify the updated state. To update referencing Processors " + "and Reporting Tasks the state should be RUNNING or STOPPED. 
To update the referencing Controller Services the " + "state should be ENABLED or DISABLED."); } // ensure the controller service state is not ENABLING or DISABLING - if (controllerServiceState != null && (ControllerServiceState.ENABLING.equals(controllerServiceState) || ControllerServiceState.DISABLING.equals(controllerServiceState))) { + if (requestControllerServiceState != null + && (ControllerServiceState.ENABLING.equals(requestControllerServiceState) || ControllerServiceState.DISABLING.equals(requestControllerServiceState))) { + throw new IllegalArgumentException("Cannot set the referencing services to ENABLING or DISABLING"); } @@ -504,24 +545,28 @@ public class ControllerServiceResource extends ApplicationResource { final RevisionDTO rev = e.getValue(); return new Revision(rev.getVersion(), rev.getClientId(), e.getKey()); })); - - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - referencingRevisions.entrySet().stream().forEach(e -> { - serviceFacade.claimRevision(e.getValue()); - }); - } - if (validationPhase) { - serviceFacade.verifyUpdateControllerServiceReferencingComponents(updateReferenceRequest.getId(), scheduledState, controllerServiceState); - return generateContinueResponse().build(); - } - - // update the controller service references - final ControllerServiceReferencingComponentsEntity entity = serviceFacade.updateControllerServiceReferencingComponents( - referencingRevisions, updateReferenceRequest.getId(), scheduledState, controllerServiceState); - - return clusterContext(generateOkResponse(entity)).build(); + final Set<Revision> revisions = new HashSet<>(referencingRevisions.values()); + + final ScheduledState scheduledState = requestScheduledState; + final ControllerServiceState controllerServiceState = requestControllerServiceState; + return withWriteLock( + serviceFacade, + revisions, + 
lookup -> { + referencingRevisions.entrySet().stream().forEach(e -> { + final Authorizable controllerService = lookup.getControllerServiceReferencingComponent(id, e.getKey()); + controllerService.authorize(authorizer, RequestAction.WRITE); + }); + }, + () -> serviceFacade.verifyUpdateControllerServiceReferencingComponents(updateReferenceRequest.getId(), scheduledState, controllerServiceState), + () -> { + // update the controller service references + final ControllerServiceReferencingComponentsEntity entity = serviceFacade.updateControllerServiceReferencingComponents( + referencingRevisions, updateReferenceRequest.getId(), scheduledState, controllerServiceState); + + return clusterContext(generateOkResponse(entity)).build(); + } + ); } /** @@ -559,11 +604,11 @@ public class ControllerServiceResource extends ApplicationResource { value = "The controller service id.", required = true ) - @PathParam("id") String id, + @PathParam("id") final String id, @ApiParam( value = "The controller service configuration details.", required = true - ) ControllerServiceEntity controllerServiceEntity) { + ) final ControllerServiceEntity controllerServiceEntity) { if (controllerServiceEntity == null || controllerServiceEntity.getComponent() == null) { throw new IllegalArgumentException("Controller service details must be specified."); @@ -587,27 +632,29 @@ public class ControllerServiceResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = getRevision(controllerServiceEntity, id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyUpdateControllerService(requestControllerServiceDTO); - return generateContinueResponse().build(); - } - - // update the controller service - final UpdateResult<ControllerServiceEntity> updateResult = 
serviceFacade.updateControllerService(revision, requestControllerServiceDTO); - - // build the response entity - final ControllerServiceEntity entity = updateResult.getResult(); - populateRemainingControllerServiceContent(entity.getComponent()); - - if (updateResult.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); - } else { - return clusterContext(generateOkResponse(entity)).build(); - } + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable controllerService = lookup.getControllerService(id); + controllerService.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyUpdateControllerService(requestControllerServiceDTO), + () -> { + // update the controller service + final UpdateResult<ControllerServiceEntity> updateResult = serviceFacade.updateControllerService(revision, requestControllerServiceDTO); + + // build the response entity + final ControllerServiceEntity entity = updateResult.getResult(); + populateRemainingControllerServiceContent(entity.getComponent()); + + if (updateResult.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getComponent().getUri()), entity)).build(); + } else { + return clusterContext(generateOkResponse(entity)).build(); + } + } + ); } /** @@ -649,17 +696,17 @@ public class ControllerServiceResource extends ApplicationResource { value = "The revision is used to verify the client is working with the latest version of the flow.", required = false ) - @QueryParam(VERSION) LongParameter version, + @QueryParam(VERSION) final LongParameter version, @ApiParam( value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", required = false ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final ClientIdParameter clientId, @ApiParam( value = "The controller service id.", required = true ) - @PathParam("id") String id) { + @PathParam("id") final String id) { // replicate if cluster manager if (properties.isClusterManager()) { @@ -668,18 +715,20 @@ public class ControllerServiceResource extends ApplicationResource { // handle expects request (usually from the cluster manager) final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - serviceFacade.claimRevision(revision); - } - if (validationPhase) { - serviceFacade.verifyDeleteControllerService(id); - return generateContinueResponse().build(); - } - - // delete the specified controller service - final ControllerServiceEntity entity = serviceFacade.deleteControllerService(revision, id); - return clusterContext(generateOkResponse(entity)).build(); + return withWriteLock( + serviceFacade, + revision, + lookup -> { + final Authorizable controllerService = lookup.getControllerService(id); + controllerService.authorize(authorizer, RequestAction.WRITE); + }, + () -> serviceFacade.verifyDeleteControllerService(id), + () -> { + // delete the specified controller service + final ControllerServiceEntity entity = serviceFacade.deleteControllerService(revision, id); + return clusterContext(generateOkResponse(entity)).build(); + } + ); } // setters @@ -694,4 +743,8 @@ public class ControllerServiceResource extends ApplicationResource { public void setProperties(NiFiProperties properties) { this.properties = properties; } + + public void setAuthorizer(Authorizer authorizer) { + 
this.authorizer = authorizer; + } }
