NIFI-2635: - Re-using the original request during the second phase of the two phase commit. - Forwarding requests to the coordinator when received by a node.
This closes #933 Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2bfc4ef Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2bfc4ef Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2bfc4ef Branch: refs/heads/master Commit: c2bfc4ef245d5c95dc103139cf386896d6ec3023 Parents: e9da908 Author: Matt Gilman <[email protected]> Authored: Wed Aug 24 15:39:04 2016 -0400 Committer: jpercivall <[email protected]> Committed: Wed Aug 24 22:42:10 2016 -0400 ---------------------------------------------------------------------- nifi-commons/nifi-web-utils/pom.xml | 4 + .../http/replication/RequestReplicator.java | 24 +- .../ThreadPoolRequestReplicator.java | 52 +- .../TestThreadPoolRequestReplicator.java | 9 +- .../nifi/web/api/AccessPolicyResource.java | 95 +- .../nifi/web/api/ApplicationResource.java | 255 ++++- .../apache/nifi/web/api/ConnectionResource.java | 45 +- .../apache/nifi/web/api/ControllerResource.java | 176 +-- .../nifi/web/api/ControllerServiceResource.java | 123 ++- .../apache/nifi/web/api/CountersResource.java | 38 +- .../nifi/web/api/FlowFileQueueResource.java | 237 ++-- .../org/apache/nifi/web/api/FlowResource.java | 45 +- .../org/apache/nifi/web/api/FunnelResource.java | 33 +- .../apache/nifi/web/api/InputPortResource.java | 35 +- .../org/apache/nifi/web/api/LabelResource.java | 35 +- .../apache/nifi/web/api/OutputPortResource.java | 35 +- .../nifi/web/api/ProcessGroupResource.java | 1014 +++++++++--------- .../apache/nifi/web/api/ProcessorResource.java | 72 +- .../apache/nifi/web/api/ProvenanceResource.java | 182 ++-- .../web/api/RemoteProcessGroupResource.java | 122 ++- .../nifi/web/api/ReportingTaskResource.java | 72 +- .../apache/nifi/web/api/SnippetResource.java | 118 +- .../apache/nifi/web/api/TemplateResource.java | 39 +- .../apache/nifi/web/api/TenantsResource.java | 186 ++-- .../nifi/web/security/otp/OtpService.java | 42 +- .../apache/nifi/web/security/util/CacheKey.java | 61 ++ .../nifi-web-ui/src/main/webapp/css/header.css | 2 +- 27 files changed, 1804 insertions(+), 1347 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-commons/nifi-web-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-web-utils/pom.xml b/nifi-commons/nifi-web-utils/pom.xml index dd2f3ef..7cc0581 100644 --- a/nifi-commons/nifi-web-utils/pom.xml +++ b/nifi-commons/nifi-web-utils/pom.xml @@ -36,6 +36,10 @@ </dependency> <dependency> <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> <artifactId>jersey-client</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java index bfe528d..b9bce0a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java @@ -38,10 +38,14 @@ public interface RequestReplicator { public static final int NODE_CONTINUE_STATUS_CODE = 150; /** - * Indicates that the request is intended to cancel a lock that was previously obtained without performing the action + * Indicates that the request is intended to cancel a transaction that was previously created without performing the action */ - public static final String LOCK_CANCELATION_HEADER = "X-Cancel-Lock"; - public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id"; + public static final String REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER = "X-Cancel-Transaction"; + + /** + * Indicates that this is the second phase of the two phase commit and the execution of the action should proceed. + */ + public static final String REQUEST_EXECUTION_HTTP_HEADER = "X-Execution-Continue"; /** * When we replicate a request across the cluster, we replicate it only from the cluster coordinator. @@ -106,6 +110,20 @@ public interface RequestReplicator { */ AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification); + + /** + * Forwards a request to the Cluster Coordinator so that it is able to replicate the request to all nodes in the cluster. + * + * @param coordinatorNodeId the node identifier of the Cluster Coordinator + * @param method the HTTP method (e.g., POST, PUT) + * @param uri the base request URI (up to, but not including, the query string) + * @param entity an entity + * @param headers any HTTP headers + * + * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later + */ + AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, String method, URI uri, Object entity, Map<String, String> headers); + /** * <p> * Returns an AsyncClusterResponse that provides the most up-to-date status of the request with the given identifier. http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index bd4e277..e22bb79 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -43,6 +43,7 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +76,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.nifi.util.NiFiProperties; public class ThreadPoolRequestReplicator implements RequestReplicator { @@ -249,15 +249,29 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { lock.lock(); try { logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri); - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification); } finally { lock.unlock(); } } else { - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification); } } + @Override + public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier coordinatorNodeId, final String method, final URI uri, final Object entity, final Map<String, String> headers) { + // If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request, + // it knows that we are acting as a proxy on behalf of the current user. + final Map<String, String> updatedHeaders = new HashMap<>(headers); + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user != null && !user.isAnonymous()) { + final String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user); + updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain); + } + + return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false); + } + /** * Replicates the request to all nodes in the given set of node identifiers * @@ -268,11 +282,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @param headers the HTTP Headers * @param performVerification whether or not to verify that all nodes in the cluster are connected and that all nodes can perform request. Ignored if request is not mutable. * @param response the response to update with the results + * @param executionPhase <code>true</code> if this is the execution phase, <code>false</code> otherwise * * @return an AsyncClusterResponse that can be used to obtain the result */ private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification, - StandardAsyncClusterResponse response) { + StandardAsyncClusterResponse response, boolean executionPhase) { // state validation Objects.requireNonNull(nodeIds); @@ -355,6 +370,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { finalResponse.add(nodeResponse); }; + // instruct the node to actually perform the underlying action + if (mutableRequest && executionPhase) { + updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true"); + } + // replicate the request to all nodes final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, nodeCompletionCallback); @@ -368,12 +388,8 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) { logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath()); - // Add the Lock Version ID to the headers so that it is used in all requests for this transaction - final String lockVersionId = UUID.randomUUID().toString(); - headers.put(RequestReplicator.LOCK_VERSION_ID_HEADER, lockVersionId); - - final Map<String, String> updatedHeaders = new HashMap<>(headers); - updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE); + final Map<String, String> validationHeaders = new HashMap<>(headers); + validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE); final int numNodes = nodeIds.size(); final NodeRequestCompletionCallback completionCallback = new NodeRequestCompletionCallback() { @@ -404,12 +420,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // to all nodes and we are finished. if (dissentingCount == 0) { logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath()); - replicate(nodeIds, method, uri, entity, headers, false, clusterResponse); + replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true); return; } - final Map<String, String> cancelLockHeaders = new HashMap<>(updatedHeaders); - cancelLockHeaders.put(LOCK_CANCELATION_HEADER, "true"); + final Map<String, String> cancelLockHeaders = new HashMap<>(headers); + cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true"); final Thread cancelLockThread = new Thread(new Runnable() { @Override public void run() { @@ -482,10 +498,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { }; // Callback function for generating a NodeHttpRequestCallable that can be used to perform the work - final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, updatedHeaders, completionCallback); + final Function<NodeIdentifier, NodeHttpRequest> requestFactory = nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, validationHeaders, completionCallback); // replicate the 'verification request' to all nodes - replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, updatedHeaders); + replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), requestFactory, validationHeaders); } @@ -500,9 +516,11 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } // Visible for testing - overriding this method makes it easy to verify behavior without actually making any web requests - protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId, + final Map<String, String> headers) { final ClientResponse clientResponse; final long startNanos = System.nanoTime(); + logger.debug("Replicating request to {} {}, request ID = {}, headers = {}", method, uri, requestId, headers); switch (method.toUpperCase()) { case HttpMethod.DELETE: @@ -703,7 +721,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final String requestId = headers.get("x-nifi-request-id"); logger.debug("Replicating request {} {} to {}", method, uri.getPath(), nodeId); - nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId); + nodeResponse = replicateRequest(resourceBuilder, nodeId, method, uri, requestId, headers); } catch (final Exception e) { nodeResponse = new NodeResponse(nodeId, method, uri, e); logger.warn("Failed to replicate request {} {} to {} due to {}", method, uri.getPath(), nodeId, e); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java index e699fca..ce7f452 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java @@ -164,7 +164,8 @@ public class TestThreadPoolRequestReplicator { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override - protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, + final URI uri, final String requestId, Map<String, String> givenHeaders) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); @@ -285,7 +286,8 @@ public class TestThreadPoolRequestReplicator { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override - protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, + final URI uri, final String requestId, Map<String, String> givenHeaders) { // the resource builder will not expose its headers to us, so we are using Mockito's Whitebox class to extract them. final OutBoundHeaders headers = (OutBoundHeaders) Whitebox.getInternalState(resourceBuilder, "metadata"); final Object expectsHeader = headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER); @@ -327,7 +329,8 @@ public class TestThreadPoolRequestReplicator { final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", null, null, NiFiProperties.createBasicNiFiProperties(null, null)) { @Override - protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, final URI uri, final String requestId) { + protected NodeResponse replicateRequest(final WebResource.Builder resourceBuilder, final NodeIdentifier nodeId, final String method, + final URI uri, final String requestId, Map<String, String> givenHeaders) { if (delayMillis > 0L) { try { Thread.sleep(delayMillis); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java index 9bfcbc0..b547dc6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java @@ -170,7 +170,7 @@ public class AccessPolicyResource extends ApplicationResource { * Creates a new access policy. * * @param httpServletRequest request - * @param accessPolicyEntity An accessPolicyEntity. + * @param requestAccessPolicyEntity An accessPolicyEntity. * @return An accessPolicyEntity. */ @POST @@ -197,22 +197,22 @@ public class AccessPolicyResource extends ApplicationResource { @ApiParam( value = "The access policy configuration details.", required = true - ) final AccessPolicyEntity accessPolicyEntity) { + ) final AccessPolicyEntity requestAccessPolicyEntity) { // ensure we're running with a configurable authorizer if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) { throw new IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER); } - if (accessPolicyEntity == null || accessPolicyEntity.getComponent() == null) { + if (requestAccessPolicyEntity == null || requestAccessPolicyEntity.getComponent() == null) { throw new IllegalArgumentException("Access policy details must be specified."); } - if (accessPolicyEntity.getRevision() == null || (accessPolicyEntity.getRevision().getVersion() == null || accessPolicyEntity.getRevision().getVersion() != 0)) { + if (requestAccessPolicyEntity.getRevision() == null || (requestAccessPolicyEntity.getRevision().getVersion() == null || requestAccessPolicyEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Policy."); } - final AccessPolicyDTO requestAccessPolicy = accessPolicyEntity.getComponent(); + final AccessPolicyDTO requestAccessPolicy = requestAccessPolicyEntity.getComponent(); if (requestAccessPolicy.getId() != null) { throw new IllegalArgumentException("Access policy ID cannot be specified."); } @@ -225,35 +225,36 @@ public class AccessPolicyResource extends ApplicationResource { RequestAction.valueOfValue(requestAccessPolicy.getAction()); if (isReplicateRequest()) { - return replicate(HttpMethod.POST, accessPolicyEntity); + return replicate(HttpMethod.POST, requestAccessPolicyEntity); } // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable accessPolicies = lookup.getAccessPolicyByResource(requestAccessPolicy.getResource()); - accessPolicies.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } + return withWriteLock( + serviceFacade, + requestAccessPolicyEntity, + lookup -> { + final Authorizable accessPolicies = lookup.getAccessPolicyByResource(requestAccessPolicy.getResource()); + accessPolicies.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + null, + accessPolicyEntity -> { + final AccessPolicyDTO accessPolicy = accessPolicyEntity.getComponent(); - // set the access policy id as appropriate - requestAccessPolicy.setId(generateUuid()); + // set the access policy id as appropriate + accessPolicy.setId(generateUuid()); - // get revision from the config - final RevisionDTO revisionDTO = accessPolicyEntity.getRevision(); - Revision revision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), accessPolicyEntity.getComponent().getId()); + // get revision from the config + final RevisionDTO revisionDTO = accessPolicyEntity.getRevision(); + Revision revision = new Revision(revisionDTO.getVersion(), revisionDTO.getClientId(), accessPolicyEntity.getComponent().getId()); - // create the access policy and generate the json - final AccessPolicyEntity entity = serviceFacade.createAccessPolicy(revision, accessPolicyEntity.getComponent()); - populateRemainingAccessPolicyEntityContent(entity); + // create the access policy and generate the json + final AccessPolicyEntity entity = serviceFacade.createAccessPolicy(revision, accessPolicyEntity.getComponent()); + populateRemainingAccessPolicyEntityContent(entity); - // build the response - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } + ); } /** @@ -316,7 +317,7 @@ public class AccessPolicyResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the access policy to update. - * @param accessPolicyEntity An accessPolicyEntity. + * @param requestAccessPolicyEntity An accessPolicyEntity. * @return An accessPolicyEntity. */ @PUT @@ -349,43 +350,46 @@ public class AccessPolicyResource extends ApplicationResource { @ApiParam( value = "The access policy configuration details.", required = true - ) final AccessPolicyEntity accessPolicyEntity) { + ) final AccessPolicyEntity requestAccessPolicyEntity) { // ensure we're running with a configurable authorizer if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) { throw new IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER); } - if (accessPolicyEntity == null || accessPolicyEntity.getComponent() == null) { + if (requestAccessPolicyEntity == null || requestAccessPolicyEntity.getComponent() == null) { throw new IllegalArgumentException("Access policy details must be specified."); } - if (accessPolicyEntity.getRevision() == null) { + if (requestAccessPolicyEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the ids are the same - final AccessPolicyDTO accessPolicyDTO = accessPolicyEntity.getComponent(); - if (!id.equals(accessPolicyDTO.getId())) { + final AccessPolicyDTO requestAccessPolicyDTO = requestAccessPolicyEntity.getComponent(); + if (!id.equals(requestAccessPolicyDTO.getId())) { throw new IllegalArgumentException(String.format("The access policy id (%s) in the request body does not equal the " - + "access policy id of the requested resource (%s).", accessPolicyDTO.getId(), id)); + + "access policy id of the requested resource (%s).", requestAccessPolicyDTO.getId(), id)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, accessPolicyEntity); + return replicate(HttpMethod.PUT, requestAccessPolicyEntity); } // Extract the revision - final Revision revision = getRevision(accessPolicyEntity, id); + final Revision requestRevision = getRevision(requestAccessPolicyEntity, id); return withWriteLock( serviceFacade, - revision, + requestAccessPolicyEntity, + requestRevision, lookup -> { Authorizable authorizable = lookup.getAccessPolicyById(id); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, null, - () -> { + (revision, accessPolicyEntity) -> { + final AccessPolicyDTO accessPolicyDTO = accessPolicyEntity.getComponent(); + // update the access policy final AccessPolicyEntity entity = serviceFacade.updateAccessPolicy(revision, accessPolicyDTO); populateRemainingAccessPolicyEntityContent(entity); @@ -454,20 +458,23 @@ public class AccessPolicyResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } + final AccessPolicyEntity requestAccessPolicyEntity = new AccessPolicyEntity(); + requestAccessPolicyEntity.setId(id); + // handle expects request (usually from the cluster manager) - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); return withWriteLock( serviceFacade, - revision, + requestAccessPolicyEntity, + requestRevision, lookup -> { final Authorizable accessPolicy = lookup.getAccessPolicyById(id); accessPolicy.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, - () -> { - }, - () -> { + null, + (revision, accessPolicyEntity) -> { // delete the specified access policy - final AccessPolicyEntity entity = serviceFacade.deleteAccessPolicy(revision, id); + final AccessPolicyEntity entity = serviceFacade.deleteAccessPolicy(revision, accessPolicyEntity.getId()); return clusterContext(generateOkResponse(entity)).build(); } ); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index b233a35..4f251dd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.web.api; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.sun.jersey.api.core.HttpContext; import com.sun.jersey.api.representation.Form; import com.sun.jersey.core.util.MultivaluedMapImpl; @@ -49,7 +51,10 @@ import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.SnippetDTO; import org.apache.nifi.web.api.entity.ComponentEntity; +import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.TransactionResultEntity; +import org.apache.nifi.web.security.ProxiedEntitiesUtils; +import org.apache.nifi.web.security.util.CacheKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,8 +80,10 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Supplier; +import java.util.function.Function; import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static org.apache.commons.lang3.StringUtils.isEmpty; @@ -114,6 +121,8 @@ public abstract class ApplicationResource { private RequestReplicator requestReplicator; private ClusterCoordinator clusterCoordinator; + private static final int MAX_CACHE_SOFT_LIMIT = 500; + private final Cache<CacheKey, Request<? extends Entity>> twoPhaseCommitCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build(); /** * Generate a resource uri based off of the specified parameters. @@ -348,8 +357,8 @@ public abstract class ApplicationResource { * @return <code>true</code> if the request represents a two-phase commit style request */ protected boolean isTwoPhaseRequest(final HttpServletRequest httpServletRequest) { - final String headerValue = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); - return headerValue != null; + final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); + return transactionId != null && isConnectedToCluster(); } /** @@ -366,6 +375,14 @@ public abstract class ApplicationResource { return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) != null; } + protected boolean isExecutionPhase(final HttpServletRequest httpServletRequest) { + return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER) != null; + } + + protected boolean isCancellationPhase(final HttpServletRequest httpServletRequest) { + return isTwoPhaseRequest(httpServletRequest) && httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER) != null; + } + /** * Checks whether or not the request should be replicated to the cluster * @@ -377,6 +394,7 @@ public abstract class ApplicationResource { return false; } + // If not connected to the cluster, we do not replicate if (!isConnectedToCluster()) { return false; } @@ -468,12 +486,43 @@ public abstract class ApplicationResource { * @param action executor * @return the response */ - protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final Revision revision, final AuthorizeAccess authorizer, - final Runnable verifier, final Supplier<Response> action) { + protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Revision revision, final AuthorizeAccess authorizer, + final Runnable verifier, final BiFunction<Revision, T, Response> action) { final NiFiUser user = NiFiUserUtils.getNiFiUser(); - return withWriteLock(serviceFacade, authorizer, verifier, action, - () -> serviceFacade.verifyRevision(revision, user)); + + if (isTwoPhaseRequest(httpServletRequest)) { + if (isValidationPhase(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(authorizer); + serviceFacade.verifyRevision(revision, user); + + // verify if necessary + if (verifier != null) { + verifier.run(); + } + + // store the request + phaseOneStoreTransaction(entity, revision, null); + + return generateContinueResponse().build(); + } else if (isExecutionPhase(httpServletRequest)) { + // get the original request and run the action + final Request<T> phaseOneRequest = phaseTwoVerifyTransaction(); + return action.apply(phaseOneRequest.getRevision(), phaseOneRequest.getRequest()); + } else if (isCancellationPhase(httpServletRequest)) { + cancelTransaction(); + return generateOkResponse().build(); + } else { + throw new IllegalStateException("This request does not appear to be part of the two phase commit."); + } + } else { + // authorize access and run the action + serviceFacade.authorizeAccess(authorizer); + serviceFacade.verifyRevision(revision, user); + + return action.apply(revision, entity); + } } /** @@ -486,43 +535,197 @@ public abstract class ApplicationResource { * @param action executor * @return the response */ - protected Response withWriteLock(final NiFiServiceFacade serviceFacade, final Set<Revision> revisions, final AuthorizeAccess authorizer, - final Runnable verifier, final Supplier<Response> action) { + protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final Set<Revision> revisions, final AuthorizeAccess authorizer, + final Runnable verifier, final BiFunction<Set<Revision>, T, Response> action) { + final NiFiUser user = NiFiUserUtils.getNiFiUser(); - return withWriteLock(serviceFacade, authorizer, verifier, action, - () -> serviceFacade.verifyRevisions(revisions, user)); - } + if (isTwoPhaseRequest(httpServletRequest)) { + if (isValidationPhase(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(authorizer); + serviceFacade.verifyRevisions(revisions, user); + + // verify if necessary + if (verifier != null) { + verifier.run(); + } + + // store the request + phaseOneStoreTransaction(entity, null, revisions); + + return generateContinueResponse().build(); + } else if (isExecutionPhase(httpServletRequest)) { + // get the original request and run the action + final Request<T> phaseOneRequest = phaseTwoVerifyTransaction(); + return action.apply(phaseOneRequest.getRevisions(), phaseOneRequest.getRequest()); + } else if (isCancellationPhase(httpServletRequest)) { + cancelTransaction(); + return generateOkResponse().build(); + } else { + throw new IllegalStateException("This request does not appear to be part of the two phase commit."); + } + } else { + // authorize access and run the action + serviceFacade.authorizeAccess(authorizer); + serviceFacade.verifyRevisions(revisions, user); + + return action.apply(revisions, entity); + } + } /** - * Executes an action through the service facade using the specified revision. + * Executes an action through the service facade. * * @param serviceFacade service facade * @param authorizer authorizer * @param verifier verifier * @param action the action to execute - * @param verifyRevision a callback that will claim the necessary revisions for the operation * @return the response */ - private Response withWriteLock( - final NiFiServiceFacade serviceFacade, final AuthorizeAccess authorizer, final Runnable verifier, final Supplier<Response> action, - final Runnable verifyRevision) { + protected <T extends Entity> Response withWriteLock(final NiFiServiceFacade serviceFacade, final T entity, final AuthorizeAccess authorizer, + final Runnable verifier, final Function<T, Response> action) { - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { + if (isTwoPhaseRequest(httpServletRequest)) { + if (isValidationPhase(httpServletRequest)) { + // authorize access + serviceFacade.authorizeAccess(authorizer); + + // verify if necessary + if (verifier != null) { + verifier.run(); + } + + // store the request + phaseOneStoreTransaction(entity, null, null); + + return generateContinueResponse().build(); + } else if (isExecutionPhase(httpServletRequest)) { + // get the original request and run the action + final Request<T> phaseOneRequest = phaseTwoVerifyTransaction(); + return action.apply(phaseOneRequest.getRequest()); + } else if (isCancellationPhase(httpServletRequest)) { + cancelTransaction(); + return generateOkResponse().build(); + } else { + throw new IllegalStateException("This request does not appear to be part of the two phase commit."); + } + } else { // authorize access serviceFacade.authorizeAccess(authorizer); - verifyRevision.run(); + + // run the action + return action.apply(entity); } + } - if (validationPhase) { - if (verifier != null) { - verifier.run(); + private <T extends Entity> void phaseOneStoreTransaction(final T requestEntity, final Revision revision, final Set<Revision> revisions) { + if (twoPhaseCommitCache.size() > MAX_CACHE_SOFT_LIMIT) { + throw new IllegalStateException("The maximum number of requests are in progress."); + } + + // get the transaction id + final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); + if (StringUtils.isBlank(transactionId)) { + throw new IllegalArgumentException("Two phase commit Transaction Id missing."); + } + + synchronized (twoPhaseCommitCache) { + final CacheKey key = new CacheKey(transactionId); + if (twoPhaseCommitCache.getIfPresent(key) != null) { + throw new IllegalStateException("Transaction " + transactionId + " is already in progress."); + } + + // store the entry for the second phase + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final Request<T> request = new Request<>(ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user), getAbsolutePath().toString(), revision, revisions, requestEntity); + twoPhaseCommitCache.put(key, request); + } + } + + private <T extends Entity> Request<T> phaseTwoVerifyTransaction() { + // get the transaction id + final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); + if (StringUtils.isBlank(transactionId)) { + throw new IllegalArgumentException("Two phase commit Transaction Id missing."); + } + + // get the entry for the second phase + final Request<T> request; + synchronized (twoPhaseCommitCache) { + final CacheKey key = new CacheKey(transactionId); + request = (Request<T>) twoPhaseCommitCache.getIfPresent(key); + if (request == null) { + throw new IllegalArgumentException("The request from phase one is missing."); } - return generateContinueResponse().build(); + + twoPhaseCommitCache.invalidate(key); } + final String phaseOneChain = request.getUserChain(); - return action.get(); + // build the chain for the current request + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final String phaseTwoChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user); + + if (phaseOneChain == null || !phaseOneChain.equals(phaseTwoChain)) { + throw new IllegalArgumentException("The same user must issue the request for phase one and two."); + } + + final String phaseOneUri = request.getUri(); + if (phaseOneUri == null || !phaseOneUri.equals(getAbsolutePath().toString())) { + throw new IllegalArgumentException("The URI must be the same for phase one and two."); + } + + return request; + } + + private void cancelTransaction() { + // get the transaction id + final String transactionId = httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER); + if (StringUtils.isBlank(transactionId)) { + throw new IllegalArgumentException("Two phase commit Transaction Id missing."); + } + + synchronized (twoPhaseCommitCache) { + final CacheKey key = new CacheKey(transactionId); + twoPhaseCommitCache.invalidate(key); + } + } + + private final class Request<T extends Entity> { + final String userChain; + final String uri; + final Revision revision; + final Set<Revision> revisions; + final T request; + + public Request(String userChain, String uri, Revision revision, Set<Revision> revisions, T request) { + this.userChain = userChain; + this.uri = uri; + this.revision = revision; + this.revisions = revisions; + this.request = request; + } + + public String getUserChain() { + return userChain; + } + + public String getUri() { + return uri; + } + + public Revision getRevision() { + return revision; + } + + public Set<Revision> getRevisions() { + return revisions; + } + + public T getRequest() { + return request; + } } /** @@ -713,7 +916,7 @@ public abstract class ApplicationResource { if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) { return requestReplicator.replicate(method, path, entity, headers).awaitMergedResponse(); } else { - return requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), method, path, entity, headers, false, true).awaitMergedResponse(); + return requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, path, entity, headers).awaitMergedResponse(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index f7fdadf..11abf86 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -148,7 +148,7 @@ public class ConnectionResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the connection. - * @param connectionEntity A connectionEntity. + * @param requestConnectionEntity A connectionEntity. * @return A connectionEntity. * @throws InterruptedException if interrupted */ @@ -185,36 +185,37 @@ public class ConnectionResource extends ApplicationResource { @ApiParam( value = "The connection configuration details.", required = true - ) final ConnectionEntity connectionEntity) throws InterruptedException { + ) final ConnectionEntity requestConnectionEntity) throws InterruptedException { - if (connectionEntity == null || connectionEntity.getComponent() == null) { + if (requestConnectionEntity == null || requestConnectionEntity.getComponent() == null) { throw new IllegalArgumentException("Connection details must be specified."); } - if (connectionEntity.getRevision() == null) { + if (requestConnectionEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the ids are the same - final ConnectionDTO connection = connectionEntity.getComponent(); - if (!id.equals(connection.getId())) { + final ConnectionDTO requestConnection = requestConnectionEntity.getComponent(); + if (!id.equals(requestConnection.getId())) { throw new IllegalArgumentException(String.format("The connection id " + "(%s) in the request body does not equal the connection id of the " - + "requested resource (%s).", connection.getId(), id)); + + "requested resource (%s).", requestConnection.getId(), id)); } - if (connection.getDestination() != null && connection.getDestination().getId() == null) { + if (requestConnection.getDestination() != null && requestConnection.getDestination().getId() == null) { throw new IllegalArgumentException("When specifying a destination component, the destination id is required."); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, connectionEntity); + return replicate(HttpMethod.PUT, requestConnectionEntity); } - final Revision revision = getRevision(connectionEntity, id); + final Revision requestRevision = getRevision(requestConnectionEntity, id); return withWriteLock( serviceFacade, - revision, + requestConnectionEntity, + requestRevision, lookup -> { // verifies write access to this connection (this checks the current source and destination) ConnectionAuthorizable connAuth = lookup.getConnection(id); @@ -222,17 +223,19 @@ public class ConnectionResource extends ApplicationResource { // if a destination has been specified and is different final Connectable currentDestination = connAuth.getDestination(); - if (connection.getDestination() != null && currentDestination.getIdentifier().equals(connection.getDestination().getId())) { + if (requestConnection.getDestination() != null && currentDestination.getIdentifier().equals(requestConnection.getDestination().getId())) { // verify access of the new destination (current destination was already authorized as part of the connection check) - final Authorizable newDestinationAuthorizable = lookup.getConnectable(connection.getDestination().getId()); + final Authorizable newDestinationAuthorizable = lookup.getConnectable(requestConnection.getDestination().getId()); newDestinationAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); // verify access of the parent group (this is the same check that is performed when creating the connection) connAuth.getParentGroup().authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); } }, - () -> serviceFacade.verifyUpdateConnection(connection), - () -> { + () -> serviceFacade.verifyUpdateConnection(requestConnection), + (revision, connectionEntity) -> { + final ConnectionDTO connection = connectionEntity.getComponent(); + final ConnectionEntity entity = serviceFacade.updateConnection(revision, connection); populateRemainingConnectionEntityContent(entity); @@ -296,21 +299,25 @@ public class ConnectionResource extends ApplicationResource { // determine the specified version final Long clientVersion = version == null ? null : version.getLong(); - final Revision revision = new Revision(clientVersion, clientId.getClientId(), id); + final Revision requestRevision = new Revision(clientVersion, clientId.getClientId(), id); + + final ConnectionEntity requestConnectionEntity = new ConnectionEntity(); + requestConnectionEntity.setId(id); // get the current user return withWriteLock( serviceFacade, - revision, + requestConnectionEntity, + requestRevision, lookup -> { // verifies write access to the source and destination final Authorizable authorizable = lookup.getConnection(id).getAuthorizable(); authorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> serviceFacade.verifyDeleteConnection(id), - () -> { + (revision, connectionEntity) -> { // delete the connection - final ConnectionEntity entity = serviceFacade.deleteConnection(revision, id); + final ConnectionEntity entity = serviceFacade.deleteConnection(revision, connectionEntity.getId()); // generate the response return clusterContext(generateOkResponse(entity)).build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index f700ced..f5ff188 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -47,6 +47,7 @@ import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.entity.ClusterEntity; import org.apache.nifi.web.api.entity.ControllerConfigurationEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.HistoryEntity; import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; @@ -67,6 +68,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.net.URI; +import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -159,7 +161,7 @@ public class ControllerResource extends ApplicationResource { * Update the configuration for this NiFi. * * @param httpServletRequest request - * @param configEntity A controllerConfigurationEntity. + * @param requestConfigEntity A controllerConfigurationEntity. * @return A controllerConfigurationEntity. */ @PUT @@ -186,29 +188,30 @@ public class ControllerResource extends ApplicationResource { @ApiParam( value = "The controller configuration.", required = true - ) final ControllerConfigurationEntity configEntity) { + ) final ControllerConfigurationEntity requestConfigEntity) { - if (configEntity == null || configEntity.getComponent() == null) { + if (requestConfigEntity == null || requestConfigEntity.getComponent() == null) { throw new IllegalArgumentException("Controller configuration must be specified"); } - if (configEntity.getRevision() == null) { + if (requestConfigEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, configEntity); + return replicate(HttpMethod.PUT, requestConfigEntity); } - final Revision revision = getRevision(configEntity.getRevision(), FlowController.class.getSimpleName()); + final Revision requestRevision = getRevision(requestConfigEntity.getRevision(), FlowController.class.getSimpleName()); return withWriteLock( serviceFacade, - revision, + requestConfigEntity, + requestRevision, lookup -> { authorizeController(RequestAction.WRITE); }, null, - () -> { + (revision, configEntity) -> { final ControllerConfigurationEntity entity = serviceFacade.updateControllerConfiguration(revision, configEntity.getComponent()); return clusterContext(generateOkResponse(entity)).build(); } @@ -223,7 +226,7 @@ public class ControllerResource extends ApplicationResource { * Creates a new Reporting Task. * * @param httpServletRequest request - * @param reportingTaskEntity A reportingTaskEntity. + * @param requestReportingTaskEntity A reportingTaskEntity. * @return A reportingTaskEntity. */ @POST @@ -251,17 +254,17 @@ public class ControllerResource extends ApplicationResource { @ApiParam( value = "The reporting task configuration details.", required = true - ) final ReportingTaskEntity reportingTaskEntity) { + ) final ReportingTaskEntity requestReportingTaskEntity) { - if (reportingTaskEntity == null || reportingTaskEntity.getComponent() == null) { + if (requestReportingTaskEntity == null || requestReportingTaskEntity.getComponent() == null) { throw new IllegalArgumentException("Reporting task details must be specified."); } - if (reportingTaskEntity.getRevision() == null || (reportingTaskEntity.getRevision().getVersion() == null || reportingTaskEntity.getRevision().getVersion() != 0)) { + if (requestReportingTaskEntity.getRevision() == null || (requestReportingTaskEntity.getRevision().getVersion() == null || requestReportingTaskEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Reporting task."); } - final ReportingTaskDTO requestReportingTask = reportingTaskEntity.getComponent(); + final ReportingTaskDTO requestReportingTask = requestReportingTaskEntity.getComponent(); if (requestReportingTask.getId() != null) { throw new IllegalArgumentException("Reporting task ID cannot be specified."); } @@ -271,36 +274,36 @@ public class ControllerResource extends ApplicationResource { } if (isReplicateRequest()) { - return replicate(HttpMethod.POST, reportingTaskEntity); + return replicate(HttpMethod.POST, requestReportingTaskEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - authorizeController(RequestAction.WRITE); + return withWriteLock( + serviceFacade, + requestReportingTaskEntity, + lookup -> { + authorizeController(RequestAction.WRITE); - if (requestReportingTask.getProperties() != null) { - final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getReportingTaskByType(requestReportingTask.getType()); - AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestReportingTask.getProperties(), authorizable, authorizer, lookup); - } - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } + if (requestReportingTask.getProperties() != null) { + final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getReportingTaskByType(requestReportingTask.getType()); + AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestReportingTask.getProperties(), authorizable, authorizer, lookup); + } + }, + null, + (reportingTaskEntity) -> { + final ReportingTaskDTO reportingTask = reportingTaskEntity.getComponent(); - // set the processor id as appropriate - requestReportingTask.setId(generateUuid()); + // set the processor id as appropriate + reportingTask.setId(generateUuid()); - // create the reporting task and generate the json - final Revision revision = getRevision(reportingTaskEntity, requestReportingTask.getId()); - final ReportingTaskEntity entity = serviceFacade.createReportingTask(revision, requestReportingTask); - reportingTaskResource.populateRemainingReportingTaskEntityContent(entity); + // create the reporting task and generate the json + final Revision revision = getRevision(reportingTaskEntity, reportingTask.getId()); + final ReportingTaskEntity entity = serviceFacade.createReportingTask(revision, reportingTask); + reportingTaskResource.populateRemainingReportingTaskEntityContent(entity); - // build the response - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } + ); } // ------------------- @@ -311,7 +314,7 @@ public class ControllerResource extends ApplicationResource { * Creates a new Controller Service. * * @param httpServletRequest request - * @param controllerServiceEntity A controllerServiceEntity. + * @param requestControllerServiceEntity A controllerServiceEntity. * @return A controllerServiceEntity. */ @POST @@ -339,17 +342,17 @@ public class ControllerResource extends ApplicationResource { @ApiParam( value = "The controller service configuration details.", required = true - ) final ControllerServiceEntity controllerServiceEntity) { + ) final ControllerServiceEntity requestControllerServiceEntity) { - if (controllerServiceEntity == null || controllerServiceEntity.getComponent() == null) { + if (requestControllerServiceEntity == null || requestControllerServiceEntity.getComponent() == null) { throw new IllegalArgumentException("Controller service details must be specified."); } - if (controllerServiceEntity.getRevision() == null || (controllerServiceEntity.getRevision().getVersion() == null || controllerServiceEntity.getRevision().getVersion() != 0)) { + if (requestControllerServiceEntity.getRevision() == null || (requestControllerServiceEntity.getRevision().getVersion() == null || requestControllerServiceEntity.getRevision().getVersion() != 0)) { throw new IllegalArgumentException("A revision of 0 must be specified when creating a new Controller service."); } - final ControllerServiceDTO requestControllerService = controllerServiceEntity.getComponent(); + final ControllerServiceDTO requestControllerService = requestControllerServiceEntity.getComponent(); if (requestControllerService.getId() != null) { throw new IllegalArgumentException("Controller service ID cannot be specified."); } @@ -363,36 +366,36 @@ public class ControllerResource extends ApplicationResource { } if (isReplicateRequest()) { - return replicate(HttpMethod.POST, controllerServiceEntity); + return replicate(HttpMethod.POST, requestControllerServiceEntity); } - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - authorizeController(RequestAction.WRITE); + return withWriteLock( + serviceFacade, + requestControllerServiceEntity, + lookup -> { + authorizeController(RequestAction.WRITE); - if (requestControllerService.getProperties() != null) { - final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getControllerServiceByType(requestControllerService.getType()); - AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(), authorizable, authorizer, lookup); - } - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } + if (requestControllerService.getProperties() != null) { + final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getControllerServiceByType(requestControllerService.getType()); + AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(), authorizable, authorizer, lookup); + } + }, + null, + (controllerServiceEntity) -> { + final ControllerServiceDTO controllerService = controllerServiceEntity.getComponent(); - // set the processor id as appropriate - requestControllerService.setId(generateUuid()); + // set the processor id as appropriate + controllerService.setId(generateUuid()); - // create the controller service and generate the json - final Revision revision = getRevision(controllerServiceEntity, requestControllerService.getId()); - final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, null, requestControllerService); - controllerServiceResource.populateRemainingControllerServiceEntityContent(entity); + // create the controller service and generate the json + final Revision revision = getRevision(controllerServiceEntity, controllerService.getId()); + final ControllerServiceEntity entity = serviceFacade.createControllerService(revision, null, controllerService); + controllerServiceResource.populateRemainingControllerServiceEntityContent(entity); - // build the response - return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + // build the response + return clusterContext(generateCreatedResponse(URI.create(entity.getUri()), entity)).build(); + } + ); } // ------- @@ -666,26 +669,33 @@ public class ControllerResource extends ApplicationResource { // Note: History requests are not replicated throughout the cluster and are instead handled by the nodes independently - // handle expects request (usually from the cluster manager) - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - authorizeController(RequestAction.WRITE); - }); - } - if (validationPhase) { - return generateContinueResponse().build(); - } + return withWriteLock( + serviceFacade, + new EndDateEntity(endDate.getDateTime()), + lookup -> { + authorizeController(RequestAction.WRITE); + }, + null, + (endDateEtity) -> { + // purge the actions + serviceFacade.deleteActions(endDateEtity.getEndDate()); - // purge the actions - serviceFacade.deleteActions(endDate.getDateTime()); + // generate the response + return generateOkResponse(new HistoryEntity()).build(); + } + ); + } - // create the response entity - final HistoryEntity entity = new HistoryEntity(); + private class EndDateEntity extends Entity { + final Date endDate; - // generate the response - return generateOkResponse(entity).build(); + public EndDateEntity(Date endDate) { + this.endDate = endDate; + } + + public Date getEndDate() { + return endDate; + } } // setters http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java index 0b9434a..9b3805a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java @@ -346,27 +346,28 @@ public class ControllerServiceResource extends ApplicationResource { return replicate(HttpMethod.POST); } - final boolean validationPhase = isValidationPhase(httpServletRequest); - if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) { - // authorize access - serviceFacade.authorizeAccess(lookup -> { - final Authorizable controllerService = lookup.getControllerService(id).getAuthorizable(); - controllerService.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); - }); - } - if (validationPhase) { - serviceFacade.verifyCanClearControllerServiceState(id); - return generateContinueResponse().build(); - } + final ControllerServiceEntity requestControllerServiceEntity = new ControllerServiceEntity(); + requestControllerServiceEntity.setId(id); - // get the component state - serviceFacade.clearControllerServiceState(id); + return withWriteLock( + serviceFacade, + requestControllerServiceEntity, + lookup -> { + final Authorizable controllerService = lookup.getControllerService(id).getAuthorizable(); + controllerService.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + () -> serviceFacade.verifyCanClearControllerServiceState(id), + (controllerServiceEntity) -> { + // get the component state + serviceFacade.clearControllerServiceState(controllerServiceEntity.getId()); - // generate the response entity - final ComponentStateEntity entity = new ComponentStateEntity(); + // generate the response entity + final ComponentStateEntity entity = new ComponentStateEntity(); - // generate the response - return clusterContext(generateOkResponse(entity)).build(); + // generate the response + return clusterContext(generateOkResponse(entity)).build(); + } + ); } /** @@ -422,7 +423,7 @@ public class ControllerServiceResource extends ApplicationResource { * Updates the references of the specified controller service. * * @param httpServletRequest request - * @param updateReferenceRequest The update request + * @param requestUpdateReferenceRequest The update request * @return A controllerServiceReferencingComponentsEntity. */ @PUT @@ -455,13 +456,13 @@ public class ControllerServiceResource extends ApplicationResource { @ApiParam( value = "The controller service request update request.", required = true - ) final UpdateControllerServiceReferenceRequestEntity updateReferenceRequest) { + ) final UpdateControllerServiceReferenceRequestEntity requestUpdateReferenceRequest) { - if (updateReferenceRequest.getId() == null) { + if (requestUpdateReferenceRequest.getId() == null) { throw new IllegalArgumentException("The controller service identifier must be specified."); } - if (updateReferenceRequest.getReferencingComponentRevisions() == null) { + if (requestUpdateReferenceRequest.getReferencingComponentRevisions() == null) { throw new IllegalArgumentException("The controller service referencing components revisions must be specified."); } @@ -471,14 +472,14 @@ public class ControllerServiceResource extends ApplicationResource { // but not referencing schedulable components ControllerServiceState requestControllerServiceState = null; try { - requestControllerServiceState = ControllerServiceState.valueOf(updateReferenceRequest.getState()); + requestControllerServiceState = ControllerServiceState.valueOf(requestUpdateReferenceRequest.getState()); } catch (final IllegalArgumentException iae) { // ignore } ScheduledState requestScheduledState = null; try { - requestScheduledState = ScheduledState.valueOf(updateReferenceRequest.getState()); + requestScheduledState = ScheduledState.valueOf(requestUpdateReferenceRequest.getState()); } catch (final IllegalArgumentException iae) { // ignore } @@ -498,30 +499,51 @@ public class ControllerServiceResource extends ApplicationResource { } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, updateReferenceRequest); + return replicate(HttpMethod.PUT, requestUpdateReferenceRequest); } // convert the referencing revisions - final Map<String, Revision> referencingRevisions = updateReferenceRequest.getReferencingComponentRevisions().entrySet().stream() + final Map<String, Revision> requestReferencingRevisions = requestUpdateReferenceRequest.getReferencingComponentRevisions().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> { final RevisionDTO rev = e.getValue(); return new Revision(rev.getVersion(), rev.getClientId(), e.getKey()); })); - final Set<Revision> revisions = new HashSet<>(referencingRevisions.values()); + final Set<Revision> requestRevisions = new HashSet<>(requestReferencingRevisions.values()); - final ScheduledState scheduledState = requestScheduledState; - final ControllerServiceState controllerServiceState = requestControllerServiceState; + final ScheduledState verifyScheduledState = requestScheduledState; + final ControllerServiceState verifyControllerServiceState = requestControllerServiceState; return withWriteLock( serviceFacade, - revisions, + requestUpdateReferenceRequest, + requestRevisions, lookup -> { - referencingRevisions.entrySet().stream().forEach(e -> { + requestReferencingRevisions.entrySet().stream().forEach(e -> { final Authorizable controllerService = lookup.getControllerServiceReferencingComponent(id, e.getKey()); controllerService.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }); }, - () -> serviceFacade.verifyUpdateControllerServiceReferencingComponents(updateReferenceRequest.getId(), scheduledState, controllerServiceState), - () -> { + () -> serviceFacade.verifyUpdateControllerServiceReferencingComponents(requestUpdateReferenceRequest.getId(), verifyScheduledState, verifyControllerServiceState), + (revisions, updateReferenceRequest) -> { + ScheduledState scheduledState = null; + try { + scheduledState = ScheduledState.valueOf(updateReferenceRequest.getState()); + } catch (final IllegalArgumentException e) { + // ignore + } + + ControllerServiceState controllerServiceState = null; + try { + controllerServiceState = ControllerServiceState.valueOf(updateReferenceRequest.getState()); + } catch (final IllegalArgumentException iae) { + // ignore + } + + final Map<String, Revision> referencingRevisions = updateReferenceRequest.getReferencingComponentRevisions().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> { + final RevisionDTO rev = e.getValue(); + return new Revision(rev.getVersion(), rev.getClientId(), e.getKey()); + })); + // update the controller service references final ControllerServiceReferencingComponentsEntity entity = serviceFacade.updateControllerServiceReferencingComponents( referencingRevisions, updateReferenceRequest.getId(), scheduledState, controllerServiceState); @@ -536,7 +558,7 @@ public class ControllerServiceResource extends ApplicationResource { * * @param httpServletRequest request * @param id The id of the controller service to update. - * @param controllerServiceEntity A controllerServiceEntity. + * @param requestControllerServiceEntity A controllerServiceEntity. * @return A controllerServiceEntity. */ @PUT @@ -570,32 +592,33 @@ public class ControllerServiceResource extends ApplicationResource { @ApiParam( value = "The controller service configuration details.", required = true - ) final ControllerServiceEntity controllerServiceEntity) { + ) final ControllerServiceEntity requestControllerServiceEntity) { - if (controllerServiceEntity == null || controllerServiceEntity.getComponent() == null) { + if (requestControllerServiceEntity == null || requestControllerServiceEntity.getComponent() == null) { throw new IllegalArgumentException("Controller service details must be specified."); } - if (controllerServiceEntity.getRevision() == null) { + if (requestControllerServiceEntity.getRevision() == null) { throw new IllegalArgumentException("Revision must be specified."); } // ensure the ids are the same - final ControllerServiceDTO requestControllerServiceDTO = controllerServiceEntity.getComponent(); + final ControllerServiceDTO requestControllerServiceDTO = requestControllerServiceEntity.getComponent(); if (!id.equals(requestControllerServiceDTO.getId())) { throw new IllegalArgumentException(String.format("The controller service id (%s) in the request body does not equal the " + "controller service id of the requested resource (%s).", requestControllerServiceDTO.getId(), id)); } if (isReplicateRequest()) { - return replicate(HttpMethod.PUT, controllerServiceEntity); + return replicate(HttpMethod.PUT, requestControllerServiceEntity); } // handle expects request (usually from the cluster manager) - final Revision revision = getRevision(controllerServiceEntity, id); + final Revision requestRevision = getRevision(requestControllerServiceEntity, id); return withWriteLock( serviceFacade, - revision, + requestControllerServiceEntity, + requestRevision, lookup -> { // authorize the service final ControllerServiceReferencingComponentAuthorizable authorizable = lookup.getControllerService(id); @@ -605,9 +628,11 @@ public class ControllerServiceResource extends ApplicationResource { AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerServiceDTO.getProperties(), authorizable, authorizer, lookup); }, () -> serviceFacade.verifyUpdateControllerService(requestControllerServiceDTO), - () -> { + (revision, controllerServiceEntity) -> { + final ControllerServiceDTO controllerService = controllerServiceEntity.getComponent(); + // update the controller service - final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, requestControllerServiceDTO); + final ControllerServiceEntity entity = serviceFacade.updateControllerService(revision, controllerService); populateRemainingControllerServiceEntityContent(entity); return clusterContext(generateOkResponse(entity)).build(); @@ -669,19 +694,23 @@ public class ControllerServiceResource extends ApplicationResource { return replicate(HttpMethod.DELETE); } + final ControllerServiceEntity requestControllerServiceEntity = new ControllerServiceEntity(); + requestControllerServiceEntity.setId(id); + // handle expects request (usually from the cluster manager) - final Revision revision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); + final Revision requestRevision = new Revision(version == null ? null : version.getLong(), clientId.getClientId(), id); return withWriteLock( serviceFacade, - revision, + requestControllerServiceEntity, + requestRevision, lookup -> { final Authorizable controllerService = lookup.getControllerService(id).getAuthorizable(); controllerService.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); }, () -> serviceFacade.verifyDeleteControllerService(id), - () -> { + (revision, controllerServiceEntity) -> { // delete the specified controller service - final ControllerServiceEntity entity = serviceFacade.deleteControllerService(revision, id); + final ControllerServiceEntity entity = serviceFacade.deleteControllerService(revision, controllerServiceEntity.getId()); return clusterContext(generateOkResponse(entity)).build(); } );
