http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index b010bf3..b808ae6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -17,15 +17,40 @@ package org.apache.nifi.web.api; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; -import io.swagger.annotations.Authorization; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import 
javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.authorization.AuthorizableLookup; import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.ProcessGroupAuthorizable; import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; @@ -34,7 +59,6 @@ import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceState; -import org.apache.nifi.registry.flow.ComponentType; import org.apache.nifi.registry.flow.FlowRegistryUtils; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; @@ -55,9 +79,9 @@ import org.apache.nifi.web.api.dto.VersionedFlowDTO; import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; -import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VersionedFlowSnapshotEntity; import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity; import org.apache.nifi.web.api.request.ClientIdParameter; @@ -70,37 +94,12 @@ import org.apache.nifi.web.util.Pause; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import 
javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.stream.Collectors; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; @Path("/versions") @Api(value = "/versions", description = "Endpoint for managing version control for a flow") @@ -356,7 +355,10 @@ public class VersionsResource extends ApplicationResource { response = VersionControlInformationEntity.class, notes = NON_GUARANTEED_ENDPOINT, authorizations = { - @Authorization(value = "Read - /process-groups/{uuid}") + @Authorization(value = "Read - /process-groups/{uuid}"), + @Authorization(value = "Write - /process-groups/{uuid}"), + @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"), + @Authorization(value = "Read - any referenced Controller Services by any encapsulated components - /controller-services/{uuid}") }) @ApiResponses(value = { @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), @@ -500,9 +502,11 @@ public class VersionsResource extends ApplicationResource { requestEntity, groupRevision, lookup -> { - final Authorizable processGroup = lookup.getProcessGroup(groupId).getAuthorizable(); + final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); + final Authorizable processGroup = groupAuthorizable.getAuthorizable(); processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true); }, () -> { final VersionControlInformationEntity entity = serviceFacade.getVersionControlInformation(groupId); @@ -663,22 +667,21 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("The Flow ID must be supplied."); } - // Perform the request if (isReplicateRequest()) { return replicate(HttpMethod.PUT, requestEntity); } - // Determine which components will be affected by updating the version - final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, NiFiUserUtils.getNiFiUser()); - final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, requestEntity, requestRevision, lookup -> { - authorizeAffectedComponents(lookup, affectedComponents); + final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); + final Authorizable processGroup = groupAuthorizable.getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> { // We do not enforce that the Process Group is 'not dirty' because at this point, @@ -691,13 
+694,16 @@ public class VersionsResource extends ApplicationResource { // Update the Process Group to match the proposed flow snapshot final VersionControlInformationDTO versionControlInfoDto = new VersionControlInformationDTO(); versionControlInfoDto.setBucketId(snapshotMetadata.getBucketIdentifier()); + versionControlInfoDto.setBucketName(snapshotMetadata.getBucketName()); versionControlInfoDto.setCurrent(true); versionControlInfoDto.setFlowId(snapshotMetadata.getFlowIdentifier()); versionControlInfoDto.setFlowName(snapshotMetadata.getFlowName()); + versionControlInfoDto.setFlowDescription(snapshotMetadata.getFlowDescription()); versionControlInfoDto.setGroupId(groupId); versionControlInfoDto.setModified(false); versionControlInfoDto.setVersion(snapshotMetadata.getVersion()); versionControlInfoDto.setRegistryId(requestEntity.getRegistryId()); + versionControlInfoDto.setRegistryName(serviceFacade.getFlowRegistryName(requestEntity.getRegistryId())); final ProcessGroupEntity updatedGroup = serviceFacade.updateProcessGroup(rev, groupId, versionControlInfoDto, flowSnapshot, getIdGenerationSeed().orElse(null), false); final VersionControlInformationDTO updatedVci = updatedGroup.getComponent().getVersionControlInformation(); @@ -769,6 +775,13 @@ public class VersionsResource extends ApplicationResource { updateRequestDto.setProcessGroupId(asyncRequest.getProcessGroupId()); updateRequestDto.setRequestId(requestId); updateRequestDto.setUri(generateResourceUri("versions", requestType, requestId)); + updateRequestDto.setState(asyncRequest.getState()); + updateRequestDto.setPercentComplete(asyncRequest.getPercentComplete()); + + if (updateRequestDto.isComplete()) { + final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(asyncRequest.getProcessGroupId()); + updateRequestDto.setVersionControlInformation(vciEntity == null ? 
null : vciEntity.getVersionControlInformation()); + } final RevisionDTO groupRevision = serviceFacade.getProcessGroup(asyncRequest.getProcessGroupId()).getRevision(); @@ -830,6 +843,13 @@ public class VersionsResource extends ApplicationResource { final NiFiUser user = NiFiUserUtils.getNiFiUser(); final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest = requestManager.removeRequest(requestType, requestId, user); + if (asyncRequest == null) { + throw new ResourceNotFoundException("Could not find request of type " + requestType + " with ID " + requestId); + } + + if (!asyncRequest.isComplete()) { + asyncRequest.cancel(); + } final VersionedFlowUpdateRequestDTO updateRequestDto = new VersionedFlowUpdateRequestDTO(); updateRequestDto.setComplete(asyncRequest.isComplete()); @@ -838,6 +858,13 @@ public class VersionsResource extends ApplicationResource { updateRequestDto.setProcessGroupId(asyncRequest.getProcessGroupId()); updateRequestDto.setRequestId(requestId); updateRequestDto.setUri(generateResourceUri("versions", requestType, requestId)); + updateRequestDto.setPercentComplete(asyncRequest.getPercentComplete()); + updateRequestDto.setState(asyncRequest.getState()); + + if (updateRequestDto.isComplete()) { + final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(asyncRequest.getProcessGroupId()); + updateRequestDto.setVersionControlInformation(vciEntity == null ? 
null : vciEntity.getVersionControlInformation()); + } final RevisionDTO groupRevision = serviceFacade.getProcessGroup(asyncRequest.getProcessGroupId()).getRevision(); @@ -861,7 +888,10 @@ public class VersionsResource extends ApplicationResource { notes = NON_GUARANTEED_ENDPOINT, authorizations = { @Authorization(value = "Read - /process-groups/{uuid}"), - @Authorization(value = "Write - /process-groups/{uuid}") + @Authorization(value = "Write - /process-groups/{uuid}"), + @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"), + @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"), + @Authorization(value = "Write - if the template contains any restricted components - /restricted-components") }) @ApiResponses(value = { @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @@ -924,7 +954,7 @@ public class VersionsResource extends ApplicationResource { // a. Component itself is modified in some way, other than position changing. // b. Source and Destination of any Connection that is modified. // c. Any Processor or Controller Service that references a Controller Service that is modified. - // 2. Verify READ and WRITE permissions for user, for every component affected. + // 2. Verify READ and WRITE permissions for user, for every component. // 3. Verify that all components in the snapshot exist on all nodes (i.e., the NAR exists)? // 4. Verify that Process Group is already under version control. If not, must start Version Control instead of updateFlow // 5. Verify that Process Group is not 'dirty'. @@ -961,8 +991,13 @@ public class VersionsResource extends ApplicationResource { requestEntity, requestRevision, lookup -> { - // Step 2: Verify READ and WRITE permissions for user, for every component affected. 
- authorizeAffectedComponents(lookup, affectedComponents); + // Step 2: Verify READ and WRITE permissions for user, for every component. + final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); + final Authorizable processGroup = groupAuthorizable.getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true); + super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true); final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents(); final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents); @@ -980,7 +1015,7 @@ public class VersionsResource extends ApplicationResource { // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. 
final String requestId = UUID.randomUUID().toString(); - final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user); + final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors"); // Submit the request to be performed in the background final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> { @@ -1005,6 +1040,8 @@ public class VersionsResource extends ApplicationResource { updateRequestDto.setProcessGroupId(groupId); updateRequestDto.setRequestId(requestId); updateRequestDto.setUri(generateResourceUri("versions", "update-requests", requestId)); + updateRequestDto.setPercentComplete(request.getPercentComplete()); + updateRequestDto.setState(request.getState()); final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity(); final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision); @@ -1029,7 +1066,10 @@ public class VersionsResource extends ApplicationResource { notes = NON_GUARANTEED_ENDPOINT, authorizations = { @Authorization(value = "Read - /process-groups/{uuid}"), - @Authorization(value = "Write - /process-groups/{uuid}") + @Authorization(value = "Write - /process-groups/{uuid}"), + @Authorization(value = "Read - /{component-type}/{uuid} - For all encapsulated components"), + @Authorization(value = "Write - /{component-type}/{uuid} - For all encapsulated components"), + @Authorization(value = "Write - if the template contains any restricted components - /restricted-components") }) @ApiResponses(value = { @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), @@ -1095,8 +1135,13 @@ public class VersionsResource extends ApplicationResource { requestEntity, requestRevision, lookup -> { - // Step 2: Verify READ and WRITE permissions for user, for every component affected. - authorizeAffectedComponents(lookup, affectedComponents); + // Step 2: Verify READ and WRITE permissions for user, for every component. + final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(groupId); + final Authorizable processGroup = groupAuthorizable.getAuthorizable(); + processGroup.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); + processGroup.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, false, true, true); + super.authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.WRITE, true, false, true, true); final VersionedProcessGroup groupContents = flowSnapshot.getFlowContents(); final boolean containsRestrictedComponents = FlowRegistryUtils.containsRestrictedComponent(groupContents); @@ -1134,7 +1179,7 @@ public class VersionsResource extends ApplicationResource { // If the information passed in is correct, but there have been no changes, there is nothing to do - just register the request, mark it complete, and return. 
if (currentVersion.getModified() == Boolean.FALSE) { final String requestId = UUID.randomUUID().toString(); - final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user); + final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Complete"); requestManager.submitRequest("revert-requests", requestId, request, task -> { }); @@ -1145,7 +1190,10 @@ public class VersionsResource extends ApplicationResource { updateRequestDto.setLastUpdated(new Date()); updateRequestDto.setProcessGroupId(groupId); updateRequestDto.setRequestId(requestId); + updateRequestDto.setVersionControlInformation(currentVersion); updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId)); + updateRequestDto.setPercentComplete(100); + updateRequestDto.setState(request.getState()); final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity(); updateRequestEntity.setProcessGroupRevision(revisionDto); @@ -1159,19 +1207,18 @@ public class VersionsResource extends ApplicationResource { // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. 
final String requestId = UUID.randomUUID().toString(); - final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user); + final AsynchronousWebRequest<VersionControlInformationEntity> request = new StandardAsynchronousWebRequest<>(requestId, groupId, user, "Stopping Processors"); // Submit the request to be performed in the background final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> { try { - // TODO: change the URI to the new endpoint for 'revert' instead of 'change version' final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, affectedComponents, user, replicateRequest, requestEntity, flowSnapshot, request, idGenerationSeed, false); vcur.markComplete(updatedVersionControlEntity); } catch (final LifecycleManagementException e) { logger.error("Failed to update flow to new version", e); - vcur.setFailureReason("Failed to update flow to new version due to " + e); + vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage()); } }; @@ -1201,7 +1248,6 @@ public class VersionsResource extends ApplicationResource { final boolean verifyNotModified) throws LifecycleManagementException { // Steps 6-7: Determine which components must be stopped and stop them. - // Do we need to stop other types? Input Ports, Output Ports, Funnels, RPGs, etc. 
final Set<String> stoppableReferenceTypes = new HashSet<>(); stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR); stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT); @@ -1215,7 +1261,11 @@ public class VersionsResource extends ApplicationResource { logger.info("Stopping {} Processors", runningComponents.size()); final Pause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); componentLifecycle.scheduleComponents(exampleUri, user, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause); - asyncRequest.setLastUpdated(new Date()); + + if (asyncRequest.isCancelled()) { + return null; + } + asyncRequest.update(new Date(), "Disabling Affected Controller Services", 20); // Steps 8-9. Disable enabled controller services that are affected final Set<AffectedComponentEntity> enabledServices = affectedComponents.stream() @@ -1226,7 +1276,11 @@ public class VersionsResource extends ApplicationResource { logger.info("Disabling {} Controller Services", enabledServices.size()); final Pause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); componentLifecycle.activateControllerServices(exampleUri, user, groupId, enabledServices, ControllerServiceState.DISABLED, disableServicesPause); - asyncRequest.setLastUpdated(new Date()); + + if (asyncRequest.isCancelled()) { + return null; + } + asyncRequest.update(new Date(), "Updating Flow", 40); logger.info("Updating Process Group with ID {} to version {} of the Versioned Flow", groupId, flowSnapshot.getSnapshotMetadata().getVersion()); // If replicating request, steps 10-12 are performed on each node individually, and this is accomplished @@ -1281,21 +1335,32 @@ public class VersionsResource extends ApplicationResource { serviceFacade.updateProcessGroupContents(user, revision, groupId, vci, flowSnapshot, idGenerationSeed, verifyNotModified, false); } - asyncRequest.setLastUpdated(new 
Date()); + if (asyncRequest.isCancelled()) { + return null; + } + asyncRequest.update(new Date(), "Re-Enabling Controller Services", 60); // Step 13. Re-enable all disabled controller services final Pause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user); logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size()); componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause); - asyncRequest.setLastUpdated(new Date()); + + if (asyncRequest.isCancelled()) { + return null; + } + asyncRequest.update(new Date(), "Restarting Processors", 80); // Step 14. Restart all components final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents, user); final Pause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS); logger.info("Restarting {} Processors", componentsToStart.size()); componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause); - asyncRequest.setLastUpdated(new Date()); + + if (asyncRequest.isCancelled()) { + return null; + } + asyncRequest.update(new Date(), "Complete", 100); return serviceFacade.getVersionControlInformation(groupId); } @@ -1318,42 +1383,6 @@ public class VersionsResource extends ApplicationResource { } - private void authorizeAffectedComponents(final AuthorizableLookup lookup, final Set<AffectedComponentEntity> affectedComponents) { - final Map<String, List<AffectedComponentEntity>> componentsByType = affectedComponents.stream() - .collect(Collectors.groupingBy(entity -> entity.getComponent().getReferenceType())); - - authorize(componentsByType.get(ComponentType.PROCESSOR.name()), id -> lookup.getProcessor(id).getAuthorizable()); - 
authorize(componentsByType.get(ComponentType.CONTROLLER_SERVICE.name()), id -> lookup.getControllerService(id).getAuthorizable()); - - authorize(componentsByType.get(ComponentType.CONNECTION.name()), id -> lookup.getConnection(id).getAuthorizable()); - authorize(componentsByType.get(ComponentType.FUNNEL.name()), id -> lookup.getFunnel(id)); - authorize(componentsByType.get(ComponentType.INPUT_PORT.name()), id -> lookup.getInputPort(id)); - authorize(componentsByType.get(ComponentType.OUTPUT_PORT.name()), id -> lookup.getOutputPort(id)); - authorize(componentsByType.get(ComponentType.LABEL.name()), id -> lookup.getLabel(id)); - - authorize(componentsByType.get(ComponentType.PROCESS_GROUP.name()), id -> lookup.getProcessGroup(id).getAuthorizable()); - authorize(componentsByType.get(ComponentType.REMOTE_PROCESS_GROUP.name()), id -> lookup.getRemoteProcessGroup(id)); - - - // Remote Input Ports and Remote Output Ports are not authorized independently but rather at the Remote Process Group level, - // so we have to treat these a little differently. 
- componentsByType.getOrDefault(ComponentType.REMOTE_INPUT_PORT.name(), Collections.emptyList()).stream() - .forEach(affectedPort -> { - final String rpgId = affectedPort.getComponent().getProcessGroupId(); - final Authorizable rpg = lookup.getRemoteProcessGroup(rpgId); - rpg.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - rpg.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - - componentsByType.getOrDefault(ComponentType.REMOTE_OUTPUT_PORT.name(), Collections.emptyList()).stream() - .forEach(affectedPort -> { - final String rpgId = affectedPort.getComponent().getProcessGroupId(); - final Authorizable rpg = lookup.getRemoteProcessGroup(rpgId); - rpg.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - rpg.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - private Set<AffectedComponentEntity> getUpdatedEntities(final Set<AffectedComponentEntity> originalEntities, final NiFiUser user) { final Set<AffectedComponentEntity> entities = new LinkedHashSet<>(); @@ -1373,17 +1402,6 @@ public class VersionsResource extends ApplicationResource { } - private void authorize(final List<AffectedComponentEntity> componentDtos, final Function<String, Authorizable> authFunction) { - if (componentDtos != null) { - for (final AffectedComponentEntity entity : componentDtos) { - final Authorizable authorizable = authFunction.apply(entity.getComponent().getId()); - authorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser()); - authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - } - } - } - - public void setServiceFacade(NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade; }
http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java index 4b87b50..5dcb125 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java @@ -110,7 +110,6 @@ public class AsyncRequestManager<T> implements RequestManager<T> { } catch (final Exception e) { logger.error("Failed to perform asynchronous task", e); request.setFailureReason("Encountered unexpected error when performing asynchronous task: " + e); - request.setLastUpdated(new Date()); } } }); http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java index 2c14008..1309eee 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java @@ -39,11 +39,23 @@ public interface AsynchronousWebRequest<T> { Date getLastUpdated(); /** - * Updates the Date at which the status of this request was last updated + * @return the current state of the request + */ + public String getState(); + + /** + * @return the current percent complete, between 0 and 100 (inclusive) + */ + public int getPercentComplete(); + + /** + * Updates the request to indicate the new state and percent complete * - * @param date the date at which the status of this request was last updated + * @param date the last updated time + * @param state the new state + * @param percentComplete The percentage complete, between 0 and 100 (inclusive) */ - void setLastUpdated(Date date); + void update(Date date, String state, int percentComplete); /** * @return the user who submitted the request @@ -77,4 +89,14 @@ public interface AsynchronousWebRequest<T> { * @return the results of the request, if it completed successfully, or <code>null</code> if the request either has no completed or failed */ T getResults(); + + /** + * Cancels the request so that no more steps can be completed + */ + void cancel(); + + /** + * @return <code>true</code> if the request has been canceled, <code>false</code> otherwise + */ + boolean isCancelled(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java index 8ba9a58..4810a32 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java @@ -29,13 +29,17 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest private volatile boolean complete = false; private volatile Date lastUpdated = new Date(); + private volatile String state; + private volatile int percentComplete; private volatile String failureReason; + private volatile boolean cancelled; private volatile T results; - public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user) { + public StandardAsynchronousWebRequest(final String requestId, final String processGroupId, final NiFiUser user, final String state) { this.id = requestId; this.processGroupId = processGroupId; this.user = user; + this.state = state; } public String getRequestId() { @@ -57,6 +61,8 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest this.complete = true; this.results = results; this.lastUpdated = new Date(); + this.percentComplete = 100; + this.state = "Complete"; } @Override @@ -65,8 +71,34 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest } @Override - public void setLastUpdated(final Date date) { - this.lastUpdated = lastUpdated; + public String getState() { + return state; + } + + @Override + public int getPercentComplete() { + return 
percentComplete; + } + + @Override + public void update(Date date, String state, int percentComplete) { + if (percentComplete < 0 || percentComplete > 100) { + throw new IllegalArgumentException("Cannot set percent complete to a value of " + percentComplete + "; it must be between 0 and 100."); + } + + if (isCancelled()) { + throw new IllegalStateException("Cannot update state because request has already been cancelled by user"); + } + + if (isComplete()) { + final String failure = getFailureReason(); + final String explanation = failure == null ? "successfully" : "with failure reason: " + failure; + throw new IllegalStateException("Cannot update state to '" + state + "' because request is already completed " + explanation); + } + + this.lastUpdated = date; + this.state = state; + this.percentComplete = percentComplete; } @Override @@ -79,6 +111,7 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest this.failureReason = Objects.requireNonNull(explanation); this.complete = true; this.results = null; + this.lastUpdated = new Date(); } @Override @@ -90,4 +123,17 @@ public class StandardAsynchronousWebRequest<T> implements AsynchronousWebRequest public T getResults() { return results; } + + @Override + public void cancel() { + this.cancelled = true; + percentComplete = 100; + state = "Canceled by user"; + setFailureReason("Request cancelled by user"); + } + + @Override + public boolean isCancelled() { + return cancelled; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 3639b18..1c1e729 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -115,8 +115,12 @@ import org.apache.nifi.registry.ComponentVariableRegistry; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.registry.flow.VersionedComponent; +import org.apache.nifi.registry.flow.diff.FlowComparison; +import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedFunnel; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedLabel; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedPort; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; @@ -2174,6 +2178,38 @@ public final class DtoFactory { return dto; } + + public Set<ComponentDifferenceDTO> createComponentDifferenceDtos(final FlowComparison comparison) { + final Map<ComponentDifferenceDTO, List<String>> differencesByComponent = new HashMap<>(); + + for (final FlowDifference difference : comparison.getDifferences()) { + final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); + final List<String> differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); + differences.add(difference.getDescription()); + } + + for (final Map.Entry<ComponentDifferenceDTO, List<String>> entry 
: differencesByComponent.entrySet()) { + entry.getKey().setDifferences(entry.getValue()); + } + + return differencesByComponent.keySet(); + } + + private ComponentDifferenceDTO createComponentDifference(final FlowDifference difference) { + VersionedComponent component = difference.getComponentA(); + if (component == null) { + component = difference.getComponentB(); + } + + final ComponentDifferenceDTO dto = new ComponentDifferenceDTO(); + dto.setComponentId(component.getIdentifier()); + dto.setComponentName(component.getName()); + dto.setComponentType(component.getComponentType().name()); + dto.setProcessGroupId(component.getGroupIdentifier()); /* group id comes from the compared component; the DTO's own field is still unset here */ + return dto; + } + + + public VersionControlInformationDTO createVersionControlInformationDto(final ProcessGroup group) { if (group == null) { return null; @@ -2187,10 +2223,12 @@ public final class DtoFactory { final VersionControlInformationDTO dto = new VersionControlInformationDTO(); dto.setGroupId(group.getIdentifier()); dto.setRegistryId(versionControlInfo.getRegistryIdentifier()); + dto.setRegistryName(versionControlInfo.getRegistryName()); dto.setBucketId(versionControlInfo.getBucketIdentifier()); + dto.setBucketName(versionControlInfo.getBucketName()); dto.setFlowId(versionControlInfo.getFlowIdentifier()); - // TODO - need to get flow name here - dto.setFlowName(group.getName()); + dto.setFlowName(versionControlInfo.getFlowName()); + dto.setFlowDescription(versionControlInfo.getFlowDescription()); dto.setVersion(versionControlInfo.getVersion()); dto.setCurrent(versionControlInfo.getCurrent().orElse(null)); dto.setModified(versionControlInfo.getModified().orElse(null)); @@ -2204,6 +2242,9 @@ public final class DtoFactory { group.getProcessors().stream() .map(proc -> (InstantiatedVersionedProcessor) proc) .forEach(proc -> mapping.put(proc.getInstanceId(), proc.getIdentifier())); + group.getFunnels().stream() + .map(funnel -> (InstantiatedVersionedFunnel) funnel) + .forEach(funnel -> mapping.put(funnel.getInstanceId(), 
funnel.getIdentifier())); group.getInputPorts().stream() .map(port -> (InstantiatedVersionedPort) port) .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); @@ -2224,13 +2265,17 @@ public final class DtoFactory { .forEach(rpg -> { mapping.put(rpg.getInstanceId(), rpg.getIdentifier()); - rpg.getInputPorts().stream() - .map(port -> (InstantiatedVersionedRemoteGroupPort) port) - .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + if (rpg.getInputPorts() != null) { + rpg.getInputPorts().stream() + .map(port -> (InstantiatedVersionedRemoteGroupPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + } - rpg.getOutputPorts().stream() - .map(port -> (InstantiatedVersionedRemoteGroupPort) port) - .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + if (rpg.getOutputPorts() != null) { + rpg.getOutputPorts().stream() + .map(port -> (InstantiatedVersionedRemoteGroupPort) port) + .forEach(port -> mapping.put(port.getInstanceId(), port.getIdentifier())); + } }); group.getProcessGroups().stream() @@ -3407,9 +3452,12 @@ public final class DtoFactory { final VersionControlInformationDTO copy = new VersionControlInformationDTO(); copy.setRegistryId(original.getRegistryId()); + copy.setRegistryName(original.getRegistryName()); copy.setBucketId(original.getBucketId()); + copy.setBucketName(original.getBucketName()); copy.setFlowId(original.getFlowId()); copy.setFlowName(original.getFlowName()); + copy.setFlowDescription(original.getFlowDescription()); copy.setVersion(original.getVersion()); copy.setCurrent(original.getCurrent()); copy.setModified(original.getModified()); http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java index 19f2de4..4f5af74 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/FlowRegistryDAO.java @@ -17,8 +17,14 @@ package org.apache.nifi.web.dao.impl; +import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.client.NiFiRegistryException; import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.flow.VersionedFlow; @@ -71,37 +77,48 @@ public class FlowRegistryDAO implements RegistryDAO { throw new IllegalArgumentException("The specified registry id is unknown to this NiFi."); } - return flowRegistry.getBuckets(user); - } catch (final IOException ioe) { - throw new NiFiCoreException("Unable to obtain bucket listing: " + ioe.getMessage(), ioe); + final Set<Bucket> buckets = flowRegistry.getBuckets(user); + final Set<Bucket> sortedBuckets = new TreeSet<>((b1, b2) -> b1.getName().compareTo(b2.getName())); + sortedBuckets.addAll(buckets); + return sortedBuckets; + } catch (final IOException | NiFiRegistryException ioe) { + throw new NiFiCoreException("Unable to obtain listing of buckets: " + ioe, ioe); } } @Override public Set<VersionedFlow> getFlowsForUser(String registryId, String bucketId, NiFiUser user) { - final Set<Bucket> bucketsForUser = getBucketsForUser(registryId, user); + try { + final 
FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); + if (flowRegistry == null) { + throw new IllegalArgumentException("The specified registry id is unknown to this NiFi."); + } - // TODO - implement getBucket(bucketId, user) - final Bucket bucket = bucketsForUser.stream().filter(b -> b.getIdentifier().equals(bucketId)).findFirst().orElse(null); - if (bucket == null) { - throw new IllegalArgumentException("The specified bucket is not available."); + final Set<VersionedFlow> flows = flowRegistry.getFlows(bucketId, user); + final Set<VersionedFlow> sortedFlows = new TreeSet<>((f1, f2) -> f1.getName().compareTo(f2.getName())); + sortedFlows.addAll(flows); + return sortedFlows; + } catch (final IOException | NiFiRegistryException ioe) { + throw new NiFiCoreException("Unable to obtain listing of flows for bucket with ID " + bucketId + ": " + ioe, ioe); } - - return bucket.getVersionedFlows(); } @Override public Set<VersionedFlowSnapshotMetadata> getFlowVersionsForUser(String registryId, String bucketId, String flowId, NiFiUser user) { - final Set<VersionedFlow> flowsForUser = getFlowsForUser(registryId, bucketId, user); + try { + final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId); + if (flowRegistry == null) { + throw new IllegalArgumentException("The specified registry id is unknown to this NiFi."); + } - // TODO - implement getFlow(bucketId, flowId, user) - final VersionedFlow versionedFlow = flowsForUser.stream().filter(vf -> vf.getIdentifier().equals(flowId)).findFirst().orElse(null); - if (versionedFlow == null) { - throw new IllegalArgumentException("The specified flow is not available."); + final Set<VersionedFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(bucketId, flowId, user); + final Set<VersionedFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>((f1, f2) -> Integer.compare(f1.getVersion(), f2.getVersion())); + sortedFlowVersions.addAll(flowVersions); + return sortedFlowVersions; + 
} catch (final IOException | NiFiRegistryException ioe) { + throw new NiFiCoreException("Unable to obtain listing of versions for bucket with ID " + bucketId + " and flow with ID " + flowId + ": " + ioe, ioe); } - - return versionedFlow.getSnapshotMetadata(); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java index 35c537d..f830e9b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java @@ -38,7 +38,11 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO { private Port locatePort(final String portId) { final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId()); - final Port port = rootGroup.findInputPort(portId); + Port port = rootGroup.findInputPort(portId); + + if (port == null) { + port = rootGroup.findOutputPort(portId); + } if (port == null) { throw new ResourceNotFoundException(String.format("Unable to find port with id '%s'.", portId)); @@ -50,7 +54,7 @@ public class StandardInputPortDAO extends ComponentDAO implements PortDAO { @Override public boolean hasPort(String portId) { final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId()); - return rootGroup.findInputPort(portId) != null; + return 
rootGroup.findInputPort(portId) != null || rootGroup.findOutputPort(portId) != null; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index 963220e..78f3e31 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -25,6 +25,7 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.flow.FlowRegistry; import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; @@ -238,11 +239,15 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final ProcessGroup group = locateProcessGroup(flowController, groupId); final String registryId = versionControlInformation.getRegistryId(); - final String bucketId = versionControlInformation.getBucketId(); - final String flowId = versionControlInformation.getFlowId(); - final int version = versionControlInformation.getVersion(); + final FlowRegistry flowRegistry = 
flowController.getFlowRegistryClient().getFlowRegistry(registryId); + final String registryName = flowRegistry == null ? registryId : flowRegistry.getName(); + + final StandardVersionControlInformation vci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) + .registryName(registryName) + .modified(false) + .current(true) + .build(); - final VersionControlInformation vci = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, false, true); group.setVersionControlInformation(vci, versionedComponentMapping); return group; @@ -261,14 +266,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final ProcessGroup group = locateProcessGroup(flowController, groupId); group.updateFlow(proposedSnapshot, componentIdSeed, verifyNotModified, updateSettings); - final StandardVersionControlInformation svci = new StandardVersionControlInformation( - versionControlInformation.getRegistryId(), - versionControlInformation.getBucketId(), - versionControlInformation.getFlowId(), - versionControlInformation.getVersion(), - proposedSnapshot.getFlowContents(), - versionControlInformation.getModified(), - versionControlInformation.getCurrent()); + final StandardVersionControlInformation svci = StandardVersionControlInformation.Builder.fromDto(versionControlInformation) + .flowSnapshot(proposedSnapshot.getFlowContents()) + .build(); group.setVersionControlInformation(svci, Collections.emptyMap()); return group; http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java index a6efb71..1f83a6f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java @@ -24,9 +24,10 @@ public class CancellableTimedPause implements Pause { private final long pauseNanos; private volatile boolean cancelled = false; - public CancellableTimedPause(final long pauseTime, final long expirationTime, final TimeUnit timeUnit) { - final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationTime, timeUnit); - expirationNanoTime = System.nanoTime() + expirationNanos; + public CancellableTimedPause(final long pauseTime, final long expirationPeriod, final TimeUnit timeUnit) { + final long expirationNanos = TimeUnit.NANOSECONDS.convert(expirationPeriod, timeUnit); + final long expirationTime = System.nanoTime() + expirationNanos; + expirationNanoTime = expirationTime < 0 ? 
Long.MAX_VALUE : expirationTime; pauseNanos = Math.max(1L, TimeUnit.NANOSECONDS.convert(pauseTime, timeUnit)); } @@ -44,7 +45,7 @@ public class CancellableTimedPause implements Pause { final long maxWaitTime = System.nanoTime() + pauseNanos; while (sysTime < maxWaitTime) { try { - TimeUnit.NANOSECONDS.wait(pauseNanos); + TimeUnit.NANOSECONDS.sleep(pauseNanos); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); return false; http://git-wip-us.apache.org/repos/asf/nifi/blob/6b00dff1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 490862c..584cac6 100644 --- a/pom.xml +++ b/pom.xml @@ -1641,6 +1641,12 @@ <version>${nifi.registry.version}</version> </dependency> <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-client</artifactId> + <version>${nifi.registry.version}</version> + </dependency> + + <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> <version>2.0.0</version>
