NIFI-4436: - Addressing PR feedback. - Addressing two-phase commit logic issue when changing the flow version.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b3e1584e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b3e1584e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b3e1584e Branch: refs/heads/master Commit: b3e1584ef4f5b7de8cf8517a8b950125f82832cb Parents: 63544c8 Author: Matt Gilman <[email protected]> Authored: Wed Jan 3 17:16:57 2018 -0500 Committer: Bryan Bende <[email protected]> Committed: Mon Jan 8 12:44:57 2018 -0500 ---------------------------------------------------------------------- .../apache/nifi/web/api/VersionsResource.java | 154 +++++++++++++------ .../main/webapp/js/nf/canvas/nf-flow-version.js | 27 +++- 2 files changed, 134 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b3e1584e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index af9a515..53dc091 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -58,6 +58,7 @@ import org.apache.nifi.web.api.dto.VersionedFlowDTO; import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO; import org.apache.nifi.web.api.entity.AffectedComponentEntity; import org.apache.nifi.web.api.entity.CreateActiveRequestEntity; +import 
org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; @@ -724,19 +725,19 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("Process Group Revision must be specified."); } - final VersionedFlowSnapshot flowSnapshot = requestEntity.getVersionedFlowSnapshot(); - if (flowSnapshot == null) { + final VersionedFlowSnapshot requestFlowSnapshot = requestEntity.getVersionedFlowSnapshot(); + if (requestFlowSnapshot == null) { throw new IllegalArgumentException("Versioned Flow Snapshot must be supplied."); } - final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata(); - if (snapshotMetadata == null) { + final VersionedFlowSnapshotMetadata requestSnapshotMetadata = requestFlowSnapshot.getSnapshotMetadata(); + if (requestSnapshotMetadata == null) { throw new IllegalArgumentException("Snapshot Metadata must be supplied."); } - if (snapshotMetadata.getBucketIdentifier() == null) { + if (requestSnapshotMetadata.getBucketIdentifier() == null) { throw new IllegalArgumentException("The Bucket ID must be supplied."); } - if (snapshotMetadata.getFlowIdentifier() == null) { + if (requestSnapshotMetadata.getFlowIdentifier() == null) { throw new IllegalArgumentException("The Flow ID must be supplied."); } @@ -761,9 +762,12 @@ public class VersionsResource extends ApplicationResource { // the client has explicitly indicated the dataflow that the Process Group should // provide and provided the Revision to ensure that they have the most up-to-date // view of the Process Group. 
- serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, false); + serviceFacade.verifyCanUpdate(groupId, requestFlowSnapshot, true, false); }, (rev, entity) -> { + final VersionedFlowSnapshot flowSnapshot = entity.getVersionedFlowSnapshot(); + final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata(); + final Bucket bucket = flowSnapshot.getBucket(); final VersionedFlow flow = flowSnapshot.getFlow(); @@ -1000,7 +1004,7 @@ public class VersionsResource extends ApplicationResource { }) public Response initiateVersionControlUpdate( @ApiParam("The process group id.") @PathParam("id") final String groupId, - @ApiParam(value = "The controller service configuration details.", required = true) final VersionControlInformationEntity requestEntity) throws IOException { + @ApiParam(value = "The controller service configuration details.", required = true) final VersionControlInformationEntity requestEntity) { // Verify the request final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); @@ -1008,26 +1012,26 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("Process Group Revision must be specified"); } - final VersionControlInformationDTO versionControlInfoDto = requestEntity.getVersionControlInformation(); - if (versionControlInfoDto == null) { + final VersionControlInformationDTO requestVersionControlInfoDto = requestEntity.getVersionControlInformation(); + if (requestVersionControlInfoDto == null) { throw new IllegalArgumentException("Version Control Information must be supplied."); } - if (versionControlInfoDto.getGroupId() == null) { + if (requestVersionControlInfoDto.getGroupId() == null) { throw new IllegalArgumentException("The Process Group ID must be supplied."); } - if (!versionControlInfoDto.getGroupId().equals(groupId)) { + if (!requestVersionControlInfoDto.getGroupId().equals(groupId)) { throw new IllegalArgumentException("The Process Group ID in the request body 
does not match the Process Group ID of the requested resource."); } - if (versionControlInfoDto.getBucketId() == null) { + if (requestVersionControlInfoDto.getBucketId() == null) { throw new IllegalArgumentException("The Bucket ID must be supplied."); } - if (versionControlInfoDto.getFlowId() == null) { + if (requestVersionControlInfoDto.getFlowId() == null) { throw new IllegalArgumentException("The Flow ID must be supplied."); } - if (versionControlInfoDto.getRegistryId() == null) { + if (requestVersionControlInfoDto.getRegistryId() == null) { throw new IllegalArgumentException("The Registry ID must be supplied."); } - if (versionControlInfoDto.getVersion() == null) { + if (requestVersionControlInfoDto.getVersion() == null) { throw new IllegalArgumentException("The Version of the flow must be supplied."); } @@ -1037,7 +1041,6 @@ public class VersionsResource extends ApplicationResource { final boolean replicateRequest = isReplicateRequest(); final ComponentLifecycle componentLifecycle = replicateRequest ? 
clusterComponentLifecycle : localComponentLifecycle; final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final String idGenerationSeed = getIdGenerationSeed().orElse(null); // Workflow for this process: @@ -1082,11 +1085,14 @@ public class VersionsResource extends ApplicationResource { // Step 1: Determine which components will be affected by updating the version final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user); - final URI exampleUri = getAbsolutePath(); + // build a request wrapper + final InitiateChangeFlowVersionRequestWrapper requestWrapper = new InitiateChangeFlowVersionRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), affectedComponents, + replicateRequest, flowSnapshot); + final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, - requestEntity, + requestWrapper, requestRevision, lookup -> { // Step 2: Verify READ and WRITE permissions for user, for every component. @@ -1106,7 +1112,9 @@ public class VersionsResource extends ApplicationResource { // Step 5: Verify that Process Group is not 'dirty' serviceFacade.verifyCanUpdate(groupId, flowSnapshot, false, true); }, - (revision, processGroupEntity) -> { + (revision, wrapper) -> { + final String idGenerationSeed = getIdGenerationSeed().orElse(null); + // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. 
final String requestId = UUID.randomUUID().toString(); @@ -1115,8 +1123,9 @@ public class VersionsResource extends ApplicationResource { // Submit the request to be performed in the background final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> { try { - final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, true, true); + final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, wrapper.getComponentLifecycle(), wrapper.getExampleUri(), + wrapper.getAffectedComponents(), user, wrapper.isReplicateRequest(), revision, wrapper.getVersionControlInformationEntity(), wrapper.getFlowSnapshot(), request, + idGenerationSeed, true, true); vcur.markComplete(updatedVersionControlEntity); } catch (final Exception e) { @@ -1139,7 +1148,7 @@ public class VersionsResource extends ApplicationResource { updateRequestDto.setState(request.getState()); final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity(); - final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision); + final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision(); updateRequestEntity.setProcessGroupRevision(groupRevision); updateRequestEntity.setRequest(updateRequestDto); @@ -1187,26 +1196,26 @@ public class VersionsResource extends ApplicationResource { throw new IllegalArgumentException("Process Group Revision must be specified"); } - final VersionControlInformationDTO versionControlInfoDto = requestEntity.getVersionControlInformation(); - if (versionControlInfoDto == null) { + final VersionControlInformationDTO requestVersionControlInfoDto = requestEntity.getVersionControlInformation(); + if (requestVersionControlInfoDto == null) { throw new IllegalArgumentException("Version 
Control Information must be supplied."); } - if (versionControlInfoDto.getGroupId() == null) { + if (requestVersionControlInfoDto.getGroupId() == null) { throw new IllegalArgumentException("The Process Group ID must be supplied."); } - if (!versionControlInfoDto.getGroupId().equals(groupId)) { + if (!requestVersionControlInfoDto.getGroupId().equals(groupId)) { throw new IllegalArgumentException("The Process Group ID in the request body does not match the Process Group ID of the requested resource."); } - if (versionControlInfoDto.getBucketId() == null) { + if (requestVersionControlInfoDto.getBucketId() == null) { throw new IllegalArgumentException("The Bucket ID must be supplied."); } - if (versionControlInfoDto.getFlowId() == null) { + if (requestVersionControlInfoDto.getFlowId() == null) { throw new IllegalArgumentException("The Flow ID must be supplied."); } - if (versionControlInfoDto.getRegistryId() == null) { + if (requestVersionControlInfoDto.getRegistryId() == null) { throw new IllegalArgumentException("The Registry ID must be supplied."); } - if (versionControlInfoDto.getVersion() == null) { + if (requestVersionControlInfoDto.getVersion() == null) { throw new IllegalArgumentException("The Version of the flow must be supplied."); } @@ -1216,7 +1225,6 @@ public class VersionsResource extends ApplicationResource { final boolean replicateRequest = isReplicateRequest(); final ComponentLifecycle componentLifecycle = replicateRequest ? 
clusterComponentLifecycle : localComponentLifecycle; final NiFiUser user = NiFiUserUtils.getNiFiUser(); - final String idGenerationSeed = getIdGenerationSeed().orElse(null); // Step 0: Get the Versioned Flow Snapshot from the Flow Registry final VersionedFlowSnapshot flowSnapshot = serviceFacade.getVersionedFlowSnapshot(requestEntity.getVersionControlInformation(), true); @@ -1228,11 +1236,14 @@ public class VersionsResource extends ApplicationResource { // Step 1: Determine which components will be affected by updating the version final Set<AffectedComponentEntity> affectedComponents = serviceFacade.getComponentsAffectedByVersionChange(groupId, flowSnapshot, user); - final URI exampleUri = getAbsolutePath(); + // build a request wrapper + final InitiateChangeFlowVersionRequestWrapper requestWrapper = new InitiateChangeFlowVersionRequestWrapper(requestEntity, componentLifecycle, getAbsolutePath(), affectedComponents, + replicateRequest, flowSnapshot); + final Revision requestRevision = getRevision(requestEntity.getProcessGroupRevision(), groupId); return withWriteLock( serviceFacade, - requestEntity, + requestWrapper, requestRevision, lookup -> { // Step 2: Verify READ and WRITE permissions for user, for every component. @@ -1251,7 +1262,10 @@ public class VersionsResource extends ApplicationResource { // Step 4: Verify that Process Group is already under version control. 
If not, must start Version Control instead of updating flow serviceFacade.verifyCanRevertLocalModifications(groupId, flowSnapshot); }, - (revision, processGroupEntity) -> { + (revision, wrapper) -> { + final VersionControlInformationEntity versionControlInformationEntity = wrapper.getVersionControlInformationEntity(); + final VersionControlInformationDTO versionControlInformationDTO = versionControlInformationEntity.getVersionControlInformation(); + // Ensure that the information passed in is correct final VersionControlInformationEntity currentVersionEntity = serviceFacade.getVersionControlInformation(groupId); if (currentVersionEntity == null) { @@ -1259,19 +1273,21 @@ public class VersionsResource extends ApplicationResource { } final VersionControlInformationDTO currentVersion = currentVersionEntity.getVersionControlInformation(); - if (!currentVersion.getBucketId().equals(versionControlInfoDto.getBucketId())) { + if (!currentVersion.getBucketId().equals(versionControlInformationDTO.getBucketId())) { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } - if (!currentVersion.getFlowId().equals(versionControlInfoDto.getFlowId())) { + if (!currentVersion.getFlowId().equals(versionControlInformationDTO.getFlowId())) { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } - if (!currentVersion.getRegistryId().equals(versionControlInfoDto.getRegistryId())) { + if (!currentVersion.getRegistryId().equals(versionControlInformationDTO.getRegistryId())) { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } - if (!currentVersion.getVersion().equals(versionControlInfoDto.getVersion())) { + if 
(!currentVersion.getVersion().equals(versionControlInformationDTO.getVersion())) { throw new IllegalArgumentException("The Version Control Information provided does not match the flow that the Process Group is currently synchronized with."); } + final String idGenerationSeed = getIdGenerationSeed().orElse(null); + // Create an asynchronous request that will occur in the background, because this request may // result in stopping components, which can take an indeterminate amount of time. final String requestId = UUID.randomUUID().toString(); @@ -1280,8 +1296,9 @@ public class VersionsResource extends ApplicationResource { // Submit the request to be performed in the background final Consumer<AsynchronousWebRequest<VersionControlInformationEntity>> updateTask = vcur -> { try { - final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, componentLifecycle, exampleUri, - affectedComponents, user, replicateRequest, processGroupEntity, flowSnapshot, request, idGenerationSeed, false, true); + final VersionControlInformationEntity updatedVersionControlEntity = updateFlowVersion(groupId, wrapper.getComponentLifecycle(), wrapper.getExampleUri(), + wrapper.getAffectedComponents(), user, wrapper.isReplicateRequest(), revision, versionControlInformationEntity, wrapper.getFlowSnapshot(), request, + idGenerationSeed, false, true); vcur.markComplete(updatedVersionControlEntity); } catch (final Exception e) { @@ -1303,8 +1320,9 @@ public class VersionsResource extends ApplicationResource { updateRequestDto.setPercentCompleted(request.getPercentComplete()); updateRequestDto.setUri(generateResourceUri("versions", "revert-requests", requestId)); + final VersionedFlowUpdateRequestEntity updateRequestEntity = new VersionedFlowUpdateRequestEntity(); - final RevisionDTO groupRevision = dtoFactory.createRevisionDTO(revision); + final RevisionDTO groupRevision = serviceFacade.getProcessGroup(groupId).getRevision(); 
updateRequestEntity.setProcessGroupRevision(groupRevision); updateRequestEntity.setRequest(updateRequestDto); @@ -1313,7 +1331,7 @@ public class VersionsResource extends ApplicationResource { } private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri, - final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final VersionControlInformationEntity requestEntity, + final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity, final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed, final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException { @@ -1372,7 +1390,7 @@ public class VersionsResource extends ApplicationResource { headers.put("content-type", MediaType.APPLICATION_JSON); final VersionedFlowSnapshotEntity snapshotEntity = new VersionedFlowSnapshotEntity(); - snapshotEntity.setProcessGroupRevision(requestEntity.getProcessGroupRevision()); + snapshotEntity.setProcessGroupRevision(dtoFactory.createRevisionDTO(revision)); snapshotEntity.setRegistryId(requestEntity.getVersionControlInformation().getRegistryId()); snapshotEntity.setVersionedFlow(flowSnapshot); snapshotEntity.setUpdateDescendantVersionedFlows(updateDescendantVersionedFlows); @@ -1408,8 +1426,6 @@ public class VersionsResource extends ApplicationResource { serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, verifyNotModified); // Step 11-12. 
Update Process Group to the new flow and update variable registry with any Variables that were added or removed - final RevisionDTO revisionDto = requestEntity.getProcessGroupRevision(); - final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), groupId); final VersionControlInformationDTO requestVci = requestEntity.getVersionControlInformation(); final Bucket bucket = flowSnapshot.getBucket(); @@ -1571,4 +1587,50 @@ public class VersionsResource extends ApplicationResource { return updatePerformed; } } + + + private static class InitiateChangeFlowVersionRequestWrapper extends Entity { + final VersionControlInformationEntity versionControlInformationEntity; + final ComponentLifecycle componentLifecycle; + final URI exampleUri; + final Set<AffectedComponentEntity> affectedComponents; + final boolean replicateRequest; + final VersionedFlowSnapshot flowSnapshot; + + public InitiateChangeFlowVersionRequestWrapper(final VersionControlInformationEntity versionControlInformationEntity, final ComponentLifecycle componentLifecycle, + final URI exampleUri, final Set<AffectedComponentEntity> affectedComponents, final boolean replicateRequest, + final VersionedFlowSnapshot flowSnapshot) { + + this.versionControlInformationEntity = versionControlInformationEntity; + this.componentLifecycle = componentLifecycle; + this.exampleUri = exampleUri; + this.affectedComponents = affectedComponents; + this.replicateRequest = replicateRequest; + this.flowSnapshot = flowSnapshot; + } + + public VersionControlInformationEntity getVersionControlInformationEntity() { + return versionControlInformationEntity; + } + + public ComponentLifecycle getComponentLifecycle() { + return componentLifecycle; + } + + public URI getExampleUri() { + return exampleUri; + } + + public Set<AffectedComponentEntity> getAffectedComponents() { + return affectedComponents; + } + + public boolean isReplicateRequest() { + return replicateRequest; + } + + public VersionedFlowSnapshot 
getFlowSnapshot() { + return flowSnapshot; + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b3e1584e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js index b676f5b..96c467a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-flow-version.js @@ -214,8 +214,14 @@ }); } + // determine the max registry height + var windowHeight = $(window).height(); + var registryOffset = $('#import-flow-version-registry-combo').offset(); + var registryMaxHeight = windowHeight - registryOffset.top - 64; + // load the registries registryCombo.combo({ + maxHeight: registryMaxHeight, options: registries, select: function (selectedOption) { selectRegistry(dialog, selectedOption, bucketCombo, flowCombo, selectBucket, bucketCheck) @@ -290,8 +296,14 @@ } } + // determine the max bucket height + var windowHeight = $(window).height(); + var bucketOffset = $('#import-flow-version-bucket-combo').offset(); + var bucketMaxHeight = windowHeight - bucketOffset.top - 64; + // load the buckets bucketCombo.combo('destroy').combo({ + maxHeight: bucketMaxHeight, options: buckets, select: selectBucket }); @@ -890,8 +902,14 @@ }); } + // determine the max flow height + var windowHeight = $(window).height(); + var flowOffset = $('#import-flow-version-name-combo').offset(); + var flowMaxHeight = windowHeight - flowOffset.top - 64; + // load the buckets 
$('#import-flow-version-name-combo').combo('destroy').combo({ + maxHeight: flowMaxHeight, options: versionedFlows, select: function (selectedFlow) { if (nfCommon.isDefinedAndNotNull(selectedFlow.value)) { @@ -1284,7 +1302,6 @@ if (nfCanvasUtils.getGroupId() === processGroupId) { // if reverting/changing current PG... reload/refresh this group/canvas - // TODO consider implementing this differently $.ajax({ type: 'GET', url: '../nifi-api/flow/process-groups/' + encodeURIComponent(processGroupId), @@ -1295,6 +1312,14 @@ // update the component visibility nfGraph.updateVisibility(); + + // update the breadcrumbs + var breadcrumbsCtrl = nfNgBridge.injector.get('breadcrumbsCtrl'); + breadcrumbsCtrl.resetBreadcrumbs(); + breadcrumbsCtrl.generateBreadcrumbs(response.processGroupFlow.breadcrumb); + + // inform Angular app values have changed + nfNgBridge.digest(); }).fail(nfErrorHandler.handleAjaxError); } else { // if reverting selected PG... reload selected PG to update counts, etc
