http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java index 2eb7bd6..acef302 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java @@ -370,8 +370,8 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration } processor = entity.getComponent(); } else { - final ConfigurationSnapshot<ProcessorDTO> response = serviceFacade.setProcessorAnnotationData(revision, id, annotationData); - processor = response.getConfiguration(); + final ProcessorEntity entity = serviceFacade.setProcessorAnnotationData(revision, id, annotationData); + processor = entity.getComponent(); } // return the processor info @@ -453,8 +453,8 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration controllerServiceDto.setId(id); controllerServiceDto.setAnnotationData(annotationData); - final ConfigurationSnapshot<ControllerServiceDTO> response = serviceFacade.updateControllerService(revision, controllerServiceDto); - controllerService = response.getConfiguration(); + final UpdateResult<ControllerServiceEntity> updateResult = serviceFacade.updateControllerService(revision, controllerServiceDto); + controllerService = updateResult.getResult().getControllerService(); } else { // if this is a standalone instance the service should have been found above... there should // no cluster to replicate the request to @@ -588,8 +588,8 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration reportingTaskDto.setId(id); reportingTaskDto.setAnnotationData(annotationData); - final ConfigurationSnapshot<ReportingTaskDTO> response = serviceFacade.updateReportingTask(revision, reportingTaskDto); - reportingTask = response.getConfiguration(); + final UpdateResult<ReportingTaskEntity> updateResult = serviceFacade.updateReportingTask(revision, reportingTaskDto); + reportingTask = updateResult.getResult().getReportingTask(); } else { // if this is a standalone instance the task should have been found above... there should // no cluster to replicate the request to
http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 8026400..12283e7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -16,10 +16,26 @@ */ package org.apache.nifi.web.api; -import com.sun.jersey.api.core.HttpContext; -import com.sun.jersey.api.representation.Form; -import com.sun.jersey.core.util.MultivaluedMapImpl; -import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.CacheControl; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriBuilderException; +import javax.ws.rs.core.UriInfo; + import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -29,8 +45,12 @@ import org.apache.nifi.action.Operation; import org.apache.nifi.authorization.user.NiFiUserDetails; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextThreadLocal; +import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.entity.ComponentEntity; import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter; @@ -40,25 +60,10 @@ import org.slf4j.LoggerFactory; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.core.CacheControl; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.ResponseBuilder; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriBuilderException; -import javax.ws.rs.core.UriInfo; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collection; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import com.sun.jersey.api.core.HttpContext; +import com.sun.jersey.api.representation.Form; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import com.sun.jersey.server.impl.model.method.dispatch.FormDispatchProvider; /** * Base class for controllers. @@ -311,7 +316,7 @@ public abstract class ApplicationResource { // get the form that jersey processed and use it if it exists (only exist for requests with a body and application form urlencoded final Form form = (Form) httpContext.getProperties().get(FormDispatchProvider.FORM_PROPERTY); if (form == null) { - for (Map.Entry<String, String[]> entry : (Set<Map.Entry<String, String[]>>) httpServletRequest.getParameterMap().entrySet()) { + for (Map.Entry<String, String[]> entry : httpServletRequest.getParameterMap().entrySet()) { if (entry.getValue() == null) { entity.add(entry.getKey(), null); } else { @@ -407,4 +412,50 @@ public abstract class ApplicationResource { strb.append(ReflectionToStringBuilder.toString(action, ToStringStyle.MULTI_LINE_STYLE)).append("\n"); } } + + /** + * Checks whether the request is part of a two-phase commit style request (either phase 1 or phase 2) + * + * @param httpServletRequest the request + * @return <code>true</code> if the request represents a two-phase commit style request + */ + protected boolean isTwoPhaseRequest(HttpServletRequest httpServletRequest) { + final String headerValue = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID); + return headerValue != null; + } + + /** + * When a two-phase commit style request is used, the first phase (generally referred to + * as the "commit-request stage") is intended to validate that the request can be completed. + * In NiFi, we use this phase to validate that the request can complete. This method determines + * whether or not the request is the first phase of a two-phase commit. + * + * @param httpServletRequest the request + * @return <code>true</code> if the request represents a two-phase commit style request and is the + * first of the two phases. + */ + protected boolean isValidationPhase(HttpServletRequest httpServletRequest) { + return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER) != null; + } + + /** + * Converts a Revision DTO and an associated Component ID into a Revision object + * + * @param revisionDto the Revision DTO + * @param componentId the ID of the component that the Revision DTO belongs to + * @return a Revision that has the same client ID and Version as the Revision DTO and the Component ID specified + */ + protected Revision getRevision(RevisionDTO revisionDto, String componentId) { + return new Revision(revisionDto.getVersion(), revisionDto.getClientId(), componentId); + } + + /** + * Extracts a Revision object from the Revision DTO and ID provided by the Component Entity + * + * @param entity the ComponentEntity that contains the Revision DTO & ID + * @return the Revision specified in the ComponentEntity + */ + protected Revision getRevision(ComponentEntity entity, String componentId) { + return getRevision(entity.getRevision(), componentId); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index d02c1ef..224dda7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -16,25 +16,10 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.ConnectionDTO; -import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; -import org.apache.nifi.web.api.dto.ListingRequestDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.entity.ConnectionEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -50,10 +35,26 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; +import org.apache.nifi.web.api.dto.ListingRequestDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; + +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Connection. @@ -268,16 +269,20 @@ public class ConnectionResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse(); } - // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + // handle expects request + final Revision revision = getRevision(connectionEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + + if (validationPhase) { serviceFacade.verifyUpdateConnection(connection); return generateContinueResponse().build(); } // update the relationship target - final RevisionDTO revision = connectionEntity.getRevision(); - final UpdateResult<ConnectionEntity> updateResult = serviceFacade.updateConnection(new Revision(revision.getVersion(), revision.getClientId()), connection); + final UpdateResult<ConnectionEntity> updateResult = serviceFacade.updateConnection(revision, connection); final ConnectionEntity entity = updateResult.getResult(); populateRemainingConnectionEntityContent(entity); @@ -343,21 +348,23 @@ public class ConnectionResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // determine the specified version + final Long clientVersion = version == null ? null : version.getLong(); + final Revision revision = new Revision(clientVersion, clientId.getClientId(), id); + // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { - serviceFacade.verifyDeleteConnection(id); - return generateContinueResponse().build(); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); + if (validationPhase) { + serviceFacade.verifyDeleteConnection(id); + return generateContinueResponse().build(); } // delete the connection - final ConnectionEntity entity = serviceFacade.deleteConnection(new Revision(clientVersion, clientId.getClientId()), id); + final ConnectionEntity entity = serviceFacade.deleteConnection(revision, id); // generate the response return clusterContext(generateOkResponse(entity)).build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index 8add2bb..62082ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -155,20 +155,11 @@ public class ControllerResource extends ApplicationResource { // create the archive final RevisionDTO requestRevision = revisionEntity.getRevision(); - final ConfigurationSnapshot<Void> controllerResponse = serviceFacade.createArchive(new Revision(requestRevision.getVersion(), requestRevision.getClientId())); - - // create the revision - final RevisionDTO updatedRevision = new RevisionDTO(); - updatedRevision.setClientId(requestRevision.getClientId()); - updatedRevision.setVersion(controllerResponse.getVersion()); - - // create the response entity - final ProcessGroupEntity controllerEntity = new ProcessGroupEntity(); - controllerEntity.setRevision(updatedRevision); + final ProcessGroupEntity entity = serviceFacade.createArchive(new Revision(requestRevision.getVersion(), requestRevision.getClientId())); // generate the response URI uri = URI.create(generateResourceUri("controller", "archive")); - return clusterContext(generateCreatedResponse(uri, controllerEntity)).build(); + return clusterContext(generateCreatedResponse(uri, entity)).build(); } /** @@ -432,15 +423,17 @@ public class ControllerResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), updateClientId(configEntity), getHeaders()).getResponse(); } + final RevisionDTO revisionDto = configEntity.getRevision(); + final Revision revision = new Revision(revisionDto.getVersion(), revisionDto.getClientId(), "controller"); + // handle expects request (usually from the cluster manager) final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); if (expects != null) { return generateContinueResponse().build(); } - final RevisionDTO revision = configEntity.getRevision(); final ConfigurationSnapshot<ControllerConfigurationDTO> controllerResponse - = serviceFacade.updateControllerConfiguration(new Revision(revision.getVersion(), revision.getClientId()), configEntity.getConfig()); + = serviceFacade.updateControllerConfiguration(revision, configEntity.getConfig()); final ControllerConfigurationDTO controllerConfig = controllerResponse.getConfiguration(); // get the updated revision @@ -580,9 +573,6 @@ public class ControllerResource extends ApplicationResource { throw new IllegalArgumentException("The type of reporting task to create must be specified."); } - // get the revision - final RevisionDTO revision = reportingTaskEntity.getRevision(); - if (properties.isClusterManager()) { return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), updateClientId(reportingTaskEntity), getHeaders()).getResponse(); } @@ -602,22 +592,13 @@ public class ControllerResource extends ApplicationResource { } // create the reporting task and generate the json - final ConfigurationSnapshot<ReportingTaskDTO> controllerResponse = serviceFacade.createReportingTask( - new Revision(revision.getVersion(), revision.getClientId()), reportingTaskEntity.getReportingTask()); - final ReportingTaskDTO reportingTask = controllerResponse.getConfiguration(); + final Revision revision = getRevision(reportingTaskEntity.getRevision(), reportingTaskEntity.getReportingTask().getId()); + final ReportingTaskEntity entity = serviceFacade.createReportingTask(revision, reportingTaskEntity.getReportingTask()); - // get the updated revision - final RevisionDTO updatedRevision = new RevisionDTO(); - updatedRevision.setClientId(revision.getClientId()); - updatedRevision.setVersion(controllerResponse.getVersion()); - - // build the response entity - final ReportingTaskEntity entity = new ReportingTaskEntity(); - entity.setRevision(updatedRevision); - entity.setReportingTask(reportingTaskResource.populateRemainingReportingTaskContent(availability, reportingTask)); + reportingTaskResource.populateRemainingReportingTaskContent(availability, entity.getReportingTask()); // build the response - return clusterContext(generateCreatedResponse(URI.create(reportingTask.getUri()), entity)).build(); + return clusterContext(generateCreatedResponse(URI.create(entity.getReportingTask().getUri()), entity)).build(); } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java index 3cfae3a..205fa1f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java @@ -16,12 +16,29 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.controller.ScheduledState; @@ -33,9 +50,9 @@ import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UiExtensionType; +import org.apache.nifi.web.UpdateResult; import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; -import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.entity.ComponentStateEntity; @@ -50,27 +67,12 @@ import org.apache.nifi.web.util.Availability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.ServletContext; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Controller Service. @@ -402,19 +404,22 @@ public class ControllerServiceResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(revisionEntity.getRevision(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyCanClearControllerServiceState(id); return generateContinueResponse().build(); } // get the component state - final RevisionDTO requestRevision = revisionEntity.getRevision(); - final ConfigurationSnapshot<Void> snapshot = serviceFacade.clearControllerServiceState(new Revision(requestRevision.getVersion(), requestRevision.getClientId()), id); + final ConfigurationSnapshot<Void> snapshot = serviceFacade.clearControllerServiceState(revision, id); // create the revision final RevisionDTO responseRevision = new RevisionDTO(); - responseRevision.setClientId(requestRevision.getClientId()); + responseRevision.setClientId(revision.getClientId()); responseRevision.setVersion(snapshot.getVersion()); // generate the response entity @@ -478,11 +483,7 @@ public class ControllerServiceResource extends ApplicationResource { } // get the controller service - final Set<ControllerServiceReferencingComponentDTO> controllerServiceReferences = serviceFacade.getControllerServiceReferencingComponents(id); - - // create the response entity - final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity(); - entity.setControllerServiceReferencingComponents(controllerServiceReferences); + final ControllerServiceReferencingComponentsEntity entity = serviceFacade.getControllerServiceReferencingComponents(id); return clusterContext(generateOkResponse(entity)).build(); } @@ -574,26 +575,19 @@ public class ControllerServiceResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision controllerServiceRevision = getRevision(updateReferenceRequest.getRevision(), updateReferenceRequest.getId()); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(controllerServiceRevision); + } + if (validationPhase) { serviceFacade.verifyUpdateControllerServiceReferencingComponents(updateReferenceRequest.getId(), scheduledState, controllerServiceState); return generateContinueResponse().build(); } // get the controller service - final RevisionDTO requestRevision = updateReferenceRequest.getRevision(); - final ConfigurationSnapshot<Set<ControllerServiceReferencingComponentDTO>> response = serviceFacade.updateControllerServiceReferencingComponents( - new Revision(requestRevision.getVersion(), requestRevision.getClientId()), updateReferenceRequest.getId(), scheduledState, controllerServiceState); - - // create the revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(requestRevision.getClientId()); - revision.setVersion(response.getVersion()); - - // create the response entity - final ControllerServiceReferencingComponentsEntity entity = new ControllerServiceReferencingComponentsEntity(); - entity.setRevision(revision); - entity.setControllerServiceReferencingComponents(response.getConfiguration()); + final ControllerServiceReferencingComponentsEntity entity = serviceFacade.updateControllerServiceReferencingComponents( + controllerServiceRevision, updateReferenceRequest.getId(), scheduledState, controllerServiceState); return clusterContext(generateOkResponse(entity)).build(); } @@ -676,32 +670,25 @@ public class ControllerServiceResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(controllerServiceEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyUpdateControllerService(requestControllerServiceDTO); return generateContinueResponse().build(); } // update the controller service - final RevisionDTO revision = controllerServiceEntity.getRevision(); - final ConfigurationSnapshot<ControllerServiceDTO> controllerResponse = serviceFacade.updateControllerService( - new Revision(revision.getVersion(), revision.getClientId()), requestControllerServiceDTO); - - // get the results - final ControllerServiceDTO responseControllerServiceDTO = controllerResponse.getConfiguration(); - - // get the updated revision - final RevisionDTO updatedRevision = new RevisionDTO(); - updatedRevision.setClientId(revision.getClientId()); - updatedRevision.setVersion(controllerResponse.getVersion()); + final UpdateResult<ControllerServiceEntity> updateResult = serviceFacade.updateControllerService(revision, requestControllerServiceDTO); // build the response entity - final ControllerServiceEntity entity = new ControllerServiceEntity(); - entity.setRevision(updatedRevision); - entity.setControllerService(populateRemainingControllerServiceContent(availability, responseControllerServiceDTO)); + final ControllerServiceEntity entity = updateResult.getResult(); + populateRemainingControllerServiceContent(availability, entity.getControllerService()); - if (controllerResponse.isNew()) { - return clusterContext(generateCreatedResponse(URI.create(responseControllerServiceDTO.getUri()), entity)).build(); + if (updateResult.isNew()) { + return clusterContext(generateCreatedResponse(URI.create(entity.getControllerService().getUri()), entity)).build(); } else { return clusterContext(generateOkResponse(entity)).build(); } @@ -775,30 +762,18 @@ public class ControllerServiceResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyDeleteControllerService(id); return generateContinueResponse().build(); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); - } - // delete the specified controller service - final ConfigurationSnapshot<Void> controllerResponse = serviceFacade.deleteControllerService(new Revision(clientVersion, clientId.getClientId()), id); - - // get the updated revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - revision.setVersion(controllerResponse.getVersion()); - - // build the response entity - final ControllerServiceEntity entity = new ControllerServiceEntity(); - entity.setRevision(revision); - + final ControllerServiceEntity entity = serviceFacade.deleteControllerService(revision, id); return clusterContext(generateOkResponse(entity)).build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java index 6874ad5..534c372 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java @@ -77,6 +77,7 @@ import java.util.UUID; value = "/flowfile-queues", description = "Endpoint for managing a FlowFile Queue." ) +// TODO: Need revisions of the Connections for these endpoints! public class FlowFileQueueResource extends ApplicationResource { private NiFiServiceFacade serviceFacade; http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 337cb67..82488c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -357,6 +357,10 @@ public class FlowResource extends ApplicationResource { public Response getRevision() { authorizeFlow(); + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + // create the current revision final RevisionDTO revision = serviceFacade.getRevision(); http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java index b3a050f..987f544 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FunnelResource.java @@ -16,25 +16,10 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.FunnelDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.entity.FunnelEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -50,10 +35,27 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.FunnelDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.entity.FunnelEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Funnel. @@ -233,16 +235,21 @@ public class FunnelResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.PUT, getAbsolutePath(), updateClientId(funnelEntity), getHeaders(headersToOverride)).getResponse(); } + // Extract the revision + final Revision revision = getRevision(funnelEntity, id); + // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { + serviceFacade.claimRevision(revision); return generateContinueResponse().build(); } // update the funnel - final RevisionDTO revision = funnelEntity.getRevision(); - final UpdateResult<FunnelEntity> updateResult = serviceFacade.updateFunnel( - new Revision(revision.getVersion(), revision.getClientId()), requestFunnelDTO); + final UpdateResult<FunnelEntity> updateResult = serviceFacade.updateFunnel(revision, requestFunnelDTO); // get the results final FunnelEntity entity = updateResult.getResult(); @@ -312,20 +319,18 @@ public class FunnelResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyDeleteFunnel(id); return generateContinueResponse().build(); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); - } - // delete the specified funnel - final FunnelEntity entity = serviceFacade.deleteFunnel(new Revision(clientVersion, clientId.getClientId()), id); + final FunnelEntity entity = serviceFacade.deleteFunnel(revision, id); return clusterContext(generateOkResponse(entity)).build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java index a378ec3..52f2392 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/InputPortResource.java @@ -16,23 +16,10 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.entity.PortEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -48,10 +35,24 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; + +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing an Input Port. @@ -230,16 +231,18 @@ public class InputPortResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(portEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyUpdateInputPort(requestPortDTO); return generateContinueResponse().build(); } // update the input port - final RevisionDTO revision = portEntity.getRevision(); - final UpdateResult<PortEntity> updateResult = serviceFacade.updateInputPort( - new Revision(revision.getVersion(), revision.getClientId()), requestPortDTO); + final UpdateResult<PortEntity> updateResult = serviceFacade.updateInputPort(revision, requestPortDTO); // build the response entity final PortEntity entity = updateResult.getResult(); @@ -306,20 +309,18 @@ public class InputPortResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyDeleteInputPort(id); return generateContinueResponse().build(); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); - } - // delete the specified input port - final PortEntity entity = serviceFacade.deleteInputPort(new Revision(clientVersion, clientId.getClientId()), id); + final PortEntity entity = serviceFacade.deleteInputPort(revision, id); return clusterContext(generateOkResponse(entity)).build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java index 65dbb75..db17647 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/LabelResource.java @@ -16,25 +16,10 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.LabelDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.entity.LabelEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -50,10 +35,26 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.LabelDTO; +import org.apache.nifi.web.api.entity.LabelEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Label. @@ -234,15 +235,17 @@ public class LabelResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(labelEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { return generateContinueResponse().build(); } // update the label - final RevisionDTO revision = labelEntity.getRevision(); - final UpdateResult<LabelEntity> result = serviceFacade.updateLabel( - new Revision(revision.getVersion(), revision.getClientId()), requestLabelDTO); + final UpdateResult<LabelEntity> result = serviceFacade.updateLabel(revision, requestLabelDTO); final LabelEntity entity = result.getResult(); populateRemainingLabelEntityContent(entity); @@ -307,19 +310,17 @@ public class LabelResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { - return generateContinueResponse().build(); + final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); } - - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); + if (validationPhase) { + return generateContinueResponse().build(); } // delete the specified label - final LabelEntity entity = serviceFacade.deleteLabel(new Revision(clientVersion, clientId.getClientId()), id); + final LabelEntity entity = serviceFacade.deleteLabel(revision, id); return clusterContext(generateOkResponse(entity)).build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java index 398350f..584ecc9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/OutputPortResource.java @@ -16,25 +16,10 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.PortDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.entity.PortEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -50,10 +35,26 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.PortDTO; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing an Output Port. @@ -234,16 +235,18 @@ public class OutputPortResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(portEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyUpdateOutputPort(requestPortDTO); return generateContinueResponse().build(); } // update the output port - final RevisionDTO revision = portEntity.getRevision(); - final UpdateResult<PortEntity> updateResult = serviceFacade.updateOutputPort( - new Revision(revision.getVersion(), revision.getClientId()), requestPortDTO); + final UpdateResult<PortEntity> updateResult = serviceFacade.updateOutputPort(revision, requestPortDTO); // get the results final PortEntity entity = updateResult.getResult(); @@ -310,20 +313,18 @@ public class OutputPortResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyDeleteOutputPort(id); return generateContinueResponse().build(); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); - } - // delete the specified output port - final PortEntity entity = serviceFacade.deleteOutputPort(new Revision(clientVersion, clientId.getClientId()), id); + final PortEntity entity = serviceFacade.deleteOutputPort(revision, id); return clusterContext(generateOkResponse(entity)).build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java index 49a3264..b0e240e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java @@ -16,14 +16,38 @@ */ package org.apache.nifi.web.api; -import com.sun.jersey.api.core.ResourceContext; -import com.sun.jersey.multipart.FormDataParam; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextThreadLocal; @@ -72,36 +96,14 @@ import org.apache.nifi.web.util.Availability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBElement; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; -import javax.xml.transform.stream.StreamSource; -import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import com.sun.jersey.api.core.ResourceContext; +import com.sun.jersey.multipart.FormDataParam; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Group. @@ -332,15 +334,18 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(processGroupEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyUpdateProcessGroup(requestProcessGroupDTO); return generateContinueResponse().build(); } // update the process group - final RevisionDTO revision = processGroupEntity.getRevision(); - final UpdateResult<ProcessGroupEntity> updateResult = serviceFacade.updateProcessGroup(new Revision(revision.getVersion(), revision.getClientId()), requestProcessGroupDTO); + final UpdateResult<ProcessGroupEntity> updateResult = serviceFacade.updateProcessGroup(revision, requestProcessGroupDTO); final ProcessGroupEntity entity = updateResult.getResult(); populateRemainingProcessGroupEntityContent(entity); @@ -405,20 +410,18 @@ public class ProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyDeleteProcessGroup(id); return generateContinueResponse().build(); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); - } - // delete the process group - final ProcessGroupEntity entity = serviceFacade.deleteProcessGroup(new Revision(clientVersion, clientId.getClientId()), id); + final ProcessGroupEntity entity = serviceFacade.deleteProcessGroup(revision, id); // create the response return clusterContext(generateOkResponse(entity)).build(); @@ -1687,7 +1690,7 @@ public class ProcessGroupResource extends ApplicationResource { snippetEntity.getSnippet().setParentGroupId(groupId); if (properties.isClusterManager()) { - return (Response) clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), updateClientId(snippetEntity), getHeaders()).getResponse(); + return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), updateClientId(snippetEntity), getHeaders()).getResponse(); } // handle expects request (usually from the cluster manager) @@ -1959,18 +1962,9 @@ public class ProcessGroupResource extends ApplicationResource { } // delete the specified snippet - final ConfigurationSnapshot<Void> controllerResponse = serviceFacade.deleteSnippet(new Revision(clientVersion, clientId.getClientId()), id); - - // get the updated revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - revision.setVersion(controllerResponse.getVersion()); - - // build the response entity - SnippetEntity entity = new SnippetEntity(); - entity.setRevision(revision); + final SnippetEntity snippetEntity = serviceFacade.deleteSnippet(new Revision(clientVersion, clientId.getClientId()), id); - return clusterContext(generateOkResponse(entity)).build(); + return clusterContext(generateOkResponse(snippetEntity)).build(); } // ---------------- @@ -2039,30 +2033,24 @@ public class ProcessGroupResource extends ApplicationResource { // copy the specified snippet final RevisionDTO requestRevision = copySnippetEntity.getRevision(); - final ConfigurationSnapshot<FlowDTO> controllerResponse = serviceFacade.copySnippet( + final FlowEntity flowEntity = serviceFacade.copySnippet( new Revision(requestRevision.getVersion(), requestRevision.getClientId()), groupId, copySnippetEntity.getSnippetId(), copySnippetEntity.getOriginX(), copySnippetEntity.getOriginY()); // get the snippet - final FlowDTO flow = controllerResponse.getConfiguration(); + final FlowDTO flow = flowEntity.getFlow(); // prune response as necessary for (ProcessGroupEntity childGroupEntity : flow.getProcessGroups()) { childGroupEntity.getComponent().setContents(null); } - // get the updated revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(requestRevision.getClientId()); - revision.setVersion(controllerResponse.getVersion()); // create the response entity - final FlowEntity entity = new FlowEntity(); - entity.setRevision(revision); - entity.setFlow(populateRemainingSnippetContent(flow)); + populateRemainingSnippetContent(flow); // generate the response - return clusterContext(generateCreatedResponse(getAbsolutePath(), entity)).build(); + return clusterContext(generateCreatedResponse(getAbsolutePath(), flowEntity)).build(); } // ----------------- @@ -2131,26 +2119,20 @@ public class ProcessGroupResource extends ApplicationResource { // create the template and generate the json final RevisionDTO requestRevision = instantiateTemplateRequestEntity.getRevision(); - final ConfigurationSnapshot<FlowDTO> response = serviceFacade.createTemplateInstance( + final FlowEntity entity = serviceFacade.createTemplateInstance( new Revision(requestRevision.getVersion(), requestRevision.getClientId()), groupId, instantiateTemplateRequestEntity.getOriginX(), instantiateTemplateRequestEntity.getOriginY(), instantiateTemplateRequestEntity.getTemplateId()); - final FlowDTO flowSnippet = response.getConfiguration(); + final FlowDTO flowSnippet = entity.getFlow(); // prune response as necessary for (ProcessGroupEntity childGroupEntity : flowSnippet.getProcessGroups()) { childGroupEntity.getComponent().setContents(null); } - // get the updated revision - final RevisionDTO revision = new RevisionDTO(); - revision.setClientId(requestRevision.getClientId()); - revision.setVersion(response.getVersion()); // create the response entity - final FlowEntity entity = new FlowEntity(); - entity.setRevision(revision); - entity.setFlow(populateRemainingSnippetContent(flowSnippet)); + populateRemainingSnippetContent(flowSnippet); // generate the response return clusterContext(generateCreatedResponse(getAbsolutePath(), entity)).build(); @@ -2531,22 +2513,14 @@ public class ProcessGroupResource extends ApplicationResource { } // create the controller service and generate the json - final ConfigurationSnapshot<ControllerServiceDTO> controllerResponse = serviceFacade.createControllerService( + final ControllerServiceEntity entity = serviceFacade.createControllerService( new Revision(revision.getVersion(), revision.getClientId()), controllerServiceEntity.getControllerService()); - final ControllerServiceDTO controllerService = controllerResponse.getConfiguration(); - - // get the updated revision - final RevisionDTO updatedRevision = new RevisionDTO(); - updatedRevision.setClientId(revision.getClientId()); - updatedRevision.setVersion(controllerResponse.getVersion()); // build the response entity - final ControllerServiceEntity entity = new ControllerServiceEntity(); - entity.setControllerService(controllerServiceResource.populateRemainingControllerServiceContent(availability, controllerService)); - entity.setRevision(updatedRevision); + controllerServiceResource.populateRemainingControllerServiceContent(availability, entity.getControllerService()); // build the response - return clusterContext(generateCreatedResponse(URI.create(controllerService.getUri()), entity)).build(); + return clusterContext(generateCreatedResponse(URI.create(entity.getControllerService().getUri()), entity)).build(); } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java index 9dddd70..e0cc08b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java @@ -379,19 +379,23 @@ public class ProcessorResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(revisionEntity.getRevision(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + + if (validationPhase) { serviceFacade.verifyCanClearProcessorState(id); return generateContinueResponse().build(); } // get the component state - final RevisionDTO requestRevision = revisionEntity.getRevision(); - final ConfigurationSnapshot<Void> snapshot = serviceFacade.clearProcessorState(new Revision(requestRevision.getVersion(), requestRevision.getClientId()), id); + final ConfigurationSnapshot<Void> snapshot = serviceFacade.clearProcessorState(revision, id); // create the revision final RevisionDTO responseRevision = new RevisionDTO(); - responseRevision.setClientId(requestRevision.getClientId()); + responseRevision.setClientId(revision.getClientId()); responseRevision.setVersion(snapshot.getVersion()); // generate the response entity @@ -475,15 +479,18 @@ public class ProcessorResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(processorEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyUpdateProcessor(requestProcessorDTO); return generateContinueResponse().build(); } // update the processor - final RevisionDTO revision = processorEntity.getRevision(); - final UpdateResult<ProcessorEntity> result = serviceFacade.updateProcessor(new Revision(revision.getVersion(), revision.getClientId()), requestProcessorDTO); + final UpdateResult<ProcessorEntity> result = serviceFacade.updateProcessor(revision, requestProcessorDTO); final ProcessorEntity entity = result.getResult(); populateRemainingProcessorEntityContent(entity); @@ -547,21 +554,23 @@ public class ProcessorResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final boolean validationPhase = isValidationPhase(httpServletRequest); + + // We need to claim the revision for the Processor if either this is the first phase of a two-phase + // request, or if this is not a two-phase request. + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyDeleteProcessor(id); return generateContinueResponse().build(); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); - } - // delete the processor - final ProcessorEntity entity = serviceFacade.deleteProcessor(new Revision(clientVersion, clientId.getClientId()), id); + final ProcessorEntity entity = serviceFacade.deleteProcessor(revision, id); // generate the response return clusterContext(generateOkResponse(entity)).build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/04c41c06/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java index 64fe273..489cc1d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/RemoteProcessGroupResource.java @@ -16,26 +16,10 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.impl.WebClusterManager; -import org.apache.nifi.util.NiFiProperties; -import org.apache.nifi.web.ConfigurationSnapshot; -import org.apache.nifi.web.NiFiServiceFacade; -import org.apache.nifi.web.Revision; -import org.apache.nifi.web.UpdateResult; -import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; -import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; -import org.apache.nifi.web.api.dto.RevisionDTO; -import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; -import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; -import org.apache.nifi.web.api.request.ClientIdParameter; -import org.apache.nifi.web.api.request.LongParameter; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -51,10 +35,27 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.manager.impl.WebClusterManager; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.NiFiServiceFacade; +import org.apache.nifi.web.Revision; +import org.apache.nifi.web.UpdateResult; +import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; +import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; +import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; +import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; +import org.apache.nifi.web.api.request.ClientIdParameter; +import org.apache.nifi.web.api.request.LongParameter; + +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Remote group. @@ -239,19 +240,17 @@ public class RemoteProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { serviceFacade.verifyDeleteRemoteProcessGroup(id); return generateContinueResponse().build(); } - // determine the specified version - Long clientVersion = null; - if (version != null) { - clientVersion = version.getLong(); - } - - final RemoteProcessGroupEntity entity = serviceFacade.deleteRemoteProcessGroup(new Revision(clientVersion, clientId.getClientId()), id); + final RemoteProcessGroupEntity entity = serviceFacade.deleteRemoteProcessGroup(revision, id); return clusterContext(generateOkResponse(entity)).build(); } @@ -318,28 +317,27 @@ public class RemoteProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(remoteProcessGroupPortEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { // verify the update at this time serviceFacade.verifyUpdateRemoteProcessGroupInputPort(id, requestRemoteProcessGroupPort); return generateContinueResponse().build(); } // update the specified remote process group - final RevisionDTO revision = remoteProcessGroupPortEntity.getRevision(); - final ConfigurationSnapshot<RemoteProcessGroupPortDTO> controllerResponse - = serviceFacade.updateRemoteProcessGroupInputPort(new Revision(revision.getVersion(), - revision.getClientId()), id, requestRemoteProcessGroupPort); + final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupInputPort(revision, id, requestRemoteProcessGroupPort); // get the updated revision - final RevisionDTO updatedRevision = new RevisionDTO(); - updatedRevision.setClientId(revision.getClientId()); - updatedRevision.setVersion(controllerResponse.getVersion()); + final RevisionDTO updatedRevision = controllerResponse.getRevision(); // build the response entity final RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity(); entity.setRevision(updatedRevision); - entity.setRemoteProcessGroupPort(controllerResponse.getConfiguration()); + entity.setRemoteProcessGroupPort(controllerResponse.getRemoteProcessGroupPort()); return clusterContext(generateOkResponse(entity)).build(); } @@ -407,28 +405,27 @@ public class RemoteProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(remoteProcessGroupPortEntity, portId); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { // verify the update at this time serviceFacade.verifyUpdateRemoteProcessGroupOutputPort(id, requestRemoteProcessGroupPort); return generateContinueResponse().build(); } // update the specified remote process group - final RevisionDTO revision = remoteProcessGroupPortEntity.getRevision(); - final ConfigurationSnapshot<RemoteProcessGroupPortDTO> controllerResponse - = serviceFacade.updateRemoteProcessGroupOutputPort(new Revision(revision.getVersion(), - revision.getClientId()), id, requestRemoteProcessGroupPort); + final RemoteProcessGroupPortEntity controllerResponse = serviceFacade.updateRemoteProcessGroupOutputPort(revision, id, requestRemoteProcessGroupPort); // get the updated revision - final RevisionDTO updatedRevision = new RevisionDTO(); - updatedRevision.setClientId(revision.getClientId()); - updatedRevision.setVersion(controllerResponse.getVersion()); + final RevisionDTO updatedRevision = controllerResponse.getRevision(); // build the response entity RemoteProcessGroupPortEntity entity = new RemoteProcessGroupPortEntity(); entity.setRevision(updatedRevision); - entity.setRemoteProcessGroupPort(controllerResponse.getConfiguration()); + entity.setRemoteProcessGroupPort(controllerResponse.getRemoteProcessGroupPort()); return clusterContext(generateOkResponse(entity)).build(); } @@ -493,8 +490,12 @@ public class RemoteProcessGroupResource extends ApplicationResource { } // handle expects request (usually from the cluster manager) - final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); - if (expects != null) { + final Revision revision = getRevision(remoteProcessGroupEntity, id); + final boolean validationPhase = isValidationPhase(httpServletRequest); + if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + serviceFacade.claimRevision(revision); + } + if (validationPhase) { // verify the update at this time serviceFacade.verifyUpdateRemoteProcessGroup(requestRemoteProcessGroup); return generateContinueResponse().build(); @@ -533,9 +534,7 @@ public class RemoteProcessGroupResource extends ApplicationResource { } // update the specified remote process group - final RevisionDTO revision = remoteProcessGroupEntity.getRevision(); - final UpdateResult<RemoteProcessGroupEntity> updateResult - = serviceFacade.updateRemoteProcessGroup(new Revision(revision.getVersion(), revision.getClientId()), requestRemoteProcessGroup); + final UpdateResult<RemoteProcessGroupEntity> updateResult = serviceFacade.updateRemoteProcessGroup(revision, requestRemoteProcessGroup); final RemoteProcessGroupEntity entity = updateResult.getResult(); populateRemainingRemoteProcessGroupEntityContent(entity);
