http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index b85c15d..e5b6327 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,7 +16,32 @@ */ package org.apache.nifi.web; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; import org.apache.nifi.action.Action; import org.apache.nifi.action.Component; import org.apache.nifi.action.FlowChangeAction; @@ -45,6 +70,7 @@ import org.apache.nifi.components.state.StateMap; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.Counter; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; @@ -128,12 +154,19 @@ import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; +import org.apache.nifi.web.api.entity.FlowEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.LabelEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; +import org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.SnippetEntity; import org.apache.nifi.web.controller.ControllerFacade; import org.apache.nifi.web.dao.ConnectionDAO; import org.apache.nifi.web.dao.ControllerServiceDAO; @@ -146,41 +179,32 @@ import org.apache.nifi.web.dao.RemoteProcessGroupDAO; import org.apache.nifi.web.dao.ReportingTaskDAO; import org.apache.nifi.web.dao.SnippetDAO; import org.apache.nifi.web.dao.TemplateDAO; +import org.apache.nifi.web.revision.DeleteRevisionTask; +import org.apache.nifi.web.revision.ExpiredRevisionClaimException; +import org.apache.nifi.web.revision.ReadOnlyRevisionCallback; +import org.apache.nifi.web.revision.RevisionClaim; +import org.apache.nifi.web.revision.RevisionManager; +import org.apache.nifi.web.revision.RevisionUpdate; +import org.apache.nifi.web.revision.StandardRevisionClaim; +import org.apache.nifi.web.revision.StandardRevisionUpdate; +import org.apache.nifi.web.revision.UpdateRevisionTask; import org.apache.nifi.web.util.SnippetUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.WebApplicationException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; /** * Implementation of NiFiServiceFacade that performs revision checking. */ public class StandardNiFiServiceFacade implements NiFiServiceFacade { - - private static final Logger logger = LoggerFactory.getLogger(StandardNiFiServiceFacade.class); + private static final Logger logger = Logger.getLogger(StandardNiFiServiceFacade.class); // nifi core components private ControllerFacade controllerFacade; private SnippetUtils snippetUtils; // optimistic locking manager - private OptimisticLockingManager optimisticLockingManager; +// private OptimisticLockingManager optimisticLockingManager; + + // revision manager + private RevisionManager revisionManager; // data access objects private ProcessorDAO processorDAO; @@ -211,6 +235,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private Authorizer authorizer; // ----------------------------------------- + // Synchronization methods + // ----------------------------------------- + @Override + public void claimRevision(Revision revision) { + revisionManager.requestClaim(revision); + } + + // ----------------------------------------- // Verification Operations // ----------------------------------------- @@ -369,14 +401,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final Connection connectionNode = connectionDAO.getConnection(connectionDTO.getId()); - final ConfigurationSnapshot<ConnectionDTO> snapshot = updateComponent( + + final RevisionUpdate<ConnectionDTO> snapshot = updateComponent( revision, connectionNode, () -> connectionDAO.updateConnection(connectionDTO), connection -> dtoFactory.createConnectionDto(connection)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connectionNode); - return new UpdateResult<>(entityFactory.createConnectionEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy), false); + return new UpdateResult<>(entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); } @Override @@ -388,13 +421,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // get the component, ensure we have access to it, and perform the update request final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); - final ConfigurationSnapshot<ProcessorDTO> snapshot = updateComponent(revision, + final RevisionUpdate<ProcessorDTO> snapshot = updateComponent(revision, processorNode, () -> processorDAO.updateProcessor(processorDTO), proc -> dtoFactory.createProcessorDto(proc)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processorNode); - return new UpdateResult<>(entityFactory.createProcessorEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy), false); + return new UpdateResult<>(entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); } @Override @@ -405,13 +438,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final Label labelNode = labelDAO.getLabel(labelDTO.getId()); - final ConfigurationSnapshot<LabelDTO> snapshot = updateComponent(revision, + final RevisionUpdate<LabelDTO> snapshot = updateComponent(revision, labelNode, () -> labelDAO.updateLabel(labelDTO), label -> dtoFactory.createLabelDto(label)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(labelNode); - return new UpdateResult<>(entityFactory.createLabelEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy), false); + return new UpdateResult<>(entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); } @Override @@ -422,13 +455,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final Funnel funnelNode = funnelDAO.getFunnel(funnelDTO.getId()); - final ConfigurationSnapshot<FunnelDTO> snapshot = updateComponent(revision, + final RevisionUpdate<FunnelDTO> snapshot = updateComponent(revision, funnelNode, () -> funnelDAO.updateFunnel(funnelDTO), funnel -> dtoFactory.createFunnelDto(funnel)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(funnelNode); - return new UpdateResult<>(entityFactory.createFunnelEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy), false); + return new UpdateResult<>(entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); } @@ -444,37 +477,38 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { * @param <C> the Component Type of the updated component * * @return A ConfigurationSnapshot that represents the new configuration + * @throws ExpiredRevisionClaimException */ - private <D, C> ConfigurationSnapshot<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> - dtoCreation) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<D>() { - @Override - public ConfigurationResult<D> execute() { - // ensure write access to the flow - authorizable.authorize(authorizer, RequestAction.WRITE); - - // also ensure read access to the flow as the component must be read in order to generate a response - authorizable.authorize(authorizer, RequestAction.READ); + private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) { + final String modifier = NiFiUserUtils.getNiFiUserName(); + try { + final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<D>() { + @Override + public RevisionUpdate<D> update() { + // ensure write access to the flow + authorizable.authorize(authorizer, RequestAction.WRITE); + + // also ensure read access to the flow as the component must be read in order to generate a response + authorizable.authorize(authorizer, RequestAction.READ); + + // get the updated component + final C component = daoUpdate.get(); + + // save updated controller + controllerFacade.save(); - // get the updated component - final C component = daoUpdate.get(); + final Revision updatedRevision = incrementRevision(revision); + final D dto = dtoCreation.apply(component); - // save updated controller - controllerFacade.save(); - - return new ConfigurationResult<D>() { - @Override - public boolean isNew() { - return false; - } + final FlowModification lastModification = new FlowModification(updatedRevision, modifier); + return new StandardRevisionUpdate<>(dto, lastModification); + } + }); - @Override - public D getConfiguration() { - return dtoCreation.apply(component); - } - }; - } - }); + return updatedComponent; + } catch (final ExpiredRevisionClaimException erce) { + throw new InvalidRevisionException("Failed to update component " + authorizable, erce); + } } @@ -487,6 +521,69 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } } + private Set<Revision> getRevisionsForGroup(final String groupId) { + final Set<Revision> revisions = new HashSet<>(); + + revisions.add(revisionManager.getRevision(groupId)); + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); + if (processGroup == null) { + throw new IllegalArgumentException("Snippet contains a reference to Process Group with ID " + groupId + " but no Process Group exists with that ID"); + } + + processGroup.getConnections().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); + processGroup.getFunnels().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); + processGroup.getInputPorts().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); + processGroup.getOutputPorts().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); + processGroup.getLabels().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); + processGroup.getProcessors().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); + processGroup.getRemoteProcessGroups().stream().map(c -> c.getIdentifier()).map(id -> revisionManager.getRevision(id)).forEach(rev -> revisions.add(rev)); + processGroup.getProcessGroups().stream().map(c -> c.getIdentifier()).forEach(id -> revisions.addAll(getRevisionsForGroup(id))); + + return revisions; + } + + private Set<Revision> getRevisionsForSnippet(final SnippetDTO snippetDto) { + final Set<Revision> requiredRevisions = new HashSet<>(); + requiredRevisions.add(revisionManager.getRevision(snippetDto.getId())); + snippetDto.getConnections().entrySet().stream() + .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) + .forEach(rev -> requiredRevisions.add(rev)); + + snippetDto.getFunnels().entrySet().stream() + .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) + .forEach(rev -> requiredRevisions.add(rev)); + + snippetDto.getInputPorts().entrySet().stream() + .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) + .forEach(rev -> requiredRevisions.add(rev)); + + snippetDto.getOutputPorts().entrySet().stream() + .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) + .forEach(rev -> requiredRevisions.add(rev)); + + snippetDto.getLabels().entrySet().stream() + .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) + .forEach(rev -> requiredRevisions.add(rev)); + + snippetDto.getProcessors().entrySet().stream() + .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) + .forEach(rev -> requiredRevisions.add(rev)); + + snippetDto.getRemoteProcessGroups().entrySet().stream() + .map(entry -> new Revision(entry.getValue().getVersion(), entry.getValue().getClientId(), entry.getKey())) + .forEach(rev -> requiredRevisions.add(rev)); + + for (final String groupId : snippetDto.getProcessGroups().keySet()) { + requiredRevisions.addAll(getRevisionsForGroup(groupId)); + } + + return requiredRevisions; + } + + private ProcessGroup getGroup(final String groupId) { + return revisionManager.get(groupId, rev -> processGroupDAO.getProcessGroup(groupId)); + } + @Override public ConfigurationSnapshot<SnippetDTO> updateSnippet(final Revision revision, final SnippetDTO snippetDto) { // if label does not exist, then create new label @@ -494,14 +591,74 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return createSnippet(revision, snippetDto); } - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(snippetDto.getParentGroupId()); - return updateComponent( - revision, - processGroup, - () -> snippetDAO.updateSnippet(snippetDto), - snippet -> { - return dtoFactory.createSnippetDto(snippet); + final Set<Revision> requiredRevisions = getRevisionsForSnippet(snippetDto); + final ProcessGroup processGroup = getGroup(snippetDto.getParentGroupId()); + + final String modifier = NiFiUserUtils.getNiFiUserName(); + final RevisionClaim revisionClaim = new StandardRevisionClaim(requiredRevisions); + + RevisionUpdate<SnippetDTO> versionedSnippet; + try { + versionedSnippet = revisionManager.updateRevision(revisionClaim, modifier, new UpdateRevisionTask<SnippetDTO>() { + @Override + public RevisionUpdate<SnippetDTO> update() { + // ensure write access to the flow + processGroup.authorize(authorizer, RequestAction.WRITE); + + // also ensure read access to the flow as the component must be read in order to generate a response + processGroup.authorize(authorizer, RequestAction.READ); + + // get the updated component + final Snippet snippet = snippetDAO.updateSnippet(snippetDto); + + // save updated controller + controllerFacade.save(); + + final SnippetDTO snippetDto = dtoFactory.createSnippetDto(snippet); + + // Update each of the revisions that were required and + // build new SnippetDTO that contains all of the updated revisions + final SnippetDTO updatedSnippet = new SnippetDTO(); + updatedSnippet.setId(snippetDto.getId()); + updatedSnippet.setParentGroupId(snippetDto.getParentGroupId()); + updatedSnippet.setUri(snippetDto.getUri()); + updatedSnippet.setLinked(snippetDto.isLinked()); + + updatedSnippet.setConnections(updateRevisions(snippetDto.getConnections(), modifier)); + updatedSnippet.setFunnels(updateRevisions(snippetDto.getFunnels(), modifier)); + updatedSnippet.setInputPorts(updateRevisions(snippetDto.getInputPorts(), modifier)); + updatedSnippet.setLabels(updateRevisions(snippetDto.getLabels(), modifier)); + updatedSnippet.setOutputPorts(updateRevisions(snippetDto.getOutputPorts(), modifier)); + updatedSnippet.setProcessGroups(updateRevisions(snippetDto.getProcessGroups(), modifier)); + updatedSnippet.setProcessors(updateRevisions(snippetDto.getProcessors(), modifier)); + updatedSnippet.setRemoteProcessGroups(updateRevisions(snippetDto.getRemoteProcessGroups(), modifier)); + + final Revision updatedSnippetRevision = incrementRevision(revision); + final FlowModification lastModification = new FlowModification(updatedSnippetRevision, modifier); + return new StandardRevisionUpdate<>(snippetDto, lastModification); + } }); + } catch (ExpiredRevisionClaimException e) { + throw new InvalidRevisionException("Failed to update Snippet", e); + } + + return new ConfigurationSnapshot<SnippetDTO>(versionedSnippet.getLastModification().getRevision().getVersion(), versionedSnippet.getComponent()); + } + + private Map<String, RevisionDTO> updateRevisions(final Map<String, RevisionDTO> originalDtos, final String modifier) { + final Map<String, RevisionDTO> updatedComponents = new HashMap<>(originalDtos.size()); + for (final Map.Entry<String, RevisionDTO> entry : originalDtos.entrySet()) { + final String id = entry.getKey(); + final RevisionDTO revisionDto = entry.getValue(); + + final RevisionDTO updatedDto = new RevisionDTO(); + updatedDto.setClientId(revisionDto.getClientId()); + updatedDto.setLastModifier(modifier); + updatedDto.setVersion(revisionDto.getVersion() + 1); + updatedComponents.put(id, updatedDto); + } + + return updatedComponents; } @Override @@ -512,13 +669,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final Port inputPortNode = inputPortDAO.getPort(inputPortDTO.getId()); - final ConfigurationSnapshot<PortDTO> snapshot = updateComponent(revision, + final RevisionUpdate<PortDTO> snapshot = updateComponent(revision, inputPortNode, () -> inputPortDAO.updatePort(inputPortDTO), port -> dtoFactory.createPortDto(port)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(inputPortNode); - return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy), false); + return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); } @Override @@ -529,13 +686,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final Port outputPortNode = outputPortDAO.getPort(outputPortDTO.getId()); - final ConfigurationSnapshot<PortDTO> snapshot = updateComponent(revision, + final RevisionUpdate<PortDTO> snapshot = updateComponent(revision, outputPortNode, () -> outputPortDAO.updatePort(outputPortDTO), port -> dtoFactory.createPortDto(port)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(outputPortNode); - return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy), false); + return new UpdateResult<>(entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy), false); } @Override @@ -546,39 +703,47 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); - final ConfigurationSnapshot<RemoteProcessGroupDTO> snapshot = updateComponent( + final RevisionUpdate<RemoteProcessGroupDTO> snapshot = updateComponent( revision, remoteProcessGroupNode, () -> remoteProcessGroupDAO.updateRemoteProcessGroup(remoteProcessGroupDTO), remoteProcessGroup -> dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(remoteProcessGroupNode); - final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()); - return new UpdateResult<>(entityFactory.createRemoteProcessGroupEntity(snapshot.getConfiguration(), updateRevision, accessPolicy), false); + final RevisionDTO updateRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + return new UpdateResult<>(entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), updateRevision, accessPolicy), false); } @Override - public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupInputPort( + public RemoteProcessGroupPortEntity updateRemoteProcessGroupInputPort( final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); - return updateComponent( + final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent( revision, remoteProcessGroupNode, () -> remoteProcessGroupDAO.updateRemoteProcessGroupInputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); + + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(remoteProcessGroupNode); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, accessPolicy); } @Override - public ConfigurationSnapshot<RemoteProcessGroupPortDTO> updateRemoteProcessGroupOutputPort( + public RemoteProcessGroupPortEntity updateRemoteProcessGroupOutputPort( final Revision revision, final String remoteProcessGroupId, final RemoteProcessGroupPortDTO remoteProcessGroupPortDTO) { final RemoteProcessGroup remoteProcessGroupNode = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupPortDTO.getGroupId()); - return updateComponent( + final RevisionUpdate<RemoteProcessGroupPortDTO> snapshot = updateComponent( revision, remoteProcessGroupNode, () -> remoteProcessGroupDAO.updateRemoteProcessGroupOutputPort(remoteProcessGroupId, remoteProcessGroupPortDTO), remoteGroupPort -> dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort)); + + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(remoteProcessGroupNode); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + return entityFactory.createRemoteProcessGroupPortEntity(snapshot.getComponent(), updatedRevision, accessPolicy); } @Override @@ -593,40 +758,43 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } final ProcessGroup processGroupNode = processGroupDAO.getProcessGroup(processGroupDTO.getId()); - final ConfigurationSnapshot<ProcessGroupDTO> snapshot = updateComponent(revision, + final RevisionUpdate<ProcessGroupDTO> snapshot = updateComponent(revision, processGroupNode, () -> processGroupDAO.updateProcessGroup(processGroupDTO), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processGroupNode); - final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()); - return new UpdateResult<>(entityFactory.createProcessGroupEntity(snapshot.getConfiguration(), updatedRevision, accessPolicy), false); + final RevisionDTO updatedRevision = dtoFactory.createRevisionDTO(snapshot.getLastModification()); + return new UpdateResult<>(entityFactory.createProcessGroupEntity(snapshot.getComponent(), updatedRevision, accessPolicy), false); } @Override public ConfigurationSnapshot<ControllerConfigurationDTO> updateControllerConfiguration(final Revision revision, final ControllerConfigurationDTO controllerConfigurationDTO) { + final Supplier<ControllerConfigurationDTO> daoUpdate = () -> { + // update the controller configuration through the proxy + if (controllerConfigurationDTO.getName() != null) { + controllerFacade.setName(controllerConfigurationDTO.getName()); + } + if (controllerConfigurationDTO.getComments() != null) { + controllerFacade.setComments(controllerConfigurationDTO.getComments()); + } + if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) { + controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount()); + } + if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) { + controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount()); + } - return updateComponent( + return controllerConfigurationDTO; + }; + + final RevisionUpdate<ControllerConfigurationDTO> updatedComponent = updateComponent( revision, controllerFacade, - () -> { - // update the controller configuration through the proxy - if (controllerConfigurationDTO.getName() != null) { - controllerFacade.setName(controllerConfigurationDTO.getName()); - } - if (controllerConfigurationDTO.getComments() != null) { - controllerFacade.setComments(controllerConfigurationDTO.getComments()); - } - if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) { - controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount()); - } - if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) { - controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount()); - } - - return controllerConfigurationDTO; - }, + daoUpdate, controller -> getControllerConfiguration()); + + return new ConfigurationSnapshot<>(updatedComponent.getLastModification().getRevision().getVersion()); } @Override @@ -673,15 +841,24 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } private ConfigurationSnapshot<Void> clearComponentState(final Revision revision, final Runnable clearState) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() { - @Override - public ConfigurationResult<Void> execute() { - // clear the state for the specified component - clearState.run(); + final RevisionClaim claim = new StandardRevisionClaim(revision); + final String modifier = NiFiUserUtils.getNiFiUserName(); + try { + final RevisionUpdate<Void> component = revisionManager.updateRevision(claim, modifier, new UpdateRevisionTask<Void>() { + @Override + public RevisionUpdate<Void> update() { + // clear the state for the specified component + clearState.run(); + + final FlowModification lastMod = new FlowModification(incrementRevision(revision), modifier); + return new StandardRevisionUpdate<Void>(null, lastMod); + } + }); - return new StandardConfigurationResult<Void>(false, null); - } - }); + return new ConfigurationSnapshot<>(component.getLastModification().getRevision().getVersion(), component.getComponent()); + } catch (ExpiredRevisionClaimException e) { + throw new InvalidRevisionException("Unable to clear component state", e); + } } @Override @@ -707,13 +884,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConnectionEntity deleteConnection(final Revision revision, final String connectionId) { final Connection connection = connectionDAO.getConnection(connectionId); - final ConfigurationSnapshot<Void> snapshot = deleteComponent( + final ConnectionDTO snapshot = deleteComponent( revision, connection, - () -> connectionDAO.deleteConnection(connectionId)); + () -> connectionDAO.deleteConnection(connectionId), + dtoFactory.createConnectionDto(connection)); - final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connection); - return entityFactory.createConnectionEntity(null, dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createConnectionEntity(snapshot, null, null); } @Override @@ -745,65 +922,60 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessorEntity deleteProcessor(final Revision revision, final String processorId) { final ProcessorNode processor = processorDAO.getProcessor(processorId); - final ConfigurationSnapshot<Void> snapshot = deleteComponent( + final ProcessorDTO snapshot = deleteComponent( revision, processor, - () -> processorDAO.deleteProcessor(processorId)); + () -> processorDAO.deleteProcessor(processorId), + dtoFactory.createProcessorDto(processor)); - return entityFactory.createProcessorEntity(null, dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), null); + return entityFactory.createProcessorEntity(snapshot, null, null); } @Override public LabelEntity deleteLabel(final Revision revision, final String labelId) { final Label label = labelDAO.getLabel(labelId); - final ConfigurationSnapshot<Void> snapshot = deleteComponent( + final LabelDTO snapshot = deleteComponent( revision, label, - () -> labelDAO.deleteLabel(labelId)); + () -> labelDAO.deleteLabel(labelId), + dtoFactory.createLabelDto(label)); - return entityFactory.createLabelEntity(null, dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), null); + return entityFactory.createLabelEntity(snapshot, null, null); } @Override public FunnelEntity deleteFunnel(final Revision revision, final String funnelId) { final Funnel funnel = funnelDAO.getFunnel(funnelId); - final ConfigurationSnapshot<Void> snapshot = deleteComponent( + final FunnelDTO snapshot = deleteComponent( revision, funnel, - () -> funnelDAO.deleteFunnel(funnelId)); + () -> funnelDAO.deleteFunnel(funnelId), + dtoFactory.createFunnelDto(funnel)); - return entityFactory.createFunnelEntity(null, dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), null); + return entityFactory.createFunnelEntity(snapshot, null, null); } /** * Deletes a component using the Optimistic Locking Manager * * @param revision the current revision - * @param action the action that deletes the component via the appropriate DAO object + * @param deleteAction the action that deletes the component via the appropriate DAO object * @return a ConfigurationSnapshot that represents the new configuration */ - private ConfigurationSnapshot<Void> deleteComponent(final Revision revision, final Authorizable authorizable, final Runnable action) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() { + private <D, C> D deleteComponent(final Revision revision, final Authorizable authorizable, final Runnable deleteAction, final D dto) { + final RevisionClaim claim = new StandardRevisionClaim(revision); + return revisionManager.deleteRevision(claim, new DeleteRevisionTask<D>() { @Override - public ConfigurationResult<Void> execute() { + public D performTask() { // ensure access to the component authorizable.authorize(authorizer, RequestAction.WRITE); - action.run(); + deleteAction.run(); // save the flow controllerFacade.save(); - return new ConfigurationResult<Void>() { - @Override - public boolean isNew() { - return false; - } - @Override - public Void getConfiguration() { - return null; - } - }; + return dto; } }); } @@ -814,56 +986,64 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ConfigurationSnapshot<Void> deleteSnippet(final Revision revision, final String snippetId) { + public SnippetEntity deleteSnippet(final Revision revision, final String snippetId) { final Snippet snippet = snippetDAO.getSnippet(snippetId); - final ProcessGroup processGroup = processGroupDAO.getProcessGroup(snippet.getParentGroupId()); - return deleteComponent(revision, + final ProcessGroup processGroup = getGroup(snippet.getParentGroupId()); + + final SnippetDTO snapshot = deleteComponent(revision, processGroup, - () -> snippetDAO.deleteSnippet(snippetId)); + () -> snippetDAO.deleteSnippet(snippetId), + dtoFactory.createSnippetDto(snippet)); + + return entityFactory.createSnippetEntity(snapshot, null, null); } @Override public PortEntity deleteInputPort(final Revision revision, final String inputPortId) { final Port port = inputPortDAO.getPort(inputPortId); - final ConfigurationSnapshot<Void> snapshot = deleteComponent( + final PortDTO snapshot = deleteComponent( revision, port, - () -> inputPortDAO.deletePort(inputPortId)); + () -> inputPortDAO.deletePort(inputPortId), + dtoFactory.createPortDto(port)); - return entityFactory.createPortEntity(null, dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), null); + return entityFactory.createPortEntity(snapshot, null, null); } @Override public PortEntity deleteOutputPort(final Revision revision, final String outputPortId) { final Port port = outputPortDAO.getPort(outputPortId); - final ConfigurationSnapshot<Void> snapshot = deleteComponent( + final PortDTO snapshot = deleteComponent( revision, port, - () -> outputPortDAO.deletePort(outputPortId)); + () -> outputPortDAO.deletePort(outputPortId), + dtoFactory.createPortDto(port)); - return entityFactory.createPortEntity(null, dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), null); + return entityFactory.createPortEntity(snapshot, null, null); } @Override public ProcessGroupEntity deleteProcessGroup(final Revision revision, final String groupId) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); - final ConfigurationSnapshot<Void> snapshot = deleteComponent( + final ProcessGroupDTO snapshot = deleteComponent( revision, processGroup, - () -> processGroupDAO.deleteProcessGroup(groupId)); + () -> processGroupDAO.deleteProcessGroup(groupId), + dtoFactory.createProcessGroupDto(processGroup)); - return entityFactory.createProcessGroupEntity(null, dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), null); + return entityFactory.createProcessGroupEntity(snapshot, null, null); } @Override public RemoteProcessGroupEntity deleteRemoteProcessGroup(final Revision revision, final String remoteProcessGroupId) { final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupId); - final ConfigurationSnapshot<Void> snapshot = deleteComponent( + final RemoteProcessGroupDTO snapshot = deleteComponent( revision, remoteProcessGroup, - () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId)); + () -> remoteProcessGroupDAO.deleteRemoteProcessGroup(remoteProcessGroupId), + dtoFactory.createRemoteProcessGroupDto(remoteProcessGroup)); - return entityFactory.createRemoteProcessGroupEntity(null, dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), null); + return entityFactory.createRemoteProcessGroupEntity(snapshot, null, null); } @Override @@ -874,7 +1054,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ConnectionEntity createConnection(final Revision revision, final String groupId, final ConnectionDTO connectionDTO) { - final ConfigurationSnapshot<ConnectionDTO> snapshot = createComponent( + final RevisionUpdate<ConnectionDTO> snapshot = createComponent( revision, connectionDTO, () -> connectionDAO.createConnection(groupId, connectionDTO), @@ -882,7 +1062,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Connection connection = connectionDAO.getConnection(connectionDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(connection); - return entityFactory.createConnectionEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createConnectionEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } @Override @@ -913,7 +1093,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessorEntity createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { - final ConfigurationSnapshot<ProcessorDTO> snapshot = createComponent( + final RevisionUpdate<ProcessorDTO> snapshot = createComponent( revision, processorDTO, () -> processorDAO.createProcessor(groupId, processorDTO), @@ -921,12 +1101,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final ProcessorNode processor = processorDAO.getProcessor(processorDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); - return entityFactory.createProcessorEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createProcessorEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } @Override public LabelEntity createLabel(final Revision revision, final String groupId, final LabelDTO labelDTO) { - final ConfigurationSnapshot<LabelDTO> snapshot = createComponent( + final RevisionUpdate<LabelDTO> snapshot = createComponent( revision, labelDTO, () -> labelDAO.createLabel(groupId, labelDTO), @@ -934,7 +1114,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Label label = labelDAO.getLabel(labelDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(label); - return entityFactory.createLabelEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createLabelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } /** @@ -948,21 +1128,24 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { * @param <D> the DTO Type * @param <C> the NiFi Component Type * - * @return a ConfigurationSnapshot that represents the updated configuration + * @return a RevisionUpdate that represents the updated configuration */ - private <D, C> ConfigurationSnapshot<D> createComponent(final Revision revision, final ComponentDTO componentDto, + private <D, C> RevisionUpdate<D> createComponent(final Revision revision, final ComponentDTO componentDto, final Supplier<C> daoCreation, final Function<C, D> dtoCreation) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<D>() { - @Override - public ConfigurationResult<D> execute() { - // ensure id is set - if (StringUtils.isBlank(componentDto.getId())) { - componentDto.setId(UUID.randomUUID().toString()); - } + final String modifier = NiFiUserUtils.getNiFiUserName(); + // ensure id is set + if (StringUtils.isBlank(componentDto.getId())) { + componentDto.setId(UUID.randomUUID().toString()); + } + + final String groupId = componentDto.getParentGroupId(); + return revisionManager.get(groupId, new ReadOnlyRevisionCallback<RevisionUpdate<D>>() { + @Override + public RevisionUpdate<D> withRevision(final Revision revision) { // ensure access to process group - final ProcessGroup parent = processGroupDAO.getProcessGroup(componentDto.getParentGroupId()); + final ProcessGroup parent = processGroupDAO.getProcessGroup(groupId); parent.authorize(authorizer, RequestAction.WRITE); // add the component @@ -971,17 +1154,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult<D>() { - @Override - public boolean isNew() { - return true; - } - - @Override - public D getConfiguration() { - return dtoCreation.apply(component); - } - }; + final D dto = dtoCreation.apply(component); + final FlowModification lastMod = new FlowModification(new Revision(0L, revision.getClientId(), componentDto.getId()), modifier); + return new StandardRevisionUpdate<D>(dto, lastMod); } }); } @@ -990,7 +1165,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public FunnelEntity createFunnel(final Revision revision, final String groupId, final FunnelDTO funnelDTO) { - final ConfigurationSnapshot<FunnelDTO> snapshot = createComponent( + final RevisionUpdate<FunnelDTO> snapshot = createComponent( revision, funnelDTO, () -> funnelDAO.createFunnel(groupId, funnelDTO), @@ -998,14 +1173,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Funnel funnel = funnelDAO.getFunnel(funnelDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(funnel); - return entityFactory.createFunnelEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createFunnelEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } private void validateSnippetContents(final FlowSnippetDTO flow) { // validate any processors if (flow.getProcessors() != null) { for (final ProcessorDTO processorDTO : flow.getProcessors()) { - final ProcessorNode processorNode = processorDAO.getProcessor(processorDTO.getId()); + final ProcessorNode processorNode = revisionManager.get(processorDTO.getId(), rev -> processorDAO.getProcessor(processorDTO.getId())); final Collection<ValidationResult> validationErrors = processorNode.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List<String> errors = new ArrayList<>(); @@ -1019,7 +1194,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { if (flow.getInputPorts() != null) { for (final PortDTO portDTO : flow.getInputPorts()) { - final Port port = inputPortDAO.getPort(portDTO.getId()); + final Port port = revisionManager.get(portDTO.getId(), rev -> inputPortDAO.getPort(portDTO.getId())); final Collection<ValidationResult> validationErrors = port.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List<String> errors = new ArrayList<>(); @@ -1033,7 +1208,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { if (flow.getOutputPorts() != null) { for (final PortDTO portDTO : flow.getOutputPorts()) { - final Port port = outputPortDAO.getPort(portDTO.getId()); + final Port port = revisionManager.get(portDTO.getId(), rev -> outputPortDAO.getPort(portDTO.getId())); final Collection<ValidationResult> validationErrors = port.getValidationErrors(); if (validationErrors != null && !validationErrors.isEmpty()) { final List<String> errors = new ArrayList<>(); @@ -1048,7 +1223,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // get any remote process group issues if (flow.getRemoteProcessGroups() != null) { for (final RemoteProcessGroupDTO remoteProcessGroupDTO : flow.getRemoteProcessGroups()) { - final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); + final RemoteProcessGroup remoteProcessGroup = revisionManager.get( + remoteProcessGroupDTO.getId(), rev -> remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId())); + if (remoteProcessGroup.getAuthorizationIssue() != null) { remoteProcessGroupDTO.setAuthorizationIssues(Arrays.asList(remoteProcessGroup.getAuthorizationIssue())); } @@ -1057,10 +1234,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ConfigurationSnapshot<FlowDTO> copySnippet(final Revision revision, final String groupId, final String snippetId, final Double originX, final Double originY) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowDTO>() { + public FlowEntity copySnippet(final Revision revision, final String groupId, final String snippetId, final Double originX, final Double originY) { + final FlowDTO flowDto = revisionManager.get(groupId, new ReadOnlyRevisionCallback<FlowDTO>() { @Override - public ConfigurationResult<FlowDTO> execute() { + public FlowDTO withRevision(final Revision revision) { String id = snippetId; // create the new snippet @@ -1072,54 +1249,49 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult<FlowDTO>() { - @Override - public boolean isNew() { - return false; - } - - @Override - public FlowDTO getConfiguration() { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - return dtoFactory.createFlowDto(group, snippet); - } - }; + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + return dtoFactory.createFlowDto(group, snippet); } }); + + final FlowEntity flowEntity = new FlowEntity(); + flowEntity.setFlow(flowDto); + return flowEntity; + } + + private <T> ConfigurationSnapshot<T> createConfigSnapshot(final RevisionUpdate<T> update) { + return new ConfigurationSnapshot<>(update.getLastModification().getRevision().getVersion(), update.getComponent()); } @Override public ConfigurationSnapshot<SnippetDTO> createSnippet(final Revision revision, final SnippetDTO snippetDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<SnippetDTO>() { - @Override - public ConfigurationResult<SnippetDTO> execute() { - // ensure id is set - if (StringUtils.isBlank(snippetDTO.getId())) { - snippetDTO.setId(UUID.randomUUID().toString()); - } - - // add the snippet - final Snippet snippet = snippetDAO.createSnippet(snippetDTO); - final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet); - - return new ConfigurationResult<SnippetDTO>() { - @Override - public boolean isNew() { - return true; + final RevisionClaim claim = new StandardRevisionClaim(revision); + final String modifier = NiFiUserUtils.getNiFiUserName(); + try { + final RevisionUpdate<SnippetDTO> update = revisionManager.updateRevision(claim, modifier, new UpdateRevisionTask<SnippetDTO>() { + @Override + public RevisionUpdate<SnippetDTO> update() { + // ensure id is set + if (StringUtils.isBlank(snippetDTO.getId())) { + snippetDTO.setId(UUID.randomUUID().toString()); } - @Override - public SnippetDTO getConfiguration() { - return responseSnippetDTO; - } - }; - } - }); + // add the snippet + final Snippet snippet = snippetDAO.createSnippet(snippetDTO); + final SnippetDTO responseSnippetDTO = dtoFactory.createSnippetDto(snippet); + return new StandardRevisionUpdate<>(responseSnippetDTO, new FlowModification(incrementRevision(revision), modifier)); + } + }); + + return createConfigSnapshot(update); + } catch (ExpiredRevisionClaimException e) { + throw new InvalidRevisionException("Could not create Snippet", e); + } } @Override public PortEntity createInputPort(final Revision revision, final String groupId, final PortDTO inputPortDTO) { - final ConfigurationSnapshot<PortDTO> snapshot = createComponent( + final RevisionUpdate<PortDTO> snapshot = createComponent( revision, inputPortDTO, () -> inputPortDAO.createPort(groupId, inputPortDTO), @@ -1127,12 +1299,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Port port = inputPortDAO.getPort(inputPortDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); - return entityFactory.createPortEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } @Override public PortEntity createOutputPort(final Revision revision, final String groupId, final PortDTO outputPortDTO) { - final ConfigurationSnapshot<PortDTO> snapshot = createComponent( + final RevisionUpdate<PortDTO> snapshot = createComponent( revision, outputPortDTO, () -> outputPortDAO.createPort(groupId, outputPortDTO), @@ -1140,23 +1312,23 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Port port = outputPortDAO.getPort(outputPortDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(port); - return entityFactory.createPortEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createPortEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } @Override public ProcessGroupEntity createProcessGroup(final String parentGroupId, final Revision revision, final ProcessGroupDTO processGroupDTO) { - final ConfigurationSnapshot<ProcessGroupDTO> snapshot = createComponent(revision, processGroupDTO, + final RevisionUpdate<ProcessGroupDTO> snapshot = createComponent(revision, processGroupDTO, () -> processGroupDAO.createProcessGroup(parentGroupId, processGroupDTO), processGroup -> dtoFactory.createProcessGroupDto(processGroup)); final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processGroup); - return entityFactory.createProcessGroupEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } @Override public RemoteProcessGroupEntity createRemoteProcessGroup(final Revision revision, final String groupId, final RemoteProcessGroupDTO remoteProcessGroupDTO) { - final ConfigurationSnapshot<RemoteProcessGroupDTO> snapshot = createComponent( + final RevisionUpdate<RemoteProcessGroupDTO> snapshot = createComponent( revision, remoteProcessGroupDTO, () -> remoteProcessGroupDAO.createRemoteProcessGroup(groupId, remoteProcessGroupDTO), @@ -1164,7 +1336,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final RemoteProcessGroup remoteProcessGroup = remoteProcessGroupDAO.getRemoteProcessGroup(remoteProcessGroupDTO.getId()); final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(remoteProcessGroup); - return entityFactory.createRemoteProcessGroupEntity(snapshot.getConfiguration(), dtoFactory.createRevisionDTO(snapshot.getVersion(), revision.getClientId()), accessPolicy); + return entityFactory.createRemoteProcessGroupEntity(snapshot.getComponent(), dtoFactory.createRevisionDTO(snapshot.getLastModification()), accessPolicy); } @Override @@ -1210,10 +1382,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public ConfigurationSnapshot<FlowDTO> createTemplateInstance(final Revision revision, final String groupId, final Double originX, final Double originY, final String templateId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<FlowDTO>() { + public FlowEntity createTemplateInstance(final Revision revision, final String groupId, final Double originX, final Double originY, final String templateId) { + final FlowDTO flowDto = revisionManager.get(groupId, new ReadOnlyRevisionCallback<FlowDTO>() { @Override - public ConfigurationResult<FlowDTO> execute() { + public FlowDTO withRevision(final Revision revision) { // instantiate the template - there is no need to make another copy of the flow snippet since the actual template // was copied and this dto is only used to instantiate it's components (which as already completed) final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId); @@ -1224,40 +1396,34 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // save the flow controllerFacade.save(); - return new ConfigurationResult<FlowDTO>() { - @Override - public boolean isNew() { - return false; - } - - @Override - public FlowDTO getConfiguration() { - final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); - return dtoFactory.createFlowDto(group, snippet); - } - }; + final ProcessGroup group = processGroupDAO.getProcessGroup(groupId); + return dtoFactory.createFlowDto(group, snippet); } }); + + final FlowEntity flowEntity = new FlowEntity(); + flowEntity.setFlow(flowDto); + return flowEntity; } @Override - public ConfigurationSnapshot<Void> createArchive(final Revision revision) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() { - @Override - public ConfigurationResult<Void> execute() { - // create the archive - controllerFacade.createArchive(); + public ProcessGroupEntity createArchive(final Revision revision) { + try { + controllerFacade.createArchive(); + } catch (IOException e) { + logger.error("Failed to create an archive", e); + } - return new StandardConfigurationResult<Void>(false, null); - } - }); + return getProcessGroup("root"); } @Override - public ConfigurationSnapshot<ProcessorDTO> setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() { + public ProcessorEntity setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) { + final String modifier = NiFiUserUtils.getNiFiUserName(); + + final RevisionUpdate<ProcessorEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<ProcessorEntity>() { @Override - public ConfigurationResult<ProcessorDTO> execute() { + public RevisionUpdate<ProcessorEntity> update() { // create the processor config final ProcessorConfigDTO config = new ProcessorConfigDTO(); config.setAnnotationData(annotationData); @@ -1270,19 +1436,29 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // update the processor configuration final ProcessorNode processor = processorDAO.updateProcessor(processorDTO); + final ProcessorDTO updatedProcDto = dtoFactory.createProcessorDto(processor); + // save the flow controllerFacade.save(); - return new StandardConfigurationResult<>(false, dtoFactory.createProcessorDto(processor)); + final AccessPolicyDTO accessPolicy = dtoFactory.createAccessPolicyDto(processor); + final FlowModification lastMod = new FlowModification(incrementRevision(revision), modifier); + final ProcessorEntity entity = entityFactory.createProcessorEntity(updatedProcDto, dtoFactory.createRevisionDTO(lastMod), accessPolicy); + + return new StandardRevisionUpdate<>(entity, lastMod); } }); + + return update.getComponent(); } @Override - public ConfigurationSnapshot<ControllerServiceDTO> createControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() { + public ControllerServiceEntity createControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) { + // TODO: Instead of "root" we need the ID of the Process Group that the Controller Service is being created in. + // Right now, though, they are not scoped to a particular group. + return revisionManager.get("root", new ReadOnlyRevisionCallback<RevisionUpdate<ControllerServiceEntity>>() { @Override - public ConfigurationResult<ControllerServiceDTO> execute() { + public RevisionUpdate<ControllerServiceEntity> withRevision(final Revision revision) { // ensure id is set if (StringUtils.isBlank(controllerServiceDTO.getId())) { controllerServiceDTO.setId(UUID.randomUUID().toString()); @@ -1298,21 +1474,32 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new StandardConfigurationResult<ControllerServiceDTO>(true, dtoFactory.createControllerServiceDto(controllerService)); + final Revision updatedRevision = new Revision(0L, revision.getClientId(), controllerService.getIdentifier()); + final FlowModification lastMod = new FlowModification(updatedRevision, NiFiUserUtils.getNiFiUserName()); + final RevisionDTO updatedRevisionDto = dtoFactory.createRevisionDTO(lastMod); + final ControllerServiceDTO updatedDto = dtoFactory.createControllerServiceDto(controllerService); + + // TODO: When ControllerServiceNode implements Authorizable, we need to update the access policy. + final AccessPolicyDTO accessPolicy = allAccess(); + final ControllerServiceEntity entity = entityFactory.createControllerServiceEntity(updatedDto, updatedRevisionDto, accessPolicy); + final RevisionUpdate<ControllerServiceEntity> update = new StandardRevisionUpdate<>(entity, lastMod); + return update; } - }); + }).getComponent(); } @Override - public ConfigurationSnapshot<ControllerServiceDTO> updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) { + public UpdateResult<ControllerServiceEntity> updateControllerService(final Revision revision, final ControllerServiceDTO controllerServiceDTO) { // if controller service does not exist, then create new controller service if (controllerServiceDAO.hasControllerService(controllerServiceDTO.getId()) == false) { - return createControllerService(revision, controllerServiceDTO); + return new UpdateResult<>(createControllerService(revision, controllerServiceDTO), true); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ControllerServiceDTO>() { + final String modifier = NiFiUserUtils.getNiFiUserName(); + final RevisionUpdate<ControllerServiceEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, + new UpdateRevisionTask<ControllerServiceEntity>() { @Override - public ConfigurationResult<ControllerServiceDTO> execute() { + public RevisionUpdate<ControllerServiceEntity> update() { final ControllerServiceNode controllerService = controllerServiceDAO.updateControllerService(controllerServiceDTO); // save the update @@ -1322,34 +1509,108 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new StandardConfigurationResult<ControllerServiceDTO>(false, dtoFactory.createControllerServiceDto(controllerService)); + final Revision updatedRevision = incrementRevision(revision); + final ControllerServiceDTO controllerServiceDto = dtoFactory.createControllerServiceDto(controllerService); + final FlowModification lastMod = new FlowModification(updatedRevision, modifier); + final RevisionDTO updatedRevisionDto = dtoFactory.createRevisionDTO(lastMod); + + // TODO: When ControllerServiceNode implements Authorizable, we need to update the access policy. + final AccessPolicyDTO accessPolicy = allAccess(); + final ControllerServiceEntity entity = entityFactory.createControllerServiceEntity(controllerServiceDto, updatedRevisionDto, accessPolicy); + final RevisionUpdate<ControllerServiceEntity> update = new StandardRevisionUpdate<>(entity, lastMod); + return update; } }); + + return new UpdateResult<>(update.getComponent(), false); } @Override - public ConfigurationSnapshot<Set<ControllerServiceReferencingComponentDTO>> updateControllerServiceReferencingComponents( - final Revision revision, - final String controllerServiceId, - final org.apache.nifi.controller.ScheduledState scheduledState, - final org.apache.nifi.controller.service.ControllerServiceState controllerServiceState) { + public ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingComponents( + final Revision revision, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { + + // Get a set of all component ID's that are referencing the controller service, because we will need. + // to lock on those components. We do not need a Read Lock here from the Revision Manager because + // we already have a claim on the controller service. + final Set<Revision> referencingRevisions = controllerServiceDAO.getControllerService(controllerServiceId).getReferences().getReferencingComponents().stream() + .map(component -> revisionManager.getRevision(component.getIdentifier())) + .collect(Collectors.toSet()); - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Set<ControllerServiceReferencingComponentDTO>>() { + // TODO: Need to figure out how to handle this. We need the Revision Claim to encapsulate the controller service + // as well as all of the referencing components. One option is for the verifyCanUpdateControllerServiceReferencingComponents() + // to obtain these claims and return to client. Another option is to just not obtain a claim on them and have an override + // to the revisionManager.updateRevision() method that allows us to specify a RevisionClaim and a Set<String> that contains + // additional component ID's that must be claimed before the operation can proceed. + revisionManager.requestClaim(referencingRevisions); + final Set<Revision> requiredRevisions = new HashSet<>(referencingRevisions); + requiredRevisions.add(revision); + final RevisionClaim claim = new StandardRevisionClaim(requiredRevisions); + final String modifier = NiFiUserUtils.getNiFiUserName(); + + final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, modifier, new UpdateRevisionTask<ControllerServiceReferencingComponentsEntity>() { @Override - public ConfigurationResult<Set<ControllerServiceReferencingComponentDTO>> execute() { - final ControllerServiceReference reference = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); + public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() { + final Set<ConfiguredComponent> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState); + final ControllerServiceReference reference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences(); + + final Map<String, Revision> updatedRevisions = new HashMap<>(); + for (final Revision refRevision : referencingRevisions) { + updatedRevisions.put(refRevision.getComponentId(), refRevision); + } + + for (final ConfiguredComponent component : updated) { + final Revision currentRevision = revisionManager.getRevision(component.getIdentifier()); + final Revision updatedRevision = incrementRevision(currentRevision); + updatedRevisions.put(component.getIdentifier(), updatedRevision); + } - return new StandardConfigurationResult<Set<ControllerServiceReferencingComponentDTO>>(false, dtoFactory.createControllerServiceReferencingComponentsDto(reference)); + final ControllerServiceReferencingComponentsEntity entity = createControllerServiceReferencingComponentsEntity(reference, updatedRevisions); + return new StandardRevisionUpdate<>(entity, null, new HashSet<>(updatedRevisions.values())); } }); + + return update.getComponent(); } + private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Map<String, Revision> revisions) { + final String modifier = NiFiUserUtils.getNiFiUserName(); + final Set<ConfiguredComponent> referencingComponents = reference.getReferencingComponents(); + + final Set<ControllerServiceReferencingComponentEntity> componentEntities = new HashSet<>(); + for (final ConfiguredComponent refComponent : referencingComponents) { + AccessPolicyDTO accessPolicy = null; + if (refComponent instanceof Authorizable) { + accessPolicy = dtoFactory.createAccessPolicyDto((Authorizable) refComponent); + } + + final Revision revision = revisions.get(refComponent.getIdentifier()); + final FlowModification flowMod = new FlowModification(revision, modifier); + final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(flowMod); + final ControllerServiceReferencingComponentDTO dto = dtoFactory.createControllerServiceReferencingComponentDTO(refComponent); + componentEntities.add(entityFactory.createControllerServiceReferencingComponentEntity(dto, revisionDto, accessPolicy)); + } + + final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity(); + entity.setControllerServiceReferencingComponents(componentEntities); + + return entity; + } + + @Override - public ConfigurationSnapshot<Void> deleteControllerService(final Revision revision, final String controllerServiceId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() { + public ControllerServiceEntity deleteControllerService(final Revision revision, final String controllerServiceId) { + final RevisionClaim claim = new StandardRevisionClaim(revision); + final String modifier = NiFiUserUtils.getNiFiUserName(); + return revisionManager.updateRevision(claim, modifier, new UpdateRevisionTask<ControllerServiceEntity>() { @Override - public ConfigurationResult<Void> execute() { - // delete the label + public RevisionUpdate<ControllerServiceEntity> update() { + // TODO: Verify authorizations once ControllerServiceNode implements Authorizable + // ensure access to the component + + final ControllerServiceNode controllerServiceNode = controllerServiceDAO.getControllerService(controllerServiceId); + final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(controllerServiceNode); + + // delete the controller service controllerServiceDAO.deleteControllerService(controllerServiceId); // save the update @@ -1359,17 +1620,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new StandardConfigurationResult<Void>(false, null); + final ControllerServiceEntity entity = entityFactory.createControllerServiceEntity(dto, null, null); + return new StandardRevisionUpdate<>(entity, null); } - }); + }).getComponent(); } @Override - public ConfigurationSnapshot<ReportingTaskDTO> createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ReportingTaskDTO>() { + public ReportingTaskEntity createReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { + // TODO: replace "root" with the ID of the process group once the reporting tasks are scoped by group + return revisionManager.get("root", new ReadOnlyRevisionCallback<ReportingTaskEntity>() { @Override - public ConfigurationResult<ReportingTaskDTO> execute() { + public ReportingTaskEntity withRevision(Revision revision) { // ensure id is set if (StringUtils.isBlank(reportingTaskDTO.getId())) { reportingTaskDTO.setId(UUID.randomUUID().toString()); @@ -1385,21 +1648,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new StandardConfigurationResult<ReportingTaskDTO>(true, dtoFactory.createReportingTaskDto(reportingTask)); + final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); + final RevisionDTO updatedRevision = new RevisionDTO(); + final AccessPolicyDTO accessPolicy = allAccess(); // TODO: Fix this once ReportingTask extends Authorizable + return entityFactory.createReportingTaskEntity(dto, updatedRevision, accessPolicy); } }); } @Override - public ConfigurationSnapshot<ReportingTaskDTO> updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { + public UpdateResult<ReportingTaskEntity> updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { // if reporting task does not exist, then create new reporting task if (reportingTaskDAO.hasReportingTask(reportingTaskDTO.getId()) == false) { - return createReportingTask(revision, reportingTaskDTO); + return new UpdateResult<>(createReportingTask(revision, reportingTaskDTO), true); } - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ReportingTaskDTO>() { + final String modifier = NiFiUserUtils.getNiFiUserName(); + final RevisionUpdate<ReportingTaskEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<ReportingTaskEntity>() { @Override - public ConfigurationResult<ReportingTaskDTO> execute() { + public RevisionUpdate<ReportingTaskEntity> update() { final ReportingTaskNode reportingTask = reportingTaskDAO.updateReportingTask(reportingTaskDTO); // save the update @@ -1409,17 +1676,28 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new StandardConfigurationResult<ReportingTaskDTO>(false, dtoFactory.createReportingTaskDto(reportingTask)); + final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTask); + final RevisionDTO updatedRevision = new RevisionDTO(); + final AccessPolicyDTO accessPolicy = allAccess(); // TODO: Remove this once Reporting Task extends Authorizable + final ReportingTaskEntity entity = entityFactory.createReportingTaskEntity(dto, updatedRevision, accessPolicy); + final FlowModification lastMod = new FlowModification(incrementRevision(revision), modifier); + return new StandardRevisionUpdate<>(entity, lastMod); } }); + + final ReportingTaskEntity entity = update.getComponent(); + return new UpdateResult<>(entity, false); } @Override - public ConfigurationSnapshot<Void> deleteReportingTask(final Revision revision, final String reportingTaskId) { - return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() { + public ReportingTaskEntity deleteReportingTask(final Revision revision, final String reportingTaskId) { + // TODO: replace "root" with the ID of the process group once the reporting tasks are scoped by group + return revisionManager.get("root", new ReadOnlyRevisionCallback<ReportingTaskEntity>() { @Override - public ConfigurationResult<Void> execute() { - // delete the label + public ReportingTaskEntity withRevision(final Revision revision) { + final ReportingTaskNode reportingTaskNode = reportingTaskDAO.getReportingTask(reportingTaskId); + + // delete the reporting task reportingTaskDAO.deleteReportingTask(reportingTaskId); // save the update @@ -1429,7 +1707,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { controllerFacade.save(); } - return new StandardConfigurationResult<Void>(false, null); + final ReportingTaskDTO dto = dtoFactory.createReportingTaskDto(reportingTaskNode); + return entityFactory.createReportingTaskEntity(dto, null, null); } }); } @@ -1490,8 +1769,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // Read Operations // ----------------------------------------- @Override + @Deprecated public RevisionDTO getRevision() { - return dtoFactory.createRevisionDTO(optimisticLockingManager.getLastModification()); + final Revision revision = revisionManager.getRevision(processGroupDAO.getProcessGroup("root").getIdentifier()); + return dtoFactory.createRevisionDTO(revision.getVersion(), revision.getClientId()); } @Override @@ -1541,32 +1822,47 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ComponentStateDTO getProcessorState(String processorId) { - final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; - final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); + return revisionManager.get(processorId, new ReadOnlyRevisionCallback<ComponentStateDTO>() { + @Override + public ComponentStateDTO withRevision(Revision revision) { + final StateMap clusterState = isClustered() ? processorDAO.getState(processorId, Scope.CLUSTER) : null; + final StateMap localState = processorDAO.getState(processorId, Scope.LOCAL); - // processor will be non null as it was already found when getting the state - final ProcessorNode processor = processorDAO.getProcessor(processorId); - return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState); + // processor will be non null as it was already found when getting the state + final ProcessorNode processor = processorDAO.getProcessor(processorId); + return dtoFactory.createComponentStateDTO(processorId, processor.getProcessor().getClass(), localState, clusterState); + } + }); } @Override public ComponentStateDTO getControllerServiceState(String controllerServiceId) { - final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; - final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); + return revisionManager.get(controllerServiceId, new ReadOnlyRevisionCallback<ComponentStateDTO>() { + @Override + public ComponentStateDTO withRevision(Revision revision) { + final StateMap clusterState = isClustered() ? controllerServiceDAO.getState(controllerServiceId, Scope.CLUSTER) : null; + final StateMap localState = controllerServiceDAO.getState(controllerServiceId, Scope.LOCAL); - // controller service will be non null as it was already found when getting the state - final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); - return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState); + // controller service will be non null as it was already found when getting the state + final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId); + return dtoFactory.createComponentStateDTO(controllerServiceId, controllerService.getControllerServiceImplementation().getClass(), localState, clusterState); + } + }); } @Override public ComponentStateDTO getReportingTaskState(String reportingTaskId) { - final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; - final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); + return revisionManager.get(reportingTaskId, new ReadOnlyRevisionCallback<ComponentStateDTO>() { + @Override + public ComponentStateDTO withRevision(Revision revision) { + final StateMap clusterState = isClustered() ? reportingTaskDAO.getState(reportingTaskId, Scope.CLUSTER) : null; + final StateMap localState = reportingTaskDAO.getState(reportingTaskId, Scope.LOCAL); - // reporting task will be non null as it was already found when getting the state - final ReportingTaskNode reportingTask = reportingTaskDAO.getReportingTask(reportingTaskId); - return dtoFactory.createComponentStateDTO(reportingTaskId, reportingTask.getReportingTask().getClass(), localState, clusterState); + // reporting task will be non null as it was already found when getting the state +
<TRUNCATED>
