http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java index 2a48183..b627964 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java @@ -31,6 +31,7 @@ import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UiExtensionType; +import org.apache.nifi.web.UpdateResult; import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; @@ -425,19 +426,22 @@ public class ReportingTaskResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(revisionEntity.getRevision(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyCanClearReportingTaskState(id); return generateContinueResponse().build(); } // get the component state - final RevisionDTO 
requestRevision = revisionEntity.getRevision(); - final ConfigurationSnapshot<Void> snapshot = serviceFacade.clearReportingTaskState(new Revision(requestRevision.getVersion(), requestRevision.getClientId()), id); + final ConfigurationSnapshot<Void> snapshot = serviceFacade.clearReportingTaskState(revision, id); // create the revision final RevisionDTO responseRevision = new RevisionDTO(); - responseRevision.setClientId(requestRevision.getClientId()); + responseRevision.setClientId(revision.getClientId()); responseRevision.setVersion(snapshot.getVersion()); // generate the response entity @@ -521,32 +525,27 @@ public class ReportingTaskResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(reportingTaskEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyUpdateReportingTask(requestReportingTaskDTO); return generateContinueResponse().build(); } // update the reporting task - final RevisionDTO revision = reportingTaskEntity.getRevision(); - final ConfigurationSnapshot<ReportingTaskDTO> controllerResponse = serviceFacade.updateReportingTask( - new Revision(revision.getVersion(), revision.getClientId()), requestReportingTaskDTO); + final UpdateResult<ReportingTaskEntity> controllerResponse = serviceFacade.updateReportingTask(revision, requestReportingTaskDTO); // get the results - final ReportingTaskDTO responseReportingTaskDTO = controllerResponse.getConfiguration(); - - // get the updated revision - final RevisionDTO updatedRevision = new RevisionDTO(); - updatedRevision.setClientId(revision.getClientId()); - updatedRevision.setVersion(controllerResponse.getVersion()); + 
final ReportingTaskEntity entity = controllerResponse.getResult(); // build the response entity - final ReportingTaskEntity entity = new ReportingTaskEntity(); - entity.setRevision(updatedRevision); - entity.setReportingTask(populateRemainingReportingTaskContent(availability, responseReportingTaskDTO)); + populateRemainingReportingTaskContent(availability, entity.getReportingTask()); if (controllerResponse.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(responseReportingTaskDTO.getUri()), entity)).build(); + return clusterContext(generateCreatedResponse(URI.create(entity.getReportingTask().getUri()), entity)).build(); } else { return clusterContext(generateOkResponse(entity)).build(); } @@ -620,30 +619,18 @@ public class ReportingTaskResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = new Revision(version == null ? 
null : version.getLong(), clientId.getClientId(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyDeleteReportingTask(id); return generateContinueResponse().build(); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); - } - // delete the specified reporting task - final ConfigurationSnapshot<Void> controllerResponse = serviceFacade.deleteReportingTask(new Revision(clientVersion, clientId.getClientId()), id); - - // get the updated revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - revision.setVersion(controllerResponse.getVersion()); - - // build the response entity - final ReportingTaskEntity entity = new ReportingTaskEntity(); - entity.setRevision(revision); - + final ReportingTaskEntity entity = serviceFacade.deleteReportingTask(revision, id); return clusterContext(generateOkResponse(entity)).build(); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java index ceb9455..6bb8a15 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/TemplateResource.java @@ -195,8 +195,9 @@ public class TemplateResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + // TODO: NEED VERSION FOR REVISION! 
+ final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase) { return generateContinueResponse().build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index afcf86a..fa3b413 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -16,6 +16,30 @@ */ package org.apache.nifi.web.api.dto; +import java.text.Collator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.ws.rs.WebApplicationException; + import org.apache.nifi.action.Action; import org.apache.nifi.action.component.details.ComponentDetails; import org.apache.nifi.action.component.details.ExtensionDetails; @@ -135,28 +159,7 @@ import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; import 
org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusSnapshotDTO; - -import javax.ws.rs.WebApplicationException; -import java.text.Collator; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; public final class DtoFactory { @@ -686,18 +689,31 @@ public final class DtoFactory { dto.setLinked(snippet.isLinked()); // populate the snippet contents ids - dto.setConnections(copy(snippet.getConnections())); - dto.setFunnels(copy(snippet.getFunnels())); - dto.setInputPorts(copy(snippet.getInputPorts())); - dto.setLabels(copy(snippet.getLabels())); - dto.setOutputPorts(copy(snippet.getOutputPorts())); - dto.setProcessGroups(copy(snippet.getProcessGroups())); - dto.setProcessors(copy(snippet.getProcessors())); - dto.setRemoteProcessGroups(copy(snippet.getRemoteProcessGroups())); + dto.setConnections(mapRevisionToDto(snippet.getConnections())); + dto.setFunnels(mapRevisionToDto(snippet.getFunnels())); + dto.setInputPorts(mapRevisionToDto(snippet.getInputPorts())); + dto.setLabels(mapRevisionToDto(snippet.getLabels())); + dto.setOutputPorts(mapRevisionToDto(snippet.getOutputPorts())); + dto.setProcessGroups(mapRevisionToDto(snippet.getProcessGroups())); + dto.setProcessors(mapRevisionToDto(snippet.getProcessors())); + 
dto.setRemoteProcessGroups(mapRevisionToDto(snippet.getRemoteProcessGroups())); return dto; } + private Map<String, RevisionDTO> mapRevisionToDto(final Map<String, Revision> revisionMap) { + final Map<String, RevisionDTO> dtos = new HashMap<>(revisionMap.size()); + for (final Map.Entry<String, Revision> entry : revisionMap.entrySet()) { + final Revision revision = entry.getValue(); + final RevisionDTO revisionDto = new RevisionDTO(); + revisionDto.setClientId(revision.getClientId()); + revisionDto.setVersion(revision.getVersion()); + + dtos.put(entry.getKey(), revisionDto); + } + return dtos; + } + /** * Creates a TemplateDTO from the specified template. * @@ -1173,7 +1189,7 @@ public final class DtoFactory { } // create the reference dto's - dto.setReferencingComponents(createControllerServiceReferencingComponentsDto(controllerServiceNode.getReferences())); + dto.setReferencingComponents(createControllerServiceReferencingComponentEntities(controllerServiceNode.getReferences())); // add the validation errors final Collection<ValidationResult> validationErrors = controllerServiceNode.getValidationErrors(); @@ -1189,6 +1205,55 @@ public final class DtoFactory { return dto; } + + // TODO: REMOVE THIS... 
+ public Set<ControllerServiceReferencingComponentEntity> createControllerServiceReferencingComponentEntities(final ControllerServiceReference reference) { + final Set<ControllerServiceReferencingComponentDTO> dtos = createControllerServiceReferencingComponentsDto(reference); + final Set<ControllerServiceReferencingComponentEntity> entities = new HashSet<>(dtos.size()); + for (final ControllerServiceReferencingComponentDTO dto : dtos) { + final ControllerServiceReferencingComponentEntity entity = new ControllerServiceReferencingComponentEntity(); + entity.setControllerServiceReferencingComponent(dto); + entities.add(entity); + } + + return entities; + } + + + public ControllerServiceReferencingComponentDTO createControllerServiceReferencingComponentDTO(final ConfiguredComponent component) { + final ControllerServiceReferencingComponentDTO dto = new ControllerServiceReferencingComponentDTO(); + + dto.setDescriptors(new HashMap<>()); + dto.setGroupId(null); // TODO + dto.setId(component.getIdentifier()); + dto.setName(component.getName()); + dto.setProperties(convertProperties(component.getProperties())); + dto.setState(null); // TODO + dto.setType(null); // TODO + dto.setValidationErrors(component.getValidationErrors().stream().map(err -> err.toString()).collect(Collectors.toList())); + + if (component instanceof ControllerServiceNode) { + final ControllerServiceNode serviceNode = (ControllerServiceNode) component; + final Set<ConfiguredComponent> refs = serviceNode.getReferences().getReferencingComponents(); + final Set<ControllerServiceReferencingComponentDTO> refDtos = new HashSet<>(refs.size()); + + for (final ConfiguredComponent ref : refs) { + refDtos.add(createControllerServiceReferencingComponentDTO(ref)); + } + dto.setReferencingComponents(refDtos); + } + + return dto; + } + + private Map<String, String> convertProperties(final Map<PropertyDescriptor, String> properties) { + final Map<String, String> converted = new HashMap<>(properties.size()); + for 
(final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { + converted.put(entry.getKey().getDisplayName(), entry.getValue()); + } + return converted; + } + public Set<ControllerServiceReferencingComponentDTO> createControllerServiceReferencingComponentsDto(final ControllerServiceReference reference) { return createControllerServiceReferencingComponentsDto(reference, new HashSet<ControllerServiceNode>()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java index 0d46d94..5e82da2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java @@ -18,12 +18,16 @@ package org.apache.nifi.web.api.dto; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.LabelEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; +import 
org.apache.nifi.web.api.entity.ReportingTaskEntity; +import org.apache.nifi.web.api.entity.SnippetEntity; public final class EntityFactory { @@ -152,4 +156,61 @@ public final class EntityFactory { return entity; } + public RemoteProcessGroupPortEntity createRemoteProcessGroupPortEntity(final RemoteProcessGroupPortDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy) { + final RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity(); + entity.setRevision(revision); + if (dto != null) { + entity.setAccessPolicy(accessPolicy); + entity.setId(dto.getId()); + if (accessPolicy != null && accessPolicy.getCanRead()) { + entity.setRemoteProcessGroupPort(dto); + } + } + + return entity; + } + + public SnippetEntity createSnippetEntity(final SnippetDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy) { + final SnippetEntity entity = new SnippetEntity(); + entity.setRevision(revision); + if (dto != null) { + entity.setAccessPolicy(accessPolicy); + entity.setId(dto.getId()); + if (accessPolicy != null && accessPolicy.getCanRead()) { + entity.setSnippet(dto); + } + } + + return entity; + } + + public ReportingTaskEntity createReportingTaskEntity(final ReportingTaskDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy) { + final ReportingTaskEntity entity = new ReportingTaskEntity(); + entity.setRevision(revision); + if (dto != null) { + entity.setAccessPolicy(accessPolicy); + entity.setId(dto.getId()); + if (accessPolicy != null && accessPolicy.getCanRead()) { + entity.setReportingTask(dto); + } + } + + return entity; + } + + public ControllerServiceReferencingComponentEntity createControllerServiceReferencingComponentEntity( + final ControllerServiceReferencingComponentDTO dto, final RevisionDTO revision, final AccessPolicyDTO accessPolicy) { + + final ControllerServiceReferencingComponentEntity entity = new ControllerServiceReferencingComponentEntity(); + entity.setRevision(revision); + if (dto != null) { 
+ entity.setAccessPolicy(accessPolicy); + entity.setId(dto.getId()); + if (accessPolicy != null && accessPolicy.getCanRead()) { + entity.setControllerServiceReferencingComponent(dto); + } + } + + return entity; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index 40e2ee8..ba50204 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -148,9 +148,11 @@ public class ControllerFacade implements Authorizable { /** * Creates an archive of the current flow. 
+ * + * @throws IOException if unable to save a copy of the flow */ - public void createArchive() { - flowService.saveFlowChanges(TimeUnit.SECONDS, 0, true); + public void createArchive() throws IOException { + flowService.archiveFlow(); } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java index 033634b..eac7a5a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ControllerServiceDAO.java @@ -20,10 +20,9 @@ import java.util.Set; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ScheduledState; - import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.ControllerServiceDTO; @@ -72,9 +71,9 @@ public interface ControllerServiceDAO { * @param controllerServiceId service id * @param scheduledState scheduled state * @param controllerServiceState the value of state - * @return the org.apache.nifi.controller.service.ControllerServiceReference + * @return the set of all components that were modified as a result of this action */ - 
ControllerServiceReference updateControllerServiceReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState); + Set<ConfiguredComponent> updateControllerServiceReferencingComponents(String controllerServiceId, ScheduledState scheduledState, ControllerServiceState controllerServiceState); /** * Determines whether this controller service can be updated. http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java index bcace0c..435d5ce 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java @@ -17,19 +17,19 @@ package org.apache.nifi.web.dao.impl; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; - import org.apache.nifi.controller.exception.ValidationException; import org.apache.nifi.controller.service.ControllerServiceNode; import 
org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.ResourceNotFoundException; @@ -121,7 +121,7 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro } @Override - public ControllerServiceReference updateControllerServiceReferencingComponents( + public Set<ConfiguredComponent> updateControllerServiceReferencingComponents( final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) { // get the controller service final ControllerServiceNode controllerService = locateControllerService(controllerServiceId); @@ -129,19 +129,19 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro // this request is either acting upon referncing services or schedulable components if (controllerServiceState != null) { if (ControllerServiceState.ENABLED.equals(controllerServiceState)) { - serviceProvider.enableReferencingServices(controllerService); + return serviceProvider.enableReferencingServices(controllerService); } else { - serviceProvider.disableReferencingServices(controllerService); + return serviceProvider.disableReferencingServices(controllerService); } } else if (scheduledState != null) { if (ScheduledState.RUNNING.equals(scheduledState)) { - serviceProvider.scheduleReferencingComponents(controllerService); + return serviceProvider.scheduleReferencingComponents(controllerService); } else { - serviceProvider.unscheduleReferencingComponents(controllerService); + return serviceProvider.unscheduleReferencingComponents(controllerService); } } - return controllerService.getReferences(); + return Collections.emptySet(); } private List<String> validateProposedConfiguration(final ControllerServiceNode controllerService, final ControllerServiceDTO 
controllerServiceDTO) { http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardSnippetDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardSnippetDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardSnippetDAO.java index 24e9256..f4b0582 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardSnippetDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardSnippetDAO.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.web.dao.impl; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -30,11 +31,13 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.ResourceNotFoundException; +import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.SnippetDTO; import org.apache.nifi.web.dao.SnippetDAO; import org.apache.nifi.web.util.SnippetUtils; @@ -99,6 +102,17 @@ public class StandardSnippetDAO implements SnippetDAO { } } + private Map<String, Revision> mapDtoToRevision(final Map<String, RevisionDTO> revisionMap) { + final Map<String, 
Revision> revisions = new HashMap<>(revisionMap.size()); + for (final Map.Entry<String, RevisionDTO> entry : revisionMap.entrySet()) { + final RevisionDTO revisionDto = entry.getValue(); + final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), entry.getKey()); + + revisions.put(entry.getKey(), revision); + } + return revisions; + } + @Override public Snippet createSnippet(final SnippetDTO snippetDTO) { // create the snippet request @@ -106,14 +120,14 @@ public class StandardSnippetDAO implements SnippetDAO { snippet.setId(snippetDTO.getId()); snippet.setParentGroupId(snippetDTO.getParentGroupId()); snippet.setLinked(snippetDTO.isLinked()); - snippet.addProcessors(snippetDTO.getProcessors()); - snippet.addProcessGroups(snippetDTO.getProcessGroups()); - snippet.addRemoteProcessGroups(snippetDTO.getRemoteProcessGroups()); - snippet.addInputPorts(snippetDTO.getInputPorts()); - snippet.addOutputPorts(snippetDTO.getOutputPorts()); - snippet.addConnections(snippetDTO.getConnections()); - snippet.addLabels(snippetDTO.getLabels()); - snippet.addFunnels(snippetDTO.getFunnels()); + snippet.addProcessors(mapDtoToRevision(snippetDTO.getProcessors())); + snippet.addProcessGroups(mapDtoToRevision(snippetDTO.getProcessGroups())); + snippet.addRemoteProcessGroups(mapDtoToRevision(snippetDTO.getRemoteProcessGroups())); + snippet.addInputPorts(mapDtoToRevision(snippetDTO.getInputPorts())); + snippet.addOutputPorts(mapDtoToRevision(snippetDTO.getOutputPorts())); + snippet.addConnections(mapDtoToRevision(snippetDTO.getConnections())); + snippet.addLabels(mapDtoToRevision(snippetDTO.getLabels())); + snippet.addFunnels(mapDtoToRevision(snippetDTO.getFunnels())); // ensure this snippet isn't empty if (snippet.isEmpty()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java index e1df382..44cbb52 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/SnippetUtils.java @@ -88,7 +88,7 @@ public final class SnippetUtils { // add any processors if (!snippet.getProcessors().isEmpty()) { final Set<ProcessorDTO> processors = new LinkedHashSet<>(); - for (String processorId : snippet.getProcessors()) { + for (String processorId : snippet.getProcessors().keySet()) { final ProcessorNode processor = processGroup.getProcessor(processorId); if (processor == null) { throw new IllegalStateException("A processor in this snippet could not be found."); @@ -101,7 +101,7 @@ public final class SnippetUtils { // add any connections if (!snippet.getConnections().isEmpty()) { final Set<ConnectionDTO> connections = new LinkedHashSet<>(); - for (String connectionId : snippet.getConnections()) { + for (String connectionId : snippet.getConnections().keySet()) { final Connection connection = processGroup.getConnection(connectionId); if (connection == null) { throw new IllegalStateException("A connection in this snippet could not be found."); @@ -114,7 +114,7 @@ public final class SnippetUtils { // add any funnels if (!snippet.getFunnels().isEmpty()) { final Set<FunnelDTO> funnels = new LinkedHashSet<>(); - for (String funnelId : snippet.getFunnels()) { + for (String funnelId : snippet.getFunnels().keySet()) { final Funnel funnel = processGroup.getFunnel(funnelId); if (funnel == null) { throw new 
IllegalStateException("A funnel in this snippet could not be found."); @@ -127,7 +127,7 @@ public final class SnippetUtils { // add any input ports if (!snippet.getInputPorts().isEmpty()) { final Set<PortDTO> inputPorts = new LinkedHashSet<>(); - for (String inputPortId : snippet.getInputPorts()) { + for (String inputPortId : snippet.getInputPorts().keySet()) { final Port inputPort = processGroup.getInputPort(inputPortId); if (inputPort == null) { throw new IllegalStateException("An input port in this snippet could not be found."); @@ -140,7 +140,7 @@ public final class SnippetUtils { // add any labels if (!snippet.getLabels().isEmpty()) { final Set<LabelDTO> labels = new LinkedHashSet<>(); - for (String labelId : snippet.getLabels()) { + for (String labelId : snippet.getLabels().keySet()) { final Label label = processGroup.getLabel(labelId); if (label == null) { throw new IllegalStateException("A label in this snippet could not be found."); @@ -153,7 +153,7 @@ public final class SnippetUtils { // add any output ports if (!snippet.getOutputPorts().isEmpty()) { final Set<PortDTO> outputPorts = new LinkedHashSet<>(); - for (String outputPortId : snippet.getOutputPorts()) { + for (String outputPortId : snippet.getOutputPorts().keySet()) { final Port outputPort = processGroup.getOutputPort(outputPortId); if (outputPort == null) { throw new IllegalStateException("An output port in this snippet could not be found."); @@ -166,7 +166,7 @@ public final class SnippetUtils { // add any process groups if (!snippet.getProcessGroups().isEmpty()) { final Set<ProcessGroupDTO> processGroups = new LinkedHashSet<>(); - for (String childGroupId : snippet.getProcessGroups()) { + for (String childGroupId : snippet.getProcessGroups().keySet()) { final ProcessGroup childGroup = processGroup.getProcessGroup(childGroupId); if (childGroup == null) { throw new IllegalStateException("A process group in this snippet could not be found."); @@ -179,7 +179,7 @@ public final class SnippetUtils { // 
add any remote process groups if (!snippet.getRemoteProcessGroups().isEmpty()) { final Set<RemoteProcessGroupDTO> remoteProcessGroups = new LinkedHashSet<>(); - for (String remoteProcessGroupId : snippet.getRemoteProcessGroups()) { + for (String remoteProcessGroupId : snippet.getRemoteProcessGroups().keySet()) { final RemoteProcessGroup remoteProcessGroup = processGroup.getRemoteProcessGroup(remoteProcessGroupId); if (remoteProcessGroup == null) { throw new IllegalStateException("A remote process group in this snippet could not be found."); http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml index 9f13a23..e955cd1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml @@ -37,10 +37,8 @@ <property name="properties" ref="nifiProperties"/> </bean> - <!-- optimistic locking manager --> - <bean id="webOptimisticLockingManager" class="org.apache.nifi.web.spring.OptimisticLockingManagerFactoryBean" depends-on="clusterManagerOptimisticLockingManager"> - <property name="properties" ref="nifiProperties"/> - </bean> + <!-- revision manager --> + <bean id="revisionManager" class="org.apache.nifi.web.revision.NaiveRevisionManager" /> <!-- content access --> <bean id="contentAccess" class="org.apache.nifi.web.StandardNiFiContentAccess"> @@ -137,7 +135,7 @@ <property name="auditService" ref="auditService"/> <property name="keyService" 
ref="keyService"/> <property name="snippetUtils" ref="snippetUtils"/> - <property name="optimisticLockingManager" ref="webOptimisticLockingManager"/> + <property name="revisionManager" ref="revisionManager" /> <property name="dtoFactory" ref="dtoFactory"/> <property name="entityFactory" ref="entityFactory"/> <property name="clusterManager" ref="clusterManager"/> http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java index 70aa30e..f16528b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/FlowModification.java @@ -54,4 +54,8 @@ public class FlowModification { return lastModifier; } + @Override + public String toString() { + return "Last Modified by '" + lastModifier + "' with Revision " + revision; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java index e92be01..b205590 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/OptimisticLockingManager.java @@ -16,12 +16,16 @@ */ package org.apache.nifi.web; +import org.apache.nifi.web.revision.RevisionManager; + /** * A manager for optimistic locking based on revisions. A revision is composed * of a client ID and a version number. Two revisions are considered equal if * either their version numbers match or their client IDs match. * + * @deprecated This class has been deprecated in favor of {@link RevisionManager} */ +@Deprecated public interface OptimisticLockingManager { /** http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java index 3bc22f7..3cc6aca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/StandardOptimisticLockingManager.java @@ -22,13 
+22,16 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextThreadLocal; +import org.apache.nifi.web.revision.NaiveRevisionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Implements the OptimisticLockingManager interface. * + * @deprecated This class has been deprecated in favor of {@link NaiveRevisionManager} */ +@Deprecated public class StandardOptimisticLockingManager implements OptimisticLockingManager { private static final Logger logger = LoggerFactory.getLogger(StandardOptimisticLockingManager.class); http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/DeleteRevisionTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/DeleteRevisionTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/DeleteRevisionTask.java new file mode 100644 index 0000000..dc42638 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/DeleteRevisionTask.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.revision; + +public interface DeleteRevisionTask<T> { + + T performTask(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/ExpiredRevisionClaimException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/ExpiredRevisionClaimException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/ExpiredRevisionClaimException.java new file mode 100644 index 0000000..8d1b6e8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/ExpiredRevisionClaimException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.revision; + +import org.apache.nifi.web.InvalidRevisionException; + +public class ExpiredRevisionClaimException extends InvalidRevisionException { + private static final long serialVersionUID = 5648579322377770273L; + + public ExpiredRevisionClaimException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/LockVerificationFailedException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/LockVerificationFailedException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/LockVerificationFailedException.java new file mode 100644 index 0000000..3fa22f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/LockVerificationFailedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.revision; + +public class LockVerificationFailedException extends IllegalStateException { + private static final long serialVersionUID = 1L; + + public LockVerificationFailedException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java new file mode 100644 index 0000000..c2232a6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.revision; + +import java.text.Collator; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.Stack; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +import org.apache.nifi.web.FlowModification; +import org.apache.nifi.web.InvalidRevisionException; +import org.apache.nifi.web.Revision; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * This class implements a naive approach for Revision Management. + * Each call into the Revision Manager will block until any previously held + * lock is expired or unlocked. This provides a very simple solution but can + * likely be improved by allowing, for instance, multiple threads to obtain + * temporary locks simultaneously, etc. 
+ * </p> + */ +public class NaiveRevisionManager implements RevisionManager { + private static final Logger logger = LoggerFactory.getLogger(NaiveRevisionManager.class); + + private final long claimExpirationNanos; + private final ConcurrentMap<String, RevisionLock> revisionLockMap = new ConcurrentHashMap<>(); + + public NaiveRevisionManager() { + this(1, TimeUnit.MINUTES); + } + + /** + * Constructs a new NaiveRevisionManager that uses the given amount of time as the expiration time + * for Revision Claims + * + * @param claimExpiration the amount of time that a Revision Claim should last + * @param timeUnit the unit of time in which <code>claimExpiration</code> is expressed + */ + public NaiveRevisionManager(final long claimExpiration, final TimeUnit timeUnit) { + this.claimExpirationNanos = timeUnit.toNanos(claimExpiration); + } + + @Override + public RevisionClaim requestClaim(Revision revision) throws InvalidRevisionException { + return requestClaim(Collections.singleton(revision)); + } + + @Override + public RevisionClaim requestClaim(final Collection<Revision> revisions) { + logger.debug("Attempting to claim Revisions {}", revisions); + + // Try to obtain a Revision Claim (temporary lock) on all revisions + final List<Revision> revisionList = new ArrayList<>(revisions); + revisionList.sort(new RevisionComparator()); + + ClaimResult failedClaimResult = null; + final Set<RevisionLock> locksObtained = new HashSet<>(); + for (int i = 0; i < revisionList.size(); i++) { + final Revision revision = revisionList.get(i); + final RevisionLock revisionLock = getRevisionLock(revision); + + final ClaimResult claimResult = revisionLock.requestClaim(revision); + + if (claimResult.isSuccessful()) { + // only report the Claim as obtained once we know the request succeeded + logger.debug("Obtained Revision Claim for {}", revision); + locksObtained.add(revisionLock); + } else { + logger.debug("Failed to obtain Revision Claim for component with ID {} because Current Revision is {} but supplied Revision is {}", + revision.getComponentId(), claimResult.getLastModification().getRevision(), revision); + + failedClaimResult = 
claimResult; + break; + } + } + + // if we got a Revision Claim on each Revision, return a successful result + if (locksObtained.size() == revisionList.size()) { + logger.debug("Obtained Revision Claim for all components"); + + // it's possible that obtaining the locks took a while if we are obtaining + // many. Renew the timestamp to ensure that the first locks obtained don't + // expire too quickly. + final long timestamp = System.nanoTime() + claimExpirationNanos; + for (final RevisionLock revisionLock : locksObtained) { + revisionLock.renewExpiration(timestamp); + } + + return new StandardRevisionClaim(revisions); + } + + // We failed to obtain all of the Revision Claims necessary. Since + // we need this call to atomically obtain all or nothing, we have to now + // release the locks that we did obtain. + for (final RevisionLock revisionLock : locksObtained) { + revisionLock.releaseClaim(); + } + + final FlowModification lastMod = failedClaimResult.getLastModification(); + if (lastMod.getRevision().getClientId() == null || lastMod.getRevision().getClientId().trim().isEmpty() || lastMod.getRevision().getVersion() == null) { + throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", + failedClaimResult.getProposedRevision(), lastMod.getRevision())); + } else { + throw new InvalidRevisionException(String.format("Component %s has been updated by '%s'. 
Please refresh to synchronize the view.", + failedClaimResult.getProposedRevision().getComponentId(), lastMod.getLastModifier())); + } + } + + @Override + public Revision getRevision(final String componentId) { + final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, + id -> new RevisionLock(new FlowModification(new Revision(0L, null, componentId), null), claimExpirationNanos)); + + return revisionLock.getRevision(); + } + + @Override + public <T> T deleteRevision(final RevisionClaim claim, final DeleteRevisionTask<T> task) throws ExpiredRevisionClaimException { + logger.debug("Attempting to delete revision using {}", claim); + int successCount = 0; + final List<Revision> revisionList = new ArrayList<>(claim.getRevisions()); + revisionList.sort(new RevisionComparator()); + + String failedId = null; + for (final Revision revision : revisionList) { + final RevisionLock revisionLock = getRevisionLock(revision); + final boolean verified = revisionLock.requestWriteLock(revision); + + if (verified) { + logger.debug("Verified Revision Claim for {}", revision); + successCount++; + } else { + logger.debug("Failed to verify Revision Claim for {}", revision); + failedId = revision.getComponentId(); + break; + } + } + + if (successCount == revisionList.size()) { + logger.debug("Successfully verified Revision Claim for all revisions"); + + final T taskValue = task.performTask(); + for (final Revision revision : revisionList) { + deleteRevisionLock(revision); + logger.debug("Deleted Revision {}", revision); + } + + return taskValue; + } + + // We failed to obtain a thread lock for all revisions. 
Relinquish + // any Revision Claims that we have + for (int i = 0; i < successCount; i++) { + final Revision revision = revisionList.get(i); + final RevisionLock revisionLock = getRevisionLock(revision); + // requestWriteLock left the thread lock held for this revision; cancelWriteLock releases both the + // Claim and that hold. relinquishRevisionClaim would re-acquire and release the write lock only once, + // leaving the original hold in place forever. + revisionLock.cancelWriteLock(); + logger.debug("Relinquished lock for {}", revision); + } + + // Throw an Exception indicating that we failed to obtain the locks + throw new ExpiredRevisionClaimException("Invalid Revision was given for component with ID '" + failedId + "'"); + } + + @Override + public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final String modifier, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException { + int successCount = 0; + logger.debug("Attempting to update revision using {}", originalClaim); + + final List<Revision> revisionList = new ArrayList<>(originalClaim.getRevisions()); + revisionList.sort(new RevisionComparator()); + + String failedId = null; + for (final Revision revision : revisionList) { + final RevisionLock revisionLock = getRevisionLock(revision); + final boolean verified = revisionLock.requestWriteLock(revision); + + if (verified) { + logger.debug("Verified Revision Claim for {}", revision); + successCount++; + } else { + logger.debug("Failed to verify Revision Claim for {}", revision); + failedId = revision.getComponentId(); + break; + } + } + + // We successfully verified all revisions. + if (successCount == revisionList.size()) { + logger.debug("Successfully verified Revision Claim for all revisions"); + + RevisionUpdate<T> updatedComponent = null; + try { + updatedComponent = task.update(); + } finally { + // Release the lock that we are holding and update the revision. 
+ // To do this, we need to map the old revision to the new revision + // so that we have an efficient way to lookup the pairing, so that + // we can easily obtain the old revision and the new revision for + // the same component in order to call #unlock on the RevisionLock + final Map<Revision, Revision> updatedRevisions = new HashMap<>(); + final Map<String, Revision> revisionsByComponentId = new HashMap<>(); + for (final Revision revision : revisionList) { + updatedRevisions.put(revision, revision); + revisionsByComponentId.put(revision.getComponentId(), revision); + } + + if (updatedComponent != null) { + for (final Revision updatedRevision : updatedComponent.getUpdatedRevisions()) { + final Revision oldRevision = revisionsByComponentId.get(updatedRevision.getComponentId()); + if (oldRevision != null) { + updatedRevisions.put(oldRevision, updatedRevision); + } + } + } + + for (final Revision revision : revisionList) { + getRevisionLock(revision).unlock(revision, updatedRevisions.get(revision), modifier); + } + } + + return updatedComponent; + } + + // We failed to obtain a thread lock for all revisions. 
Relinquish + // any Revision Claims that we have + for (int i = 0; i < successCount; i++) { + final Revision revision = revisionList.get(i); + final RevisionLock revisionLock = getRevisionLock(revision); + revisionLock.cancelWriteLock(); + logger.debug("Relinquished lock for {}", revision); + } + + // Throw an Exception indicating that we failed to obtain the locks + throw new InvalidRevisionException("Invalid Revision was given for component with ID '" + failedId + "'"); + } + + @Override + public boolean releaseClaim(final RevisionClaim claim) { + boolean success = true; + + final List<Revision> revisions = new ArrayList<>(claim.getRevisions()); + revisions.sort(new RevisionComparator()); + + for (final Revision revision : revisions) { + final RevisionLock revisionLock = getRevisionLock(revision); + success = revisionLock.relinquishRevisionClaim(revision) && success; + } + + return success; + } + + @Override + public <T> T get(final String componentId, final ReadOnlyRevisionCallback<T> callback) { + final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos)); + logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision()); + revisionLock.acquireReadLock(); + logger.debug("Obtained read lock for {}", revisionLock.getRevision()); + + try { + return callback.withRevision(revisionLock.getRevision()); + } finally { + logger.debug("Releasing read lock for {}", revisionLock.getRevision()); + revisionLock.relinquishReadLock(); + } + } + + @Override + public <T> T get(final Set<String> componentIds, final Supplier<T> callback) { + final List<String> sortedIds = new ArrayList<>(componentIds); + sortedIds.sort(Collator.getInstance()); + + final Stack<RevisionLock> revisionLocks = new Stack<>(); + for (final String componentId : sortedIds) { + final RevisionLock revisionLock = revisionLockMap.computeIfAbsent(componentId, id -> new 
RevisionLock(new FlowModification(new Revision(0L, null, id), null), claimExpirationNanos)); + logger.debug("Attempting to obtain read lock for {}", revisionLock.getRevision()); + revisionLock.acquireReadLock(); + revisionLocks.push(revisionLock); + logger.debug("Obtained read lock for {}", revisionLock.getRevision()); + } + + logger.debug("Obtained read lock for all necessary components; calling call-back"); + try { + return callback.get(); + } finally { + while (!revisionLocks.isEmpty()) { + final RevisionLock lock = revisionLocks.pop(); + logger.debug("Releasing read lock for {}", lock.getRevision()); + lock.relinquishReadLock(); + } + } + } + + private void deleteRevisionLock(final Revision revision) { + final RevisionLock revisionLock = revisionLockMap.remove(revision.getComponentId()); + if (revisionLock == null) { + return; + } + + + revisionLock.releaseClaim(); + } + + private RevisionLock getRevisionLock(final Revision revision) { + return revisionLockMap.computeIfAbsent(revision.getComponentId(), id -> new RevisionLock(new FlowModification(revision, null), claimExpirationNanos)); + } + + + private static class RevisionLock { + private final AtomicReference<FlowModification> lastModReference = new AtomicReference<>(); + private final AtomicReference<LockStamp> lockStamp = new AtomicReference<>(); + private final long lockNanos; + private final ReadWriteLock threadLock = new ReentrantReadWriteLock(); + + public RevisionLock(final FlowModification lastMod, final long lockNanos) { + this.lockNanos = lockNanos; + lastModReference.set(lastMod); + } + + /** + * Requests that a Revision Claim be granted for the proposed Revision + * + * @param proposedRevision the revision to obtain a Claim for + * + * @return <code>true</code> if the Revision is valid and a Claim has been granted, <code>false</code> otherwise + */ + public ClaimResult requestClaim(final Revision proposedRevision) { + // acquire the claim, blocking if necessary. 
+ acquireClaim(proposedRevision.getClientId()); + + threadLock.writeLock().lock(); + try { + // check if the revision is correct + final FlowModification lastModification = lastModReference.get(); + final Revision currentRevision = lastModification.getRevision(); + if (proposedRevision.equals(currentRevision)) { + // revision is correct - return true + return new ClaimResult(true, lastModification, proposedRevision); + } + + // revision is incorrect. Release the Claim and return false + releaseClaim(); + logger.debug("Cannot obtain Revision Claim {} because the Revision is out-of-date. Current revision is {}", proposedRevision, currentRevision); + return new ClaimResult(false, lastModification, proposedRevision); + } finally { + threadLock.writeLock().unlock(); + } + } + + /** + * Verifies that the given Revision has a Claim against it already and that the Claim belongs + * to the same client as the given Revision. If so, upgrades the Revision Claim to a lock that + * will not be relinquished until the {@link #unlock(Revision)} method is called. 
+ * + * @param proposedRevision the current Revision + * @return <code>true</code> if the Revision Claim was upgraded to a lock, <code>false</code> otherwise + * @throws ExpiredRevisionClaimException if the Revision Claim for the given Revision has already expired + * @throws IllegalStateException if the Revision matches but no Revision Claim has been obtained for it + */ + public boolean requestWriteLock(final Revision proposedRevision) throws ExpiredRevisionClaimException { + Objects.requireNonNull(proposedRevision); + threadLock.writeLock().lock(); + + if (getRevision().equals(proposedRevision)) { + final LockStamp stamp = lockStamp.get(); + + if (stamp == null) { + logger.debug("Attempted to obtain write lock for {} but no Claim was obtained", proposedRevision); + // Release the thread lock before failing, as is done for an expired Claim below; otherwise + // this thread would keep holding the write lock after the exception propagates. + threadLock.writeLock().unlock(); + throw new IllegalStateException("No claim has been obtained for " + proposedRevision + " so cannot lock the component for modification"); + } + + if (stamp.getClientId() == null || stamp.getClientId().equals(proposedRevision.getClientId())) { + // TODO - Must make sure that we don't have an expired stamp if it is the result of another + // operation taking a long time. I.e., Client A fires off two requests for Component X. If the + // first one takes 2 minutes to complete, it should not result in the second request getting + // rejected. I.e., we want to ensure that if the request is received before the Claim expired, + // that we do not throw an ExpiredRevisionClaimException. Expiration of the Revision is intended + // only to avoid the case where a node obtains a Claim and then the node is lost or otherwise does + // not fulfill the second phase of the two-phase commit. + // We may need a Queue of updates (queue would need to be bounded, with a request getting + // rejected if queue is full). + if (stamp.isExpired()) { + threadLock.writeLock().unlock(); + throw new ExpiredRevisionClaimException("Claim for " + proposedRevision + " has expired"); + } + + // Intentionally leave the thread lock in a locked state! 
+ return true; + } else { + logger.debug("Failed to verify {} because the Client ID was not the same as the Lock Stamp's Client ID (Lock Stamp was {})", proposedRevision, stamp); + } + } + + // revision is wrong. Unlock thread lock and return false + threadLock.writeLock().unlock(); + return false; + } + + private void acquireClaim(final String clientId) { + while (true) { + final LockStamp stamp = lockStamp.get(); + + if (stamp == null || stamp.isExpired()) { + final long now = System.nanoTime(); + final boolean lockObtained = lockStamp.compareAndSet(stamp, new LockStamp(clientId, now + lockNanos)); + if (lockObtained) { + return; + } + } else { + Thread.yield(); + } + } + } + + public void acquireReadLock() { + // Wait until we can claim the lock stamp + boolean obtained = false; + while (!obtained) { + // If the lock stamp is not null, then there is either an active Claim or a + // write lock held. Wait until it is null and then replace it atomically + // with a LockStamp that does not expire (expiration time is Long.MAX_VALUE). + final LockStamp curStamp = lockStamp.get(); + obtained = (curStamp == null || curStamp.isExpired()) && lockStamp.compareAndSet(curStamp, new LockStamp(null, Long.MAX_VALUE)); + + if (!obtained) { + // Could not obtain lock. Yield so that we don't sit + // around doing nothing with the thread. + Thread.yield(); + } + } + + // Now we can obtain the read lock without problem. 
+ threadLock.readLock().lock(); + } + + public void relinquishReadLock() { + lockStamp.set(null); + threadLock.readLock().unlock(); + } + + private void releaseClaim() { + lockStamp.set(null); + } + + /** + * Releases the Revision Claim if and only if the current revision matches the proposed revision + * + * @param proposedRevision the proposed revision to check against the current revision + * @return <code>true</code> if the Revision Claim was relinquished, <code>false</code> otherwise + */ + public boolean relinquishRevisionClaim(final Revision proposedRevision) { + threadLock.writeLock().lock(); + try { + if (getRevision().equals(proposedRevision)) { + releaseClaim(); + return true; + } + + return false; + } finally { + threadLock.writeLock().unlock(); + } + } + + /** + * Releases the lock and any Revision Claim that is held for the given Revision and + * updates the revision + * + * @param proposedRevision the current Revision + * @param updatedRevision the Revision to update the current revision to + */ + public void unlock(final Revision proposedRevision, final Revision updatedRevision, final String modifier) { + final Revision curRevision = getRevision(); + if (curRevision == null) { + throw new IllegalMonitorStateException("Cannot unlock " + proposedRevision + " because it is not locked"); + } + + if (!curRevision.equals(proposedRevision)) { + // Intentionally leave the thread lock in a locked state! + throw new IllegalMonitorStateException("Cannot unlock " + proposedRevision + " because the version is not valid"); + } + + lastModReference.set(new FlowModification(updatedRevision, modifier)); + + // Set stamp to null to indicate that it is not locked. + releaseClaim(); + + // Thread Lock should already be locked if this is called. 
+ threadLock.writeLock().unlock(); + } + + public void cancelWriteLock() { + releaseClaim(); + threadLock.writeLock().unlock(); + } + + /** + * Updates expiration time to the given timestamp + * + * @param timestamp the new expiration timestamp in nanoseconds + */ + public void renewExpiration(final long timestamp) { + final LockStamp stamp = lockStamp.get(); + final String clientId = stamp == null ? null : stamp.getClientId(); + lockStamp.set(new LockStamp(clientId, timestamp)); + } + + public Revision getRevision() { + final FlowModification lastMod = lastModReference.get(); + return (lastMod == null) ? null : lastMod.getRevision(); + } + } + + + private static class LockStamp { + private final String clientId; + private final long expirationTimestamp; + + public LockStamp(final String clientId, final long expirationTimestamp) { + this.clientId = clientId; + this.expirationTimestamp = expirationTimestamp; + } + + public String getClientId() { + return clientId; + } + + public boolean isExpired() { + return System.nanoTime() > expirationTimestamp; + } + + @Override + public String toString() { + return clientId; + } + } + + private static class ClaimResult { + private final boolean successful; + private final FlowModification lastMod; + private final Revision proposedRevision; + + public ClaimResult(final boolean successful, final FlowModification lastMod, final Revision proposedRevision) { + this.successful = successful; + this.lastMod = lastMod; + this.proposedRevision = proposedRevision; + } + + public boolean isSuccessful() { + return successful; + } + + public FlowModification getLastModification() { + return lastMod; + } + + public Revision getProposedRevision() { + return proposedRevision; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/ReadOnlyRevisionCallback.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/ReadOnlyRevisionCallback.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/ReadOnlyRevisionCallback.java new file mode 100644 index 0000000..ce538c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/ReadOnlyRevisionCallback.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.revision; + +import org.apache.nifi.web.Revision; + +public interface ReadOnlyRevisionCallback<T> { + + /** + * Performs some action while a Read Lock is held for the given + * Revision. Note that this method must never change the component + * for which the Read Lock is held! 
+ * + * @param revision the Revision for which a Read Lock is held + * @return the result of applying the logic of this callback + */ + T withRevision(Revision revision); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionClaim.java new file mode 100644 index 0000000..62696b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionClaim.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.revision; + +import java.util.Set; + +import org.apache.nifi.web.Revision; + +public interface RevisionClaim { + Set<Revision> getRevisions(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionComparator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionComparator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionComparator.java new file mode 100644 index 0000000..16677fb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionComparator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.web.revision; + +import java.util.Comparator; + +import org.apache.nifi.web.Revision; + +public class RevisionComparator implements Comparator<Revision> { + + @Override + public int compare(final Revision o1, final Revision o2) { + final int componentComparison = o1.getComponentId().compareTo(o2.getComponentId()); + if (componentComparison != 0) { + return componentComparison; + } + + final int clientComparison = o1.getClientId().compareTo(o2.getClientId()); + if (clientComparison != 0) { + return clientComparison; + } + + return o1.getVersion().compareTo(o2.getVersion()); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionLockResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionLockResult.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionLockResult.java new file mode 100644 index 0000000..2ca9d7f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/RevisionLockResult.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.web.revision;

/**
 * Possible outcomes of a request to lock a group of Revisions.
 */
public enum RevisionLockResult {

    /**
     * Every Revision for which a lock was requested has been locked.
     */
    LOCK_OBTAINED(true),

    /**
     * Nothing was locked, because at least one of the Revisions was not up to date.
     */
    INVALID_REVISION(false);

    private final boolean lockHeld;

    RevisionLockResult(final boolean lockHeld) {
        this.lockHeld = lockHeld;
    }

    /**
     * Reports whether this result represents a successfully obtained lock.
     *
     * @return <code>true</code> when a lock was obtained, <code>false</code> when it was not
     */
    public boolean isSuccess() {
        return lockHeld;
    }
}
