NIFI-2635: - Re-using the original request during the second phase of the two 
phase commit. - Forwarding requests to the coordinator when received by a node.

This closes #933

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2bfc4ef
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2bfc4ef
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2bfc4ef

Branch: refs/heads/master
Commit: c2bfc4ef245d5c95dc103139cf386896d6ec3023
Parents: e9da908
Author: Matt Gilman <[email protected]>
Authored: Wed Aug 24 15:39:04 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Wed Aug 24 22:42:10 2016 -0400

----------------------------------------------------------------------
 nifi-commons/nifi-web-utils/pom.xml             |    4 +
 .../http/replication/RequestReplicator.java     |   24 +-
 .../ThreadPoolRequestReplicator.java            |   52 +-
 .../TestThreadPoolRequestReplicator.java        |    9 +-
 .../nifi/web/api/AccessPolicyResource.java      |   95 +-
 .../nifi/web/api/ApplicationResource.java       |  255 ++++-
 .../apache/nifi/web/api/ConnectionResource.java |   45 +-
 .../apache/nifi/web/api/ControllerResource.java |  176 +--
 .../nifi/web/api/ControllerServiceResource.java |  123 ++-
 .../apache/nifi/web/api/CountersResource.java   |   38 +-
 .../nifi/web/api/FlowFileQueueResource.java     |  237 ++--
 .../org/apache/nifi/web/api/FlowResource.java   |   45 +-
 .../org/apache/nifi/web/api/FunnelResource.java |   33 +-
 .../apache/nifi/web/api/InputPortResource.java  |   35 +-
 .../org/apache/nifi/web/api/LabelResource.java  |   35 +-
 .../apache/nifi/web/api/OutputPortResource.java |   35 +-
 .../nifi/web/api/ProcessGroupResource.java      | 1014 +++++++++---------
 .../apache/nifi/web/api/ProcessorResource.java  |   72 +-
 .../apache/nifi/web/api/ProvenanceResource.java |  182 ++--
 .../web/api/RemoteProcessGroupResource.java     |  122 ++-
 .../nifi/web/api/ReportingTaskResource.java     |   72 +-
 .../apache/nifi/web/api/SnippetResource.java    |  118 +-
 .../apache/nifi/web/api/TemplateResource.java   |   39 +-
 .../apache/nifi/web/api/TenantsResource.java    |  186 ++--
 .../nifi/web/security/otp/OtpService.java       |   42 +-
 .../apache/nifi/web/security/util/CacheKey.java |   61 ++
 .../nifi-web-ui/src/main/webapp/css/header.css  |    2 +-
 27 files changed, 1804 insertions(+), 1347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-commons/nifi-web-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-web-utils/pom.xml 
b/nifi-commons/nifi-web-utils/pom.xml
index dd2f3ef..7cc0581 100644
--- a/nifi-commons/nifi-web-utils/pom.xml
+++ b/nifi-commons/nifi-web-utils/pom.xml
@@ -36,6 +36,10 @@
         </dependency>
         <dependency>
             <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
             <artifactId>jersey-client</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index bfe528d..b9bce0a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -38,10 +38,14 @@ public interface RequestReplicator {
     public static final int NODE_CONTINUE_STATUS_CODE = 150;
 
     /**
-     * Indicates that the request is intended to cancel a lock that was 
previously obtained without performing the action
+     * Indicates that the request is intended to cancel a transaction that was 
previously created without performing the action
      */
-    public static final String LOCK_CANCELATION_HEADER = "X-Cancel-Lock";
-    public static final String LOCK_VERSION_ID_HEADER = "X-Lock-Version-Id";
+    public static final String REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER = 
"X-Cancel-Transaction";
+
+    /**
+     * Indicates that this is the second phase of the two phase commit and the 
execution of the action should proceed.
+     */
+    public static final String REQUEST_EXECUTION_HTTP_HEADER = 
"X-Execution-Continue";
 
     /**
      * When we replicate a request across the cluster, we replicate it only 
from the cluster coordinator.
@@ -106,6 +110,20 @@ public interface RequestReplicator {
      */
     AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, 
URI uri, Object entity, Map<String, String> headers, boolean 
indicateReplicated, boolean performVerification);
 
+    
+    /**
+     * Forwards a request to the Cluster Coordinator so that it is able to 
replicate the request to all nodes in the cluster.
+     * 
+     * @param coordinatorNodeId the node identifier of the Cluster Coordinator
+     * @param method the HTTP method (e.g., POST, PUT)
+     * @param uri the base request URI (up to, but not including, the query 
string)
+     * @param entity an entity
+     * @param headers any HTTP headers
+     *
+     * @return an AsyncClusterResponse that indicates the current status of 
the request and provides an identifier for obtaining an updated response later
+     */
+    AsyncClusterResponse forwardToCoordinator(NodeIdentifier 
coordinatorNodeId, String method, URI uri, Object entity, Map<String, String> 
headers);
+    
     /**
      * <p>
      * Returns an AsyncClusterResponse that provides the most up-to-date 
status of the request with the given identifier.

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index bd4e277..e22bb79 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -43,6 +43,7 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.security.ProxiedEntitiesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +76,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import org.apache.nifi.util.NiFiProperties;
 
 public class ThreadPoolRequestReplicator implements RequestReplicator {
 
@@ -249,15 +249,29 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             lock.lock();
             try {
                 logger.debug("Lock {} obtained in order to replicate request 
{} {}", method, uri);
-                return replicate(nodeIds, method, uri, entity, updatedHeaders, 
performVerification, null);
+                return replicate(nodeIds, method, uri, entity, updatedHeaders, 
performVerification, null, !performVerification);
             } finally {
                 lock.unlock();
             }
         } else {
-            return replicate(nodeIds, method, uri, entity, updatedHeaders, 
performVerification, null);
+            return replicate(nodeIds, method, uri, entity, updatedHeaders, 
performVerification, null, !performVerification);
         }
     }
 
+    @Override
+    public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier 
coordinatorNodeId, final String method, final URI uri, final Object entity, 
final Map<String, String> headers) {
+        // If the user is authenticated, add them as a proxied entity so that 
when the receiving NiFi receives the request,
+        // it knows that we are acting as a proxy on behalf of the current 
user.
+       final Map<String, String> updatedHeaders = new HashMap<>(headers);
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        if (user != null && !user.isAnonymous()) {
+            final String proxiedEntitiesChain = 
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
+            updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, 
proxiedEntitiesChain);
+        }
+       
+        return replicate(Collections.singleton(coordinatorNodeId), method, 
uri, entity, updatedHeaders, false, null, false);
+    }
+    
     /**
      * Replicates the request to all nodes in the given set of node identifiers
      *
@@ -268,11 +282,12 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      * @param headers the HTTP Headers
      * @param performVerification whether or not to verify that all nodes in 
the cluster are connected and that all nodes can perform request. Ignored if 
request is not mutable.
      * @param response the response to update with the results
+     * @param executionPhase <code>true</code> if this is the execution phase, 
<code>false</code> otherwise
      *
      * @return an AsyncClusterResponse that can be used to obtain the result
      */
     private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers, boolean 
performVerification,
-            StandardAsyncClusterResponse response) {
+            StandardAsyncClusterResponse response, boolean executionPhase) {
 
         // state validation
         Objects.requireNonNull(nodeIds);
@@ -355,6 +370,11 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             finalResponse.add(nodeResponse);
         };
 
+        // instruct the node to actually perform the underlying action
+        if (mutableRequest && executionPhase) {
+            updatedHeaders.put(REQUEST_EXECUTION_HTTP_HEADER, "true");
+        }
+
         // replicate the request to all nodes
         final Function<NodeIdentifier, NodeHttpRequest> requestFactory =
             nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, 
nodeId), entity, updatedHeaders, nodeCompletionCallback);
@@ -368,12 +388,8 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
     private void performVerification(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers, 
StandardAsyncClusterResponse clusterResponse) {
         logger.debug("Verifying that mutable request {} {} can be made", 
method, uri.getPath());
 
-        // Add the Lock Version ID to the headers so that it is used in all 
requests for this transaction
-        final String lockVersionId = UUID.randomUUID().toString();
-        headers.put(RequestReplicator.LOCK_VERSION_ID_HEADER, lockVersionId);
-
-        final Map<String, String> updatedHeaders = new HashMap<>(headers);
-        updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
+        final Map<String, String> validationHeaders = new HashMap<>(headers);
+        validationHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
 
         final int numNodes = nodeIds.size();
         final NodeRequestCompletionCallback completionCallback = new 
NodeRequestCompletionCallback() {
@@ -404,12 +420,12 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
                         // to all nodes and we are finished.
                         if (dissentingCount == 0) {
                             logger.debug("Received verification from all {} 
nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());
-                            replicate(nodeIds, method, uri, entity, headers, 
false, clusterResponse);
+                            replicate(nodeIds, method, uri, entity, headers, 
false, clusterResponse, true);
                             return;
                         }
 
-                        final Map<String, String> cancelLockHeaders = new 
HashMap<>(updatedHeaders);
-                        cancelLockHeaders.put(LOCK_CANCELATION_HEADER, "true");
+                        final Map<String, String> cancelLockHeaders = new 
HashMap<>(headers);
+                        
cancelLockHeaders.put(REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER, "true");
                         final Thread cancelLockThread = new Thread(new 
Runnable() {
                             @Override
                             public void run() {
@@ -482,10 +498,10 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         };
 
         // Callback function for generating a NodeHttpRequestCallable that can 
be used to perform the work
-        final Function<NodeIdentifier, NodeHttpRequest> requestFactory = 
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, 
updatedHeaders, completionCallback);
+        final Function<NodeIdentifier, NodeHttpRequest> requestFactory = 
nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, nodeId), entity, 
validationHeaders, completionCallback);
 
         // replicate the 'verification request' to all nodes
-        replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), 
requestFactory, updatedHeaders);
+        replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), 
requestFactory, validationHeaders);
     }
 
 
@@ -500,9 +516,11 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
     }
 
     // Visible for testing - overriding this method makes it easy to verify 
behavior without actually making any web requests
-    protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
+    protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId, 
+               final Map<String, String> headers) {
         final ClientResponse clientResponse;
         final long startNanos = System.nanoTime();
+        logger.debug("Replicating request to {} {}, request ID = {}, headers = 
{}", method, uri, requestId, headers);
 
         switch (method.toUpperCase()) {
             case HttpMethod.DELETE:
@@ -703,7 +721,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
                 final String requestId = headers.get("x-nifi-request-id");
 
                 logger.debug("Replicating request {} {} to {}", method, 
uri.getPath(), nodeId);
-                nodeResponse = replicateRequest(resourceBuilder, nodeId, 
method, uri, requestId);
+                nodeResponse = replicateRequest(resourceBuilder, nodeId, 
method, uri, requestId, headers);
             } catch (final Exception e) {
                 nodeResponse = new NodeResponse(nodeId, method, uri, e);
                 logger.warn("Failed to replicate request {} {} to {} due to 
{}", method, uri.getPath(), nodeId, e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index e699fca..ce7f452 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -164,7 +164,8 @@ public class TestThreadPoolRequestReplicator {
         final ThreadPoolRequestReplicator replicator
                 = new ThreadPoolRequestReplicator(2, new Client(), 
coordinator, "1 sec", "1 sec", null, null, 
NiFiProperties.createBasicNiFiProperties(null, null)) {
             @Override
-            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
+            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, 
+                       final URI uri, final String requestId, Map<String, 
String> givenHeaders) {
                 // the resource builder will not expose its headers to us, so 
we are using Mockito's Whitebox class to extract them.
                 final OutBoundHeaders headers = (OutBoundHeaders) 
Whitebox.getInternalState(resourceBuilder, "metadata");
                 final Object expectsHeader = 
headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
@@ -285,7 +286,8 @@ public class TestThreadPoolRequestReplicator {
         final ThreadPoolRequestReplicator replicator
                 = new ThreadPoolRequestReplicator(2, new Client(), 
coordinator, "1 sec", "1 sec", null, null, 
NiFiProperties.createBasicNiFiProperties(null, null)) {
             @Override
-            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
+            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method,
+                       final URI uri, final String requestId, Map<String, 
String> givenHeaders) {
                 // the resource builder will not expose its headers to us, so 
we are using Mockito's Whitebox class to extract them.
                 final OutBoundHeaders headers = (OutBoundHeaders) 
Whitebox.getInternalState(resourceBuilder, "metadata");
                 final Object expectsHeader = 
headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
@@ -327,7 +329,8 @@ public class TestThreadPoolRequestReplicator {
         final ThreadPoolRequestReplicator replicator
                 = new ThreadPoolRequestReplicator(2, new Client(), 
coordinator, "1 sec", "1 sec", null, null, 
NiFiProperties.createBasicNiFiProperties(null, null)) {
             @Override
-            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
+            protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method,
+                       final URI uri, final String requestId, Map<String, 
String> givenHeaders) {
                 if (delayMillis > 0L) {
                     try {
                         Thread.sleep(delayMillis);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
index 9bfcbc0..b547dc6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/AccessPolicyResource.java
@@ -170,7 +170,7 @@ public class AccessPolicyResource extends 
ApplicationResource {
      * Creates a new access policy.
      *
      * @param httpServletRequest request
-     * @param accessPolicyEntity An accessPolicyEntity.
+     * @param requestAccessPolicyEntity An accessPolicyEntity.
      * @return An accessPolicyEntity.
      */
     @POST
@@ -197,22 +197,22 @@ public class AccessPolicyResource extends 
ApplicationResource {
             @ApiParam(
                     value = "The access policy configuration details.",
                     required = true
-            ) final AccessPolicyEntity accessPolicyEntity) {
+            ) final AccessPolicyEntity requestAccessPolicyEntity) {
 
         // ensure we're running with a configurable authorizer
         if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) {
             throw new 
IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER);
         }
 
-        if (accessPolicyEntity == null || accessPolicyEntity.getComponent() == 
null) {
+        if (requestAccessPolicyEntity == null || 
requestAccessPolicyEntity.getComponent() == null) {
             throw new IllegalArgumentException("Access policy details must be 
specified.");
         }
 
-        if (accessPolicyEntity.getRevision() == null || 
(accessPolicyEntity.getRevision().getVersion() == null || 
accessPolicyEntity.getRevision().getVersion() != 0)) {
+        if (requestAccessPolicyEntity.getRevision() == null || 
(requestAccessPolicyEntity.getRevision().getVersion() == null || 
requestAccessPolicyEntity.getRevision().getVersion() != 0)) {
             throw new IllegalArgumentException("A revision of 0 must be 
specified when creating a new Policy.");
         }
 
-        final AccessPolicyDTO requestAccessPolicy = 
accessPolicyEntity.getComponent();
+        final AccessPolicyDTO requestAccessPolicy = 
requestAccessPolicyEntity.getComponent();
         if (requestAccessPolicy.getId() != null) {
             throw new IllegalArgumentException("Access policy ID cannot be 
specified.");
         }
@@ -225,35 +225,36 @@ public class AccessPolicyResource extends 
ApplicationResource {
         RequestAction.valueOfValue(requestAccessPolicy.getAction());
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST, accessPolicyEntity);
+            return replicate(HttpMethod.POST, requestAccessPolicyEntity);
         }
 
         // handle expects request (usually from the cluster manager)
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            // authorize access
-            serviceFacade.authorizeAccess(lookup -> {
-                final Authorizable accessPolicies = 
lookup.getAccessPolicyByResource(requestAccessPolicy.getResource());
-                accessPolicies.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
-            });
-        }
-        if (validationPhase) {
-            return generateContinueResponse().build();
-        }
+        return withWriteLock(
+                serviceFacade,
+                requestAccessPolicyEntity,
+                lookup -> {
+                    final Authorizable accessPolicies = 
lookup.getAccessPolicyByResource(requestAccessPolicy.getResource());
+                    accessPolicies.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
+                },
+                null,
+                accessPolicyEntity -> {
+                    final AccessPolicyDTO accessPolicy = 
accessPolicyEntity.getComponent();
 
-        // set the access policy id as appropriate
-        requestAccessPolicy.setId(generateUuid());
+                    // set the access policy id as appropriate
+                    accessPolicy.setId(generateUuid());
 
-        // get revision from the config
-        final RevisionDTO revisionDTO = accessPolicyEntity.getRevision();
-        Revision revision = new Revision(revisionDTO.getVersion(), 
revisionDTO.getClientId(), accessPolicyEntity.getComponent().getId());
+                    // get revision from the config
+                    final RevisionDTO revisionDTO = 
accessPolicyEntity.getRevision();
+                    Revision revision = new Revision(revisionDTO.getVersion(), 
revisionDTO.getClientId(), accessPolicyEntity.getComponent().getId());
 
-        // create the access policy and generate the json
-        final AccessPolicyEntity entity = 
serviceFacade.createAccessPolicy(revision, accessPolicyEntity.getComponent());
-        populateRemainingAccessPolicyEntityContent(entity);
+                    // create the access policy and generate the json
+                    final AccessPolicyEntity entity = 
serviceFacade.createAccessPolicy(revision, accessPolicyEntity.getComponent());
+                    populateRemainingAccessPolicyEntityContent(entity);
 
-        // build the response
-        return 
clusterContext(generateCreatedResponse(URI.create(entity.getUri()), 
entity)).build();
+                    // build the response
+                    return 
clusterContext(generateCreatedResponse(URI.create(entity.getUri()), 
entity)).build();
+                }
+        );
     }
 
     /**
@@ -316,7 +317,7 @@ public class AccessPolicyResource extends 
ApplicationResource {
      *
      * @param httpServletRequest request
      * @param id                 The id of the access policy to update.
-     * @param accessPolicyEntity An accessPolicyEntity.
+     * @param requestAccessPolicyEntity An accessPolicyEntity.
      * @return An accessPolicyEntity.
      */
     @PUT
@@ -349,43 +350,46 @@ public class AccessPolicyResource extends 
ApplicationResource {
             @ApiParam(
                     value = "The access policy configuration details.",
                     required = true
-            ) final AccessPolicyEntity accessPolicyEntity) {
+            ) final AccessPolicyEntity requestAccessPolicyEntity) {
 
         // ensure we're running with a configurable authorizer
         if (!(authorizer instanceof AbstractPolicyBasedAuthorizer)) {
             throw new 
IllegalStateException(AccessPolicyDAO.MSG_NON_ABSTRACT_POLICY_BASED_AUTHORIZER);
         }
 
-        if (accessPolicyEntity == null || accessPolicyEntity.getComponent() == 
null) {
+        if (requestAccessPolicyEntity == null || 
requestAccessPolicyEntity.getComponent() == null) {
             throw new IllegalArgumentException("Access policy details must be 
specified.");
         }
 
-        if (accessPolicyEntity.getRevision() == null) {
+        if (requestAccessPolicyEntity.getRevision() == null) {
             throw new IllegalArgumentException("Revision must be specified.");
         }
 
         // ensure the ids are the same
-        final AccessPolicyDTO accessPolicyDTO = 
accessPolicyEntity.getComponent();
-        if (!id.equals(accessPolicyDTO.getId())) {
+        final AccessPolicyDTO requestAccessPolicyDTO = 
requestAccessPolicyEntity.getComponent();
+        if (!id.equals(requestAccessPolicyDTO.getId())) {
             throw new IllegalArgumentException(String.format("The access 
policy id (%s) in the request body does not equal the "
-                    + "access policy id of the requested resource (%s).", 
accessPolicyDTO.getId(), id));
+                    + "access policy id of the requested resource (%s).", 
requestAccessPolicyDTO.getId(), id));
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.PUT, accessPolicyEntity);
+            return replicate(HttpMethod.PUT, requestAccessPolicyEntity);
         }
 
         // Extract the revision
-        final Revision revision = getRevision(accessPolicyEntity, id);
+        final Revision requestRevision = 
getRevision(requestAccessPolicyEntity, id);
         return withWriteLock(
                 serviceFacade,
-                revision,
+                requestAccessPolicyEntity,
+                requestRevision,
                 lookup -> {
                     Authorizable authorizable = lookup.getAccessPolicyById(id);
                     authorizable.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
                 },
                 null,
-                () -> {
+                (revision, accessPolicyEntity) -> {
+                    final AccessPolicyDTO accessPolicyDTO = 
accessPolicyEntity.getComponent();
+
                     // update the access policy
                     final AccessPolicyEntity entity = 
serviceFacade.updateAccessPolicy(revision, accessPolicyDTO);
                     populateRemainingAccessPolicyEntityContent(entity);
@@ -454,20 +458,23 @@ public class AccessPolicyResource extends 
ApplicationResource {
             return replicate(HttpMethod.DELETE);
         }
 
+        final AccessPolicyEntity requestAccessPolicyEntity = new 
AccessPolicyEntity();
+        requestAccessPolicyEntity.setId(id);
+
         // handle expects request (usually from the cluster manager)
-        final Revision revision = new Revision(version == null ? null : 
version.getLong(), clientId.getClientId(), id);
+        final Revision requestRevision = new Revision(version == null ? null : 
version.getLong(), clientId.getClientId(), id);
         return withWriteLock(
                 serviceFacade,
-                revision,
+                requestAccessPolicyEntity,
+                requestRevision,
                 lookup -> {
                     final Authorizable accessPolicy = 
lookup.getAccessPolicyById(id);
                     accessPolicy.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
                 },
-                () -> {
-                },
-                () -> {
+                null,
+                (revision, accessPolicyEntity) -> {
                     // delete the specified access policy
-                    final AccessPolicyEntity entity = 
serviceFacade.deleteAccessPolicy(revision, id);
+                    final AccessPolicyEntity entity = 
serviceFacade.deleteAccessPolicy(revision, accessPolicyEntity.getId());
                     return clusterContext(generateOkResponse(entity)).build();
                 }
         );

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index b233a35..4f251dd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.web.api;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.sun.jersey.api.core.HttpContext;
 import com.sun.jersey.api.representation.Form;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
@@ -49,7 +51,10 @@ import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.dto.RevisionDTO;
 import org.apache.nifi.web.api.dto.SnippetDTO;
 import org.apache.nifi.web.api.entity.ComponentEntity;
+import org.apache.nifi.web.api.entity.Entity;
 import org.apache.nifi.web.api.entity.TransactionResultEntity;
+import org.apache.nifi.web.security.ProxiedEntitiesUtils;
+import org.apache.nifi.web.security.util.CacheKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,8 +80,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.function.Function;
 
 import static javax.ws.rs.core.Response.Status.NOT_FOUND;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
@@ -114,6 +121,8 @@ public abstract class ApplicationResource {
     private RequestReplicator requestReplicator;
     private ClusterCoordinator clusterCoordinator;
 
+    private static final int MAX_CACHE_SOFT_LIMIT = 500;
+    private final Cache<CacheKey, Request<? extends Entity>> 
twoPhaseCommitCache = CacheBuilder.newBuilder().expireAfterWrite(1, 
TimeUnit.MINUTES).build();
 
     /**
      * Generate a resource uri based off of the specified parameters.
@@ -348,8 +357,8 @@ public abstract class ApplicationResource {
      * @return <code>true</code> if the request represents a two-phase commit 
style request
      */
     protected boolean isTwoPhaseRequest(final HttpServletRequest 
httpServletRequest) {
-        final String headerValue = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
-        return headerValue != null;
+        final String transactionId = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+        return transactionId != null && isConnectedToCluster();
     }
 
     /**
@@ -366,6 +375,14 @@ public abstract class ApplicationResource {
         return isTwoPhaseRequest(httpServletRequest) && 
httpServletRequest.getHeader(RequestReplicator.REQUEST_VALIDATION_HTTP_HEADER) 
!= null;
     }
 
+    protected boolean isExecutionPhase(final HttpServletRequest 
httpServletRequest) {
+        return isTwoPhaseRequest(httpServletRequest) && 
httpServletRequest.getHeader(RequestReplicator.REQUEST_EXECUTION_HTTP_HEADER) 
!= null;
+    }
+
+    protected boolean isCancellationPhase(final HttpServletRequest 
httpServletRequest) {
+        return isTwoPhaseRequest(httpServletRequest) && 
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_CANCELATION_HTTP_HEADER)
 != null;
+    }
+
     /**
      * Checks whether or not the request should be replicated to the cluster
      *
@@ -377,6 +394,7 @@ public abstract class ApplicationResource {
             return false;
         }
 
+        // If not connected to the cluster, we do not replicate
         if (!isConnectedToCluster()) {
             return false;
         }
@@ -468,12 +486,43 @@ public abstract class ApplicationResource {
      * @param action        executor
      * @return the response
      */
-    protected Response withWriteLock(final NiFiServiceFacade serviceFacade, 
final Revision revision, final AuthorizeAccess authorizer,
-                                     final Runnable verifier, final 
Supplier<Response> action) {
+    protected <T extends Entity> Response withWriteLock(final 
NiFiServiceFacade serviceFacade, final T entity, final Revision revision, final 
AuthorizeAccess authorizer,
+                                     final Runnable verifier, final 
BiFunction<Revision, T, Response> action) {
 
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
-        return withWriteLock(serviceFacade, authorizer, verifier, action,
-                () -> serviceFacade.verifyRevision(revision, user));
+
+        if (isTwoPhaseRequest(httpServletRequest)) {
+            if (isValidationPhase(httpServletRequest)) {
+                // authorize access
+                serviceFacade.authorizeAccess(authorizer);
+                serviceFacade.verifyRevision(revision, user);
+
+                // verify if necessary
+                if (verifier != null) {
+                    verifier.run();
+                }
+
+                // store the request
+                phaseOneStoreTransaction(entity, revision, null);
+
+                return generateContinueResponse().build();
+            } else if (isExecutionPhase(httpServletRequest)) {
+                // get the original request and run the action
+                final Request<T> phaseOneRequest = phaseTwoVerifyTransaction();
+                return action.apply(phaseOneRequest.getRevision(), 
phaseOneRequest.getRequest());
+            } else if (isCancellationPhase(httpServletRequest)) {
+                cancelTransaction();
+                return generateOkResponse().build();
+            } else {
+                throw new IllegalStateException("This request does not appear 
to be part of the two phase commit.");
+            }
+        } else {
+            // authorize access and run the action
+            serviceFacade.authorizeAccess(authorizer);
+            serviceFacade.verifyRevision(revision, user);
+
+            return action.apply(revision, entity);
+        }
     }
 
     /**
@@ -486,43 +535,197 @@ public abstract class ApplicationResource {
      * @param action        executor
      * @return the response
      */
-    protected Response withWriteLock(final NiFiServiceFacade serviceFacade, 
final Set<Revision> revisions, final AuthorizeAccess authorizer,
-                                     final Runnable verifier, final 
Supplier<Response> action) {
+    protected <T extends Entity> Response withWriteLock(final 
NiFiServiceFacade serviceFacade, final T entity, final Set<Revision> revisions, 
final AuthorizeAccess authorizer,
+                                     final Runnable verifier, final 
BiFunction<Set<Revision>, T, Response> action) {
+
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
-        return withWriteLock(serviceFacade, authorizer, verifier, action,
-                () -> serviceFacade.verifyRevisions(revisions, user));
-    }
 
+        if (isTwoPhaseRequest(httpServletRequest)) {
+            if (isValidationPhase(httpServletRequest)) {
+                // authorize access
+                serviceFacade.authorizeAccess(authorizer);
+                serviceFacade.verifyRevisions(revisions, user);
+
+                // verify if necessary
+                if (verifier != null) {
+                    verifier.run();
+                }
+
+                // store the request
+                phaseOneStoreTransaction(entity, null, revisions);
+
+                return generateContinueResponse().build();
+            } else if (isExecutionPhase(httpServletRequest)) {
+                // get the original request and run the action
+                final Request<T> phaseOneRequest = phaseTwoVerifyTransaction();
+                return action.apply(phaseOneRequest.getRevisions(), 
phaseOneRequest.getRequest());
+            } else if (isCancellationPhase(httpServletRequest)) {
+                cancelTransaction();
+                return generateOkResponse().build();
+            } else {
+                throw new IllegalStateException("This request does not appear 
to be part of the two phase commit.");
+            }
+        } else {
+            // authorize access and run the action
+            serviceFacade.authorizeAccess(authorizer);
+            serviceFacade.verifyRevisions(revisions, user);
+
+            return action.apply(revisions, entity);
+        }
+    }
 
     /**
-     * Executes an action through the service facade using the specified 
revision.
+     * Executes an action through the service facade.
      *
      * @param serviceFacade  service facade
      * @param authorizer     authorizer
      * @param verifier       verifier
      * @param action         the action to execute
-     * @param verifyRevision a callback that will claim the necessary 
revisions for the operation
      * @return the response
      */
-    private Response withWriteLock(
-            final NiFiServiceFacade serviceFacade, final AuthorizeAccess 
authorizer, final Runnable verifier, final Supplier<Response> action,
-            final Runnable verifyRevision) {
+    protected <T extends Entity> Response withWriteLock(final 
NiFiServiceFacade serviceFacade, final T entity, final AuthorizeAccess 
authorizer,
+                                                        final Runnable 
verifier, final Function<T, Response> action) {
 
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
+        if (isTwoPhaseRequest(httpServletRequest)) {
+            if (isValidationPhase(httpServletRequest)) {
+                // authorize access
+                serviceFacade.authorizeAccess(authorizer);
+
+                // verify if necessary
+                if (verifier != null) {
+                    verifier.run();
+                }
+
+                // store the request
+                phaseOneStoreTransaction(entity, null, null);
+
+                return generateContinueResponse().build();
+            } else if (isExecutionPhase(httpServletRequest)) {
+                // get the original request and run the action
+                final Request<T> phaseOneRequest = phaseTwoVerifyTransaction();
+                return action.apply(phaseOneRequest.getRequest());
+            } else if (isCancellationPhase(httpServletRequest)) {
+                cancelTransaction();
+                return generateOkResponse().build();
+            } else {
+                throw new IllegalStateException("This request does not appear 
to be part of the two phase commit.");
+            }
+        } else {
             // authorize access
             serviceFacade.authorizeAccess(authorizer);
-            verifyRevision.run();
+
+            // run the action
+            return action.apply(entity);
         }
+    }
 
-        if (validationPhase) {
-            if (verifier != null) {
-                verifier.run();
+    private <T extends Entity> void phaseOneStoreTransaction(final T 
requestEntity, final Revision revision, final Set<Revision> revisions) {
+        if (twoPhaseCommitCache.size() > MAX_CACHE_SOFT_LIMIT) {
+            throw new IllegalStateException("The maximum number of requests 
are in progress.");
+        }
+
+        // get the transaction id
+        final String transactionId = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+        if (StringUtils.isBlank(transactionId)) {
+            throw new IllegalArgumentException("Two phase commit Transaction 
Id missing.");
+        }
+
+        synchronized (twoPhaseCommitCache) {
+            final CacheKey key = new CacheKey(transactionId);
+            if (twoPhaseCommitCache.getIfPresent(key) != null) {
+                throw new IllegalStateException("Transaction " + transactionId 
+ " is already in progress.");
+            }
+
+            // store the entry for the second phase
+            final NiFiUser user = NiFiUserUtils.getNiFiUser();
+            final Request<T> request = new 
Request<>(ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user), 
getAbsolutePath().toString(), revision, revisions, requestEntity);
+            twoPhaseCommitCache.put(key, request);
+        }
+    }
+
+    private <T extends Entity> Request<T> phaseTwoVerifyTransaction() {
+        // get the transaction id
+        final String transactionId = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+        if (StringUtils.isBlank(transactionId)) {
+            throw new IllegalArgumentException("Two phase commit Transaction 
Id missing.");
+        }
+
+        // get the entry for the second phase
+        final Request<T> request;
+        synchronized (twoPhaseCommitCache) {
+            final CacheKey key = new CacheKey(transactionId);
+            request = (Request<T>) twoPhaseCommitCache.getIfPresent(key);
+            if (request == null) {
+                throw new IllegalArgumentException("The request from phase one 
is missing.");
             }
-            return generateContinueResponse().build();
+
+            twoPhaseCommitCache.invalidate(key);
         }
+        final String phaseOneChain = request.getUserChain();
 
-        return action.get();
+        // build the chain for the current request
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final String phaseTwoChain = 
ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
+
+        if (phaseOneChain == null || !phaseOneChain.equals(phaseTwoChain)) {
+            throw new IllegalArgumentException("The same user must issue the 
request for phase one and two.");
+        }
+
+        final String phaseOneUri = request.getUri();
+        if (phaseOneUri == null || 
!phaseOneUri.equals(getAbsolutePath().toString())) {
+            throw new IllegalArgumentException("The URI must be the same for 
phase one and two.");
+        }
+
+        return request;
+    }
+
+    private void cancelTransaction() {
+        // get the transaction id
+        final String transactionId = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
+        if (StringUtils.isBlank(transactionId)) {
+            throw new IllegalArgumentException("Two phase commit Transaction 
Id missing.");
+        }
+
+        synchronized (twoPhaseCommitCache) {
+            final CacheKey key = new CacheKey(transactionId);
+            twoPhaseCommitCache.invalidate(key);
+        }
+    }
+
+    private final class Request<T extends Entity> {
+        final String userChain;
+        final String uri;
+        final Revision revision;
+        final Set<Revision> revisions;
+        final T request;
+
+        public Request(String userChain, String uri, Revision revision, 
Set<Revision> revisions, T request) {
+            this.userChain = userChain;
+            this.uri = uri;
+            this.revision = revision;
+            this.revisions = revisions;
+            this.request = request;
+        }
+
+        public String getUserChain() {
+            return userChain;
+        }
+
+        public String getUri() {
+            return uri;
+        }
+
+        public Revision getRevision() {
+            return revision;
+        }
+
+        public Set<Revision> getRevisions() {
+            return revisions;
+        }
+
+        public T getRequest() {
+            return request;
+        }
     }
 
     /**
@@ -713,7 +916,7 @@ public abstract class ApplicationResource {
         if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
             return requestReplicator.replicate(method, path, entity, 
headers).awaitMergedResponse();
         } else {
-            return 
requestReplicator.replicate(Collections.singleton(getClusterCoordinatorNode()), 
method, path, entity, headers, false, true).awaitMergedResponse();
+               return 
requestReplicator.forwardToCoordinator(getClusterCoordinatorNode(), method, 
path, entity, headers).awaitMergedResponse();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
index f7fdadf..11abf86 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java
@@ -148,7 +148,7 @@ public class ConnectionResource extends ApplicationResource 
{
      *
      * @param httpServletRequest request
      * @param id                 The id of the connection.
-     * @param connectionEntity   A connectionEntity.
+     * @param requestConnectionEntity   A connectionEntity.
      * @return A connectionEntity.
      * @throws InterruptedException if interrupted
      */
@@ -185,36 +185,37 @@ public class ConnectionResource extends 
ApplicationResource {
             @ApiParam(
                     value = "The connection configuration details.",
                     required = true
-            ) final ConnectionEntity connectionEntity) throws 
InterruptedException {
+            ) final ConnectionEntity requestConnectionEntity) throws 
InterruptedException {
 
-        if (connectionEntity == null || connectionEntity.getComponent() == 
null) {
+        if (requestConnectionEntity == null || 
requestConnectionEntity.getComponent() == null) {
             throw new IllegalArgumentException("Connection details must be 
specified.");
         }
 
-        if (connectionEntity.getRevision() == null) {
+        if (requestConnectionEntity.getRevision() == null) {
             throw new IllegalArgumentException("Revision must be specified.");
         }
 
         // ensure the ids are the same
-        final ConnectionDTO connection = connectionEntity.getComponent();
-        if (!id.equals(connection.getId())) {
+        final ConnectionDTO requestConnection = 
requestConnectionEntity.getComponent();
+        if (!id.equals(requestConnection.getId())) {
             throw new IllegalArgumentException(String.format("The connection 
id "
                     + "(%s) in the request body does not equal the connection 
id of the "
-                    + "requested resource (%s).", connection.getId(), id));
+                    + "requested resource (%s).", requestConnection.getId(), 
id));
         }
 
-        if (connection.getDestination() != null && 
connection.getDestination().getId() == null) {
+        if (requestConnection.getDestination() != null && 
requestConnection.getDestination().getId() == null) {
             throw new IllegalArgumentException("When specifying a destination 
component, the destination id is required.");
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.PUT, connectionEntity);
+            return replicate(HttpMethod.PUT, requestConnectionEntity);
         }
 
-        final Revision revision = getRevision(connectionEntity, id);
+        final Revision requestRevision = getRevision(requestConnectionEntity, 
id);
         return withWriteLock(
                 serviceFacade,
-                revision,
+                requestConnectionEntity,
+                requestRevision,
                 lookup -> {
                     // verifies write access to this connection (this checks 
the current source and destination)
                     ConnectionAuthorizable connAuth = lookup.getConnection(id);
@@ -222,17 +223,19 @@ public class ConnectionResource extends 
ApplicationResource {
 
                     // if a destination has been specified and is different
                     final Connectable currentDestination = 
connAuth.getDestination();
-                    if (connection.getDestination() != null && 
currentDestination.getIdentifier().equals(connection.getDestination().getId())) 
{
+                    if (requestConnection.getDestination() != null && 
currentDestination.getIdentifier().equals(requestConnection.getDestination().getId()))
 {
                         // verify access of the new destination (current 
destination was already authorized as part of the connection check)
-                        final Authorizable newDestinationAuthorizable = 
lookup.getConnectable(connection.getDestination().getId());
+                        final Authorizable newDestinationAuthorizable = 
lookup.getConnectable(requestConnection.getDestination().getId());
                         newDestinationAuthorizable.authorize(authorizer, 
RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
 
                         // verify access of the parent group (this is the same 
check that is performed when creating the connection)
                         connAuth.getParentGroup().authorize(authorizer, 
RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
                     }
                 },
-                () -> serviceFacade.verifyUpdateConnection(connection),
-                () -> {
+                () -> serviceFacade.verifyUpdateConnection(requestConnection),
+                (revision, connectionEntity) -> {
+                    final ConnectionDTO connection = 
connectionEntity.getComponent();
+
                     final ConnectionEntity entity = 
serviceFacade.updateConnection(revision, connection);
                     populateRemainingConnectionEntityContent(entity);
 
@@ -296,21 +299,25 @@ public class ConnectionResource extends 
ApplicationResource {
 
         // determine the specified version
         final Long clientVersion = version == null ? null : version.getLong();
-        final Revision revision = new Revision(clientVersion, 
clientId.getClientId(), id);
+        final Revision requestRevision = new Revision(clientVersion, 
clientId.getClientId(), id);
+
+        final ConnectionEntity requestConnectionEntity = new 
ConnectionEntity();
+        requestConnectionEntity.setId(id);
 
         // get the current user
         return withWriteLock(
                 serviceFacade,
-                revision,
+                requestConnectionEntity,
+                requestRevision,
                 lookup -> {
                     // verifies write access to the source and destination
                     final Authorizable authorizable = 
lookup.getConnection(id).getAuthorizable();
                     authorizable.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
                 },
                 () -> serviceFacade.verifyDeleteConnection(id),
-                () -> {
+                (revision, connectionEntity) -> {
                     // delete the connection
-                    final ConnectionEntity entity = 
serviceFacade.deleteConnection(revision, id);
+                    final ConnectionEntity entity = 
serviceFacade.deleteConnection(revision, connectionEntity.getId());
 
                     // generate the response
                     return clusterContext(generateOkResponse(entity)).build();

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index f700ced..f5ff188 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -47,6 +47,7 @@ import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.entity.ClusterEntity;
 import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.Entity;
 import org.apache.nifi.web.api.entity.HistoryEntity;
 import org.apache.nifi.web.api.entity.NodeEntity;
 import org.apache.nifi.web.api.entity.ReportingTaskEntity;
@@ -67,6 +68,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.net.URI;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -159,7 +161,7 @@ public class ControllerResource extends ApplicationResource 
{
      * Update the configuration for this NiFi.
      *
      * @param httpServletRequest request
-     * @param configEntity       A controllerConfigurationEntity.
+     * @param requestConfigEntity       A controllerConfigurationEntity.
      * @return A controllerConfigurationEntity.
      */
     @PUT
@@ -186,29 +188,30 @@ public class ControllerResource extends 
ApplicationResource {
             @ApiParam(
                     value = "The controller configuration.",
                     required = true
-            ) final ControllerConfigurationEntity configEntity) {
+            ) final ControllerConfigurationEntity requestConfigEntity) {
 
-        if (configEntity == null || configEntity.getComponent() == null) {
+        if (requestConfigEntity == null || requestConfigEntity.getComponent() 
== null) {
             throw new IllegalArgumentException("Controller configuration must 
be specified");
         }
 
-        if (configEntity.getRevision() == null) {
+        if (requestConfigEntity.getRevision() == null) {
             throw new IllegalArgumentException("Revision must be specified.");
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.PUT, configEntity);
+            return replicate(HttpMethod.PUT, requestConfigEntity);
         }
 
-        final Revision revision = getRevision(configEntity.getRevision(), 
FlowController.class.getSimpleName());
+        final Revision requestRevision = 
getRevision(requestConfigEntity.getRevision(), 
FlowController.class.getSimpleName());
         return withWriteLock(
                 serviceFacade,
-                revision,
+                requestConfigEntity,
+                requestRevision,
                 lookup -> {
                     authorizeController(RequestAction.WRITE);
                 },
                 null,
-                () -> {
+                (revision, configEntity) -> {
                     final ControllerConfigurationEntity entity = 
serviceFacade.updateControllerConfiguration(revision, 
configEntity.getComponent());
                     return clusterContext(generateOkResponse(entity)).build();
                 }
@@ -223,7 +226,7 @@ public class ControllerResource extends ApplicationResource 
{
      * Creates a new Reporting Task.
      *
      * @param httpServletRequest  request
-     * @param reportingTaskEntity A reportingTaskEntity.
+     * @param requestReportingTaskEntity A reportingTaskEntity.
      * @return A reportingTaskEntity.
      */
     @POST
@@ -251,17 +254,17 @@ public class ControllerResource extends 
ApplicationResource {
             @ApiParam(
                     value = "The reporting task configuration details.",
                     required = true
-            ) final ReportingTaskEntity reportingTaskEntity) {
+            ) final ReportingTaskEntity requestReportingTaskEntity) {
 
-        if (reportingTaskEntity == null || reportingTaskEntity.getComponent() 
== null) {
+        if (requestReportingTaskEntity == null || 
requestReportingTaskEntity.getComponent() == null) {
             throw new IllegalArgumentException("Reporting task details must be 
specified.");
         }
 
-        if (reportingTaskEntity.getRevision() == null || 
(reportingTaskEntity.getRevision().getVersion() == null || 
reportingTaskEntity.getRevision().getVersion() != 0)) {
+        if (requestReportingTaskEntity.getRevision() == null || 
(requestReportingTaskEntity.getRevision().getVersion() == null || 
requestReportingTaskEntity.getRevision().getVersion() != 0)) {
             throw new IllegalArgumentException("A revision of 0 must be 
specified when creating a new Reporting task.");
         }
 
-        final ReportingTaskDTO requestReportingTask = 
reportingTaskEntity.getComponent();
+        final ReportingTaskDTO requestReportingTask = 
requestReportingTaskEntity.getComponent();
         if (requestReportingTask.getId() != null) {
             throw new IllegalArgumentException("Reporting task ID cannot be 
specified.");
         }
@@ -271,36 +274,36 @@ public class ControllerResource extends 
ApplicationResource {
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST, reportingTaskEntity);
+            return replicate(HttpMethod.POST, requestReportingTaskEntity);
         }
 
-        // handle expects request (usually from the cluster manager)
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            // authorize access
-            serviceFacade.authorizeAccess(lookup -> {
-                authorizeController(RequestAction.WRITE);
+        return withWriteLock(
+                serviceFacade,
+                requestReportingTaskEntity,
+                lookup -> {
+                    authorizeController(RequestAction.WRITE);
 
-                if (requestReportingTask.getProperties() != null) {
-                    final ControllerServiceReferencingComponentAuthorizable 
authorizable = lookup.getReportingTaskByType(requestReportingTask.getType());
-                    
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestReportingTask.getProperties(),
 authorizable, authorizer, lookup);
-                }
-            });
-        }
-        if (validationPhase) {
-            return generateContinueResponse().build();
-        }
+                    if (requestReportingTask.getProperties() != null) {
+                        final 
ControllerServiceReferencingComponentAuthorizable authorizable = 
lookup.getReportingTaskByType(requestReportingTask.getType());
+                        
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestReportingTask.getProperties(),
 authorizable, authorizer, lookup);
+                    }
+                },
+                null,
+                (reportingTaskEntity) -> {
+                    final ReportingTaskDTO reportingTask = 
reportingTaskEntity.getComponent();
 
-        // set the processor id as appropriate
-        requestReportingTask.setId(generateUuid());
+                    // set the processor id as appropriate
+                    reportingTask.setId(generateUuid());
 
-        // create the reporting task and generate the json
-        final Revision revision = getRevision(reportingTaskEntity, 
requestReportingTask.getId());
-        final ReportingTaskEntity entity = 
serviceFacade.createReportingTask(revision, requestReportingTask);
-        
reportingTaskResource.populateRemainingReportingTaskEntityContent(entity);
+                    // create the reporting task and generate the json
+                    final Revision revision = getRevision(reportingTaskEntity, 
reportingTask.getId());
+                    final ReportingTaskEntity entity = 
serviceFacade.createReportingTask(revision, reportingTask);
+                    
reportingTaskResource.populateRemainingReportingTaskEntityContent(entity);
 
-        // build the response
-        return 
clusterContext(generateCreatedResponse(URI.create(entity.getUri()), 
entity)).build();
+                    // build the response
+                    return 
clusterContext(generateCreatedResponse(URI.create(entity.getUri()), 
entity)).build();
+                }
+        );
     }
 
     // -------------------
@@ -311,7 +314,7 @@ public class ControllerResource extends ApplicationResource 
{
      * Creates a new Controller Service.
      *
      * @param httpServletRequest      request
-     * @param controllerServiceEntity A controllerServiceEntity.
+     * @param requestControllerServiceEntity A controllerServiceEntity.
      * @return A controllerServiceEntity.
      */
     @POST
@@ -339,17 +342,17 @@ public class ControllerResource extends 
ApplicationResource {
             @ApiParam(
                     value = "The controller service configuration details.",
                     required = true
-            ) final ControllerServiceEntity controllerServiceEntity) {
+            ) final ControllerServiceEntity requestControllerServiceEntity) {
 
-        if (controllerServiceEntity == null || 
controllerServiceEntity.getComponent() == null) {
+        if (requestControllerServiceEntity == null || 
requestControllerServiceEntity.getComponent() == null) {
             throw new IllegalArgumentException("Controller service details 
must be specified.");
         }
 
-        if (controllerServiceEntity.getRevision() == null || 
(controllerServiceEntity.getRevision().getVersion() == null || 
controllerServiceEntity.getRevision().getVersion() != 0)) {
+        if (requestControllerServiceEntity.getRevision() == null || 
(requestControllerServiceEntity.getRevision().getVersion() == null || 
requestControllerServiceEntity.getRevision().getVersion() != 0)) {
             throw new IllegalArgumentException("A revision of 0 must be 
specified when creating a new Controller service.");
         }
 
-        final ControllerServiceDTO requestControllerService = 
controllerServiceEntity.getComponent();
+        final ControllerServiceDTO requestControllerService = 
requestControllerServiceEntity.getComponent();
         if (requestControllerService.getId() != null) {
             throw new IllegalArgumentException("Controller service ID cannot 
be specified.");
         }
@@ -363,36 +366,36 @@ public class ControllerResource extends 
ApplicationResource {
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.POST, controllerServiceEntity);
+            return replicate(HttpMethod.POST, requestControllerServiceEntity);
         }
 
-        // handle expects request (usually from the cluster manager)
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            // authorize access
-            serviceFacade.authorizeAccess(lookup -> {
-                authorizeController(RequestAction.WRITE);
+        return withWriteLock(
+                serviceFacade,
+                requestControllerServiceEntity,
+                lookup -> {
+                    authorizeController(RequestAction.WRITE);
 
-                if (requestControllerService.getProperties() != null) {
-                    final ControllerServiceReferencingComponentAuthorizable 
authorizable = 
lookup.getControllerServiceByType(requestControllerService.getType());
-                    
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(),
 authorizable, authorizer, lookup);
-                }
-            });
-        }
-        if (validationPhase) {
-            return generateContinueResponse().build();
-        }
+                    if (requestControllerService.getProperties() != null) {
+                        final 
ControllerServiceReferencingComponentAuthorizable authorizable = 
lookup.getControllerServiceByType(requestControllerService.getType());
+                        
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerService.getProperties(),
 authorizable, authorizer, lookup);
+                    }
+                },
+                null,
+                (controllerServiceEntity) -> {
+                    final ControllerServiceDTO controllerService = 
controllerServiceEntity.getComponent();
 
-        // set the processor id as appropriate
-        requestControllerService.setId(generateUuid());
+                    // set the processor id as appropriate
+                    controllerService.setId(generateUuid());
 
-        // create the controller service and generate the json
-        final Revision revision = getRevision(controllerServiceEntity, 
requestControllerService.getId());
-        final ControllerServiceEntity entity = 
serviceFacade.createControllerService(revision, null, requestControllerService);
-        
controllerServiceResource.populateRemainingControllerServiceEntityContent(entity);
+                    // create the controller service and generate the json
+                    final Revision revision = 
getRevision(controllerServiceEntity, controllerService.getId());
+                    final ControllerServiceEntity entity = 
serviceFacade.createControllerService(revision, null, controllerService);
+                    
controllerServiceResource.populateRemainingControllerServiceEntityContent(entity);
 
-        // build the response
-        return 
clusterContext(generateCreatedResponse(URI.create(entity.getUri()), 
entity)).build();
+                    // build the response
+                    return 
clusterContext(generateCreatedResponse(URI.create(entity.getUri()), 
entity)).build();
+                }
+        );
     }
 
     // -------
@@ -666,26 +669,33 @@ public class ControllerResource extends 
ApplicationResource {
 
         // Note: History requests are not replicated throughout the cluster 
and are instead handled by the nodes independently
 
-        // handle expects request (usually from the cluster manager)
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            // authorize access
-            serviceFacade.authorizeAccess(lookup -> {
-                authorizeController(RequestAction.WRITE);
-            });
-        }
-        if (validationPhase) {
-            return generateContinueResponse().build();
-        }
+        return withWriteLock(
+                serviceFacade,
+                new EndDateEntity(endDate.getDateTime()),
+                lookup -> {
+                    authorizeController(RequestAction.WRITE);
+                },
+                null,
+                (endDateEtity) -> {
+                    // purge the actions
+                    serviceFacade.deleteActions(endDateEtity.getEndDate());
 
-        // purge the actions
-        serviceFacade.deleteActions(endDate.getDateTime());
+                    // generate the response
+                    return generateOkResponse(new HistoryEntity()).build();
+                }
+        );
+    }
 
-        // create the response entity
-        final HistoryEntity entity = new HistoryEntity();
+    private class EndDateEntity extends Entity {
+        final Date endDate;
 
-        // generate the response
-        return generateOkResponse(entity).build();
+        public EndDateEntity(Date endDate) {
+            this.endDate = endDate;
+        }
+
+        public Date getEndDate() {
+            return endDate;
+        }
     }
 
     // setters

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2bfc4ef/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 0b9434a..9b3805a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -346,27 +346,28 @@ public class ControllerServiceResource extends 
ApplicationResource {
             return replicate(HttpMethod.POST);
         }
 
-        final boolean validationPhase = isValidationPhase(httpServletRequest);
-        if (validationPhase || !isTwoPhaseRequest(httpServletRequest)) {
-            // authorize access
-            serviceFacade.authorizeAccess(lookup -> {
-                final Authorizable controllerService = 
lookup.getControllerService(id).getAuthorizable();
-                controllerService.authorize(authorizer, RequestAction.WRITE, 
NiFiUserUtils.getNiFiUser());
-            });
-        }
-        if (validationPhase) {
-            serviceFacade.verifyCanClearControllerServiceState(id);
-            return generateContinueResponse().build();
-        }
+        final ControllerServiceEntity requestControllerServiceEntity = new 
ControllerServiceEntity();
+        requestControllerServiceEntity.setId(id);
 
-        // get the component state
-        serviceFacade.clearControllerServiceState(id);
+        return withWriteLock(
+                serviceFacade,
+                requestControllerServiceEntity,
+                lookup -> {
+                    final Authorizable controllerService = 
lookup.getControllerService(id).getAuthorizable();
+                    controllerService.authorize(authorizer, 
RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
+                },
+                () -> serviceFacade.verifyCanClearControllerServiceState(id),
+                (controllerServiceEntity) -> {
+                    // get the component state
+                    
serviceFacade.clearControllerServiceState(controllerServiceEntity.getId());
 
-        // generate the response entity
-        final ComponentStateEntity entity = new ComponentStateEntity();
+                    // generate the response entity
+                    final ComponentStateEntity entity = new 
ComponentStateEntity();
 
-        // generate the response
-        return clusterContext(generateOkResponse(entity)).build();
+                    // generate the response
+                    return clusterContext(generateOkResponse(entity)).build();
+                }
+        );
     }
 
     /**
@@ -422,7 +423,7 @@ public class ControllerServiceResource extends 
ApplicationResource {
      * Updates the references of the specified controller service.
      *
      * @param httpServletRequest     request
-     * @param updateReferenceRequest The update request
+     * @param requestUpdateReferenceRequest The update request
      * @return A controllerServiceReferencingComponentsEntity.
      */
     @PUT
@@ -455,13 +456,13 @@ public class ControllerServiceResource extends 
ApplicationResource {
             @ApiParam(
                     value = "The controller service request update request.",
                     required = true
-            ) final UpdateControllerServiceReferenceRequestEntity 
updateReferenceRequest) {
+            ) final UpdateControllerServiceReferenceRequestEntity 
requestUpdateReferenceRequest) {
 
-        if (updateReferenceRequest.getId() == null) {
+        if (requestUpdateReferenceRequest.getId() == null) {
             throw new IllegalArgumentException("The controller service 
identifier must be specified.");
         }
 
-        if (updateReferenceRequest.getReferencingComponentRevisions() == null) 
{
+        if (requestUpdateReferenceRequest.getReferencingComponentRevisions() 
== null) {
             throw new IllegalArgumentException("The controller service 
referencing components revisions must be specified.");
         }
 
@@ -471,14 +472,14 @@ public class ControllerServiceResource extends 
ApplicationResource {
         // but not referencing schedulable components
         ControllerServiceState requestControllerServiceState = null;
         try {
-            requestControllerServiceState = 
ControllerServiceState.valueOf(updateReferenceRequest.getState());
+            requestControllerServiceState = 
ControllerServiceState.valueOf(requestUpdateReferenceRequest.getState());
         } catch (final IllegalArgumentException iae) {
             // ignore
         }
 
         ScheduledState requestScheduledState = null;
         try {
-            requestScheduledState = 
ScheduledState.valueOf(updateReferenceRequest.getState());
+            requestScheduledState = 
ScheduledState.valueOf(requestUpdateReferenceRequest.getState());
         } catch (final IllegalArgumentException iae) {
             // ignore
         }
@@ -498,30 +499,51 @@ public class ControllerServiceResource extends 
ApplicationResource {
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.PUT, updateReferenceRequest);
+            return replicate(HttpMethod.PUT, requestUpdateReferenceRequest);
         }
 
         // convert the referencing revisions
-        final Map<String, Revision> referencingRevisions = 
updateReferenceRequest.getReferencingComponentRevisions().entrySet().stream()
+        final Map<String, Revision> requestReferencingRevisions = 
requestUpdateReferenceRequest.getReferencingComponentRevisions().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> {
                     final RevisionDTO rev = e.getValue();
                     return new Revision(rev.getVersion(), rev.getClientId(), 
e.getKey());
                 }));
-        final Set<Revision> revisions = new 
HashSet<>(referencingRevisions.values());
+        final Set<Revision> requestRevisions = new 
HashSet<>(requestReferencingRevisions.values());
 
-        final ScheduledState scheduledState = requestScheduledState;
-        final ControllerServiceState controllerServiceState = 
requestControllerServiceState;
+        final ScheduledState verifyScheduledState = requestScheduledState;
+        final ControllerServiceState verifyControllerServiceState = 
requestControllerServiceState;
         return withWriteLock(
                 serviceFacade,
-                revisions,
+                requestUpdateReferenceRequest,
+                requestRevisions,
                 lookup -> {
-                    referencingRevisions.entrySet().stream().forEach(e -> {
+                    requestReferencingRevisions.entrySet().stream().forEach(e 
-> {
                         final Authorizable controllerService = 
lookup.getControllerServiceReferencingComponent(id, e.getKey());
                         controllerService.authorize(authorizer, 
RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
                     });
                 },
-                () -> 
serviceFacade.verifyUpdateControllerServiceReferencingComponents(updateReferenceRequest.getId(),
 scheduledState, controllerServiceState),
-                () -> {
+                () -> 
serviceFacade.verifyUpdateControllerServiceReferencingComponents(requestUpdateReferenceRequest.getId(),
 verifyScheduledState, verifyControllerServiceState),
+                (revisions, updateReferenceRequest) -> {
+                    ScheduledState scheduledState = null;
+                    try {
+                        scheduledState = 
ScheduledState.valueOf(updateReferenceRequest.getState());
+                    } catch (final IllegalArgumentException e) {
+                        // ignore
+                    }
+
+                    ControllerServiceState controllerServiceState = null;
+                    try {
+                        controllerServiceState = 
ControllerServiceState.valueOf(updateReferenceRequest.getState());
+                    } catch (final IllegalArgumentException iae) {
+                        // ignore
+                    }
+
+                    final Map<String, Revision> referencingRevisions = 
updateReferenceRequest.getReferencingComponentRevisions().entrySet().stream()
+                            .collect(Collectors.toMap(Map.Entry::getKey, e -> {
+                                final RevisionDTO rev = e.getValue();
+                                return new Revision(rev.getVersion(), 
rev.getClientId(), e.getKey());
+                            }));
+
                     // update the controller service references
                     final ControllerServiceReferencingComponentsEntity entity 
= serviceFacade.updateControllerServiceReferencingComponents(
                             referencingRevisions, 
updateReferenceRequest.getId(), scheduledState, controllerServiceState);
@@ -536,7 +558,7 @@ public class ControllerServiceResource extends 
ApplicationResource {
      *
      * @param httpServletRequest      request
      * @param id                      The id of the controller service to 
update.
-     * @param controllerServiceEntity A controllerServiceEntity.
+     * @param requestControllerServiceEntity A controllerServiceEntity.
      * @return A controllerServiceEntity.
      */
     @PUT
@@ -570,32 +592,33 @@ public class ControllerServiceResource extends 
ApplicationResource {
             @ApiParam(
                     value = "The controller service configuration details.",
                     required = true
-            ) final ControllerServiceEntity controllerServiceEntity) {
+            ) final ControllerServiceEntity requestControllerServiceEntity) {
 
-        if (controllerServiceEntity == null || 
controllerServiceEntity.getComponent() == null) {
+        if (requestControllerServiceEntity == null || 
requestControllerServiceEntity.getComponent() == null) {
             throw new IllegalArgumentException("Controller service details 
must be specified.");
         }
 
-        if (controllerServiceEntity.getRevision() == null) {
+        if (requestControllerServiceEntity.getRevision() == null) {
             throw new IllegalArgumentException("Revision must be specified.");
         }
 
         // ensure the ids are the same
-        final ControllerServiceDTO requestControllerServiceDTO = 
controllerServiceEntity.getComponent();
+        final ControllerServiceDTO requestControllerServiceDTO = 
requestControllerServiceEntity.getComponent();
         if (!id.equals(requestControllerServiceDTO.getId())) {
             throw new IllegalArgumentException(String.format("The controller 
service id (%s) in the request body does not equal the "
                     + "controller service id of the requested resource (%s).", 
requestControllerServiceDTO.getId(), id));
         }
 
         if (isReplicateRequest()) {
-            return replicate(HttpMethod.PUT, controllerServiceEntity);
+            return replicate(HttpMethod.PUT, requestControllerServiceEntity);
         }
 
         // handle expects request (usually from the cluster manager)
-        final Revision revision = getRevision(controllerServiceEntity, id);
+        final Revision requestRevision = 
getRevision(requestControllerServiceEntity, id);
         return withWriteLock(
                 serviceFacade,
-                revision,
+                requestControllerServiceEntity,
+                requestRevision,
                 lookup -> {
                     // authorize the service
                     final ControllerServiceReferencingComponentAuthorizable 
authorizable = lookup.getControllerService(id);
@@ -605,9 +628,11 @@ public class ControllerServiceResource extends 
ApplicationResource {
                     
AuthorizeControllerServiceReference.authorizeControllerServiceReferences(requestControllerServiceDTO.getProperties(),
 authorizable, authorizer, lookup);
                 },
                 () -> 
serviceFacade.verifyUpdateControllerService(requestControllerServiceDTO),
-                () -> {
+                (revision, controllerServiceEntity) -> {
+                    final ControllerServiceDTO controllerService = 
controllerServiceEntity.getComponent();
+
                     // update the controller service
-                    final ControllerServiceEntity entity = 
serviceFacade.updateControllerService(revision, requestControllerServiceDTO);
+                    final ControllerServiceEntity entity = 
serviceFacade.updateControllerService(revision, controllerService);
                     populateRemainingControllerServiceEntityContent(entity);
 
                     return clusterContext(generateOkResponse(entity)).build();
@@ -669,19 +694,23 @@ public class ControllerServiceResource extends 
ApplicationResource {
             return replicate(HttpMethod.DELETE);
         }
 
+        final ControllerServiceEntity requestControllerServiceEntity = new 
ControllerServiceEntity();
+        requestControllerServiceEntity.setId(id);
+
         // handle expects request (usually from the cluster manager)
-        final Revision revision = new Revision(version == null ? null : 
version.getLong(), clientId.getClientId(), id);
+        final Revision requestRevision = new Revision(version == null ? null : 
version.getLong(), clientId.getClientId(), id);
         return withWriteLock(
                 serviceFacade,
-                revision,
+                requestControllerServiceEntity,
+                requestRevision,
                 lookup -> {
                     final Authorizable controllerService = 
lookup.getControllerService(id).getAuthorizable();
                     controllerService.authorize(authorizer, 
RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
                 },
                 () -> serviceFacade.verifyDeleteControllerService(id),
-                () -> {
+                (revision, controllerServiceEntity) -> {
                     // delete the specified controller service
-                    final ControllerServiceEntity entity = 
serviceFacade.deleteControllerService(revision, id);
+                    final ControllerServiceEntity entity = 
serviceFacade.deleteControllerService(revision, 
controllerServiceEntity.getId());
                     return clusterContext(generateOkResponse(entity)).build();
                 }
         );

Reply via email to