NIFI-1745: Refactor how revisions are handled at NCM/Distributed to Node. This 
closes #454


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4b74e4de
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4b74e4de
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4b74e4de

Branch: refs/heads/master
Commit: 4b74e4de7403b1798b604f21450a8687e45d7978
Parents: afc8c64
Author: Mark Payne <[email protected]>
Authored: Tue May 17 11:51:09 2016 -0400
Committer: Matt Gilman <[email protected]>
Committed: Fri May 20 14:04:24 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   2 +
 .../org/apache/nifi/util/NiFiProperties.java    |   4 +
 .../nifi-framework-cluster-web/.gitignore       |   1 -
 .../nifi-framework-cluster-web/pom.xml          |  44 -----
 .../nifi/cluster/context/ClusterContext.java    |  65 -------
 .../cluster/context/ClusterContextImpl.java     |  69 -------
 .../context/ClusterContextThreadLocal.java      |  42 -----
 .../nifi-framework-cluster/pom.xml              |   4 -
 .../http/replication/RequestReplicator.java     |  14 +-
 .../ThreadPoolRequestReplicator.java            |  68 +++----
 .../manager/impl/HttpRequestReplicatorImpl.java |   9 +-
 .../cluster/manager/impl/WebClusterManager.java | 155 ++--------------
 .../spring/WebClusterManagerFactoryBean.java    |  10 +-
 .../resources/nifi-cluster-manager-context.xml  |   4 -
 .../TestThreadPoolRequestReplicator.java        |  14 +-
 .../src/main/resources/conf/nifi.properties     |   3 +
 .../nifi-web/nifi-web-api/pom.xml               |   5 -
 .../java/org/apache/nifi/audit/NiFiAuditor.java |  12 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java  |  13 +-
 .../nifi/web/StandardNiFiServiceFacade.java     | 183 +++++++-----------
 .../nifi/web/api/ApplicationResource.java       | 157 +++++-----------
 .../apache/nifi/web/api/ControllerResource.java |  74 ++++----
 .../nifi/web/api/FlowFileQueueResource.java     |  89 ++++-----
 .../org/apache/nifi/web/api/FlowResource.java   |  59 +++---
 .../nifi/web/api/ProcessGroupResource.java      | 186 +++++++------------
 .../apache/nifi/web/api/ProvenanceResource.java |  84 ++++-----
 .../org/apache/nifi/web/dao/SnippetDAO.java     |   3 +-
 .../org/apache/nifi/web/dao/TemplateDAO.java    |   3 +-
 .../nifi/web/dao/impl/StandardSnippetDAO.java   |   4 +-
 .../nifi/web/dao/impl/StandardTemplateDAO.java  |   4 +-
 .../nifi/web/filter/NodeRequestFilter.java      | 114 ------------
 .../nifi/web/filter/ThreadLocalFilter.java      |  53 ------
 .../org/apache/nifi/web/filter/TimerFilter.java |   9 +-
 .../org/apache/nifi/web/util/SnippetUtils.java  |  39 ++--
 .../src/main/resources/nifi-web-api-context.xml |   4 +-
 .../src/main/webapp/WEB-INF/web.xml             |  16 --
 .../nifi-web-optimistic-locking/pom.xml         |   4 -
 .../web/StandardOptimisticLockingManager.java   |  17 +-
 .../nifi/web/revision/NaiveRevisionManager.java | 169 ++++++++++++-----
 .../nifi/web/revision/RevisionManager.java      |  33 +++-
 .../web/revision/TestNaiveRevisionManager.java  | 147 +++++++++++----
 .../nifi-framework/pom.xml                      |   1 -
 nifi-nar-bundles/nifi-framework-bundle/pom.xml  |   5 -
 43 files changed, 685 insertions(+), 1310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index d07dfb1..00d30e9 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -467,6 +467,8 @@ language governing permissions and limitations under the 
License. -->
         
<nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
         <nifi.cluster.manager.safemode.duration>0 
sec</nifi.cluster.manager.safemode.duration>
 
+        <nifi.cluster.request.replication.claim.timeout>1 
min</nifi.cluster.request.replication.claim.timeout>
+
         <!--  nifi.properties: zookeeper properties -->
         <nifi.zookeeper.connect.string></nifi.zookeeper.connect.string>
         <nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 63693bf..1b560fc 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -186,6 +186,8 @@ public class NiFiProperties extends Properties {
     public static final String CLUSTER_MANAGER_PROTOCOL_THREADS = 
"nifi.cluster.manager.protocol.threads";
     public static final String CLUSTER_MANAGER_SAFEMODE_DURATION = 
"nifi.cluster.manager.safemode.duration";
 
+    public static final String REQUEST_REPLICATION_CLAIM_TIMEOUT = 
"nifi.cluster.request.replication.claim.timeout";
+
     // kerberos properties
     public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file";
     public static final String KERBEROS_SERVICE_PRINCIPAL = 
"nifi.kerberos.service.principal";
@@ -249,6 +251,8 @@ public class NiFiProperties extends Properties {
     public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10;
     public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 
sec";
 
+    public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "1 
min";
+
     // state management defaults
     public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = 
"conf/state-management.xml";
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/.gitignore
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/.gitignore
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/.gitignore
deleted file mode 100755
index ea8c4bf..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
deleted file mode 100644
index e559816..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
+++ /dev/null
@@ -1,44 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-framework</artifactId>
-        <version>1.0.0-SNAPSHOT</version>
-    </parent>
-    <artifactId>nifi-framework-cluster-web</artifactId>
-    <packaging>jar</packaging>
-    <description>The clustering software for communicating with the NiFi web 
api.</description>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-properties</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-administration</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-user-actions</artifactId>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
deleted file mode 100644
index 8c3e41b..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.context;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.nifi.action.Action;
-import org.apache.nifi.web.Revision;
-
-/**
- * Contains contextual information about clustering that may be serialized
- * between manager and node when communicating over HTTP.
- */
-public interface ClusterContext extends Serializable {
-
-    /**
-     * Returns a list of auditable actions. The list is modifiable and will
-     * never be null.
-     *
-     * @return a collection of actions
-     */
-    List<Action> getActions();
-
-    Revision getRevision();
-
-    void setRevision(Revision revision);
-
-    /**
-     * @return true if the request was sent by the cluster manager; false
-     * otherwise
-     */
-    boolean isRequestSentByClusterManager();
-
-    /**
-     * Sets the flag to indicate if a request was sent by the cluster manager.
-     *
-     * @param flag true if the request was sent by the cluster manager; false
-     * otherwise
-     */
-    void setRequestSentByClusterManager(boolean flag);
-
-    /**
-     * Gets an id generation seed. This is used to ensure that nodes are able 
to
-     * generate the same id across the cluster. This is usually handled by the
-     * cluster manager creating the id, however for some actions (snippets,
-     * templates, etc) this is not possible.
-     *
-     * @return generated id seed
-     */
-    String getIdGenerationSeed();
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
deleted file mode 100644
index 43e7c2d..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.context;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import org.apache.nifi.action.Action;
-import org.apache.nifi.web.Revision;
-
-/**
- * A basic implementation of the context.
- */
-public class ClusterContextImpl implements ClusterContext, Serializable {
-
-    private final List<Action> actions = new ArrayList<>();
-
-    private Revision revision;
-
-    private boolean requestSentByClusterManager;
-
-    private final String idGenerationSeed = UUID.randomUUID().toString();
-
-    @Override
-    public List<Action> getActions() {
-        return actions;
-    }
-
-    @Override
-    public Revision getRevision() {
-        return revision;
-    }
-
-    @Override
-    public void setRevision(Revision revision) {
-        this.revision = revision;
-    }
-
-    @Override
-    public boolean isRequestSentByClusterManager() {
-        return requestSentByClusterManager;
-    }
-
-    @Override
-    public void setRequestSentByClusterManager(boolean 
requestSentByClusterManager) {
-        this.requestSentByClusterManager = requestSentByClusterManager;
-    }
-
-    @Override
-    public String getIdGenerationSeed() {
-        return this.idGenerationSeed;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
deleted file mode 100644
index 79900fb..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.context;
-
-/**
- * Manages a cluster context on a threadlocal.
- */
-public class ClusterContextThreadLocal {
-
-    private static final ThreadLocal<ClusterContext> contextHolder = new 
ThreadLocal<>();
-
-    public static void removeContext() {
-        contextHolder.remove();
-    }
-
-    public static ClusterContext createEmptyContext() {
-        return new ClusterContextImpl();
-    }
-
-    public static ClusterContext getContext() {
-        return contextHolder.get();
-    }
-
-    public static void setContext(final ClusterContext context) {
-        contextHolder.set(context);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index e5a1a7d..8a06467 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -67,10 +67,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-framework-cluster-web</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-web-utils</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
index 6c9f18c..b64c766 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
@@ -24,7 +24,19 @@ import java.util.Set;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 public interface RequestReplicator {
-    public static final String REQUEST_TRANSACTION_ID = 
"X-RequestTransactionId";
+
+    public static final String REQUEST_TRANSACTION_ID_HEADER = 
"X-RequestTransactionId";
+    public static final String CLUSTER_ID_GENERATION_SEED_HEADER = 
"X-Cluster-Id-Generation-Seed";
+
+    /**
+     * The HTTP header that the requestor specifies to ask a node if they are 
able to process a given request. The value
+     * is always 150-NodeContinue. The node will respond with 150 CONTINUE if 
it is able to
+     * process the request, 417 EXPECTATION_FAILED otherwise.
+     */
+    public static final String REQUEST_VALIDATION_HTTP_HEADER = 
"X-Validation-Expects";
+    public static final String NODE_CONTINUE = "150-NodeContinue";
+    public static final int NODE_CONTINUE_STATUS_CODE = 150;
+
 
     /**
      * Starts the instance for replicating requests. Calling this method on an 
already started instance has no effect.

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index d218af2..dd4d2ce 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -43,8 +43,6 @@ import javax.ws.rs.HttpMethod;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextImpl;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@@ -62,8 +60,6 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.logging.NiFiLog;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
-import org.apache.nifi.web.OptimisticLockingManager;
-import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,21 +71,6 @@ import 
com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
 
 public class ThreadPoolRequestReplicator implements RequestReplicator {
-    /**
-     * The HTTP header to store a cluster context. An example of what may be 
stored in the context is a node's
-     * auditable actions in response to a cluster request. The cluster context 
is serialized
-     * using Java's serialization mechanism and hex encoded.
-     */
-    static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext";
-
-    /**
-     * The HTTP header that the NCM specifies to ask a node if they are able 
to process a given request. The value
-     * is always 150-NodeContinue. The node will respond with 150 CONTINUE if 
it is able to
-     * process the request, 417 EXPECTATION_FAILED otherwise.
-     */
-    static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
-    static final String NODE_CONTINUE = "150-NodeContinue";
-    static final int NODE_CONTINUE_STATUS_CODE = 150;
 
     private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(ThreadPoolRequestReplicator.class));
     private static final int MAX_CONCURRENT_REQUESTS = 100;
@@ -102,7 +83,6 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
     private final EventReporter eventReporter;
     private final RequestCompletionCallback callback;
     private final ClusterCoordinator clusterCoordinator;
-    private final OptimisticLockingManager lockingManager;
     private final DataFlowManagementService dfmService;
 
     private ExecutorService executorService;
@@ -121,8 +101,8 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      * @param eventReporter an EventReporter that can be used to notify users 
of interesting events. May be null.
      */
     public ThreadPoolRequestReplicator(final int numThreads, final Client 
client, final ClusterCoordinator clusterCoordinator,
-        final RequestCompletionCallback callback, final EventReporter 
eventReporter, final OptimisticLockingManager lockingManager, final 
DataFlowManagementService dfmService) {
-        this(numThreads, client, clusterCoordinator, "3 sec", "3 sec", 
callback, eventReporter, null, lockingManager, dfmService);
+        final RequestCompletionCallback callback, final EventReporter 
eventReporter, final DataFlowManagementService dfmService) {
+        this(numThreads, client, clusterCoordinator, "3 sec", "3 sec", 
callback, eventReporter, null, dfmService);
     }
 
     /**
@@ -138,7 +118,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
      */
     public ThreadPoolRequestReplicator(final int numThreads, final Client 
client, final ClusterCoordinator clusterCoordinator,
         final String connectionTimeout, final String readTimeout, final 
RequestCompletionCallback callback, final EventReporter eventReporter,
-        final WebClusterManager clusterManager, final OptimisticLockingManager 
lockingManager, final DataFlowManagementService dfmService) {
+        final WebClusterManager clusterManager, final 
DataFlowManagementService dfmService) {
         if (numThreads <= 0) {
             throw new IllegalArgumentException("The number of threads must be 
greater than zero.");
         } else if (client == null) {
@@ -153,7 +133,6 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         this.responseMerger = new StandardHttpResponseMerger(clusterManager);
         this.eventReporter = eventReporter;
         this.callback = callback;
-        this.lockingManager = lockingManager;
         this.dfmService = dfmService;
 
         client.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, 
connectionTimeoutMs);
@@ -198,7 +177,9 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
 
     @Override
     public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers) {
-        return replicate(nodeIds, method, uri, entity, headers, true, null);
+        final Map<String, String> headersPlusIdGenerationSeed = new 
HashMap<>(headers);
+        
headersPlusIdGenerationSeed.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER,
 UUID.randomUUID().toString());
+        return replicate(nodeIds, method, uri, entity, 
headersPlusIdGenerationSeed, true, null);
     }
 
     /**
@@ -233,7 +214,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         // Update headers to indicate the current revision so that we can
         // prevent multiple users changing the flow at the same time
         final Map<String, String> updatedHeaders = new HashMap<>(headers);
-        final String requestId = 
updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID, key -> 
UUID.randomUUID().toString());
+        final String requestId = 
updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> 
UUID.randomUUID().toString());
 
         if (performVerification) {
             verifyState(method, uri.getPath());
@@ -284,29 +265,16 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             nodeId -> new NodeHttpRequest(nodeId, method, createURI(uri, 
nodeId), entity, updatedHeaders, nodeCompletionCallback);
         replicateRequest(nodeIds, uri.getScheme(), uri.getPath(), 
requestFactory, updatedHeaders);
 
-
-        // TODO: Must handle revisions!!
-
         return response;
     }
 
 
-    private void setRevision(final Map<String, String> headers) {
-        final ClusterContext clusterCtx = new ClusterContextImpl();
-        clusterCtx.setRequestSentByClusterManager(true); // indicate request 
is sent from cluster manager
-        
clusterCtx.setRevision(lockingManager.getLastModification().getRevision());
-
-        // serialize cluster context and add to request header
-        final String serializedClusterCtx = 
WebUtils.serializeObjectToHex(clusterCtx);
-        headers.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
-    }
-
 
     private void performVerification(Set<NodeIdentifier> nodeIds, String 
method, URI uri, Object entity, Map<String, String> headers, 
StandardAsyncClusterResponse clusterResponse) {
         logger.debug("Verifying that mutable request {} {} can be made", 
method, uri.getPath());
 
         final Map<String, String> updatedHeaders = new HashMap<>(headers);
-        updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, NODE_CONTINUE);
+        updatedHeaders.put(REQUEST_VALIDATION_HTTP_HEADER, NODE_CONTINUE);
 
         final int numNodes = nodeIds.size();
         final NodeRequestCompletionCallback completionCallback = new 
NodeRequestCompletionCallback() {
@@ -474,6 +442,12 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         }
     }
 
+    /**
+     * Removes the AsyncClusterResponse with the given ID from the map and 
handles any cleanup
+     * or post-processing related to the request after the client has consumed 
the response
+     *
+     * @param requestId the ID of the request that has been consumed by the 
client
+     */
     private void onResponseConsumed(final String requestId) {
         final AsyncClusterResponse response = responseMap.remove(requestId);
 
@@ -482,6 +456,13 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         }
     }
 
+    /**
+     * When all nodes have completed a request and provided a response (or 
have timed out), this method will be invoked
+     * to handle calling the Callback that was provided for the request, if 
any, and handle any cleanup or post-processing
+     * related to the request
+     *
+     * @param requestId the ID of the request that has completed
+     */
     private void onCompletedResponse(final String requestId) {
         final AsyncClusterResponse response = responseMap.get(requestId);
 
@@ -543,16 +524,15 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
     }
 
 
-    private void replicateRequest(final Set<NodeIdentifier> nodeIds, final 
String scheme,
-        final String path, final Function<NodeIdentifier, NodeHttpRequest> 
callableFactory, final Map<String, String> headers) {
+
+    private void replicateRequest(final Set<NodeIdentifier> nodeIds, final 
String scheme, final String path,
+        final Function<NodeIdentifier, NodeHttpRequest> callableFactory, final 
Map<String, String> headers) {
 
         if (nodeIds.isEmpty()) {
             return; // return quickly for trivial case
         }
 
         // submit the requests to the nodes
-        final String requestId = UUID.randomUUID().toString();
-        headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId);
         for (final NodeIdentifier nodeId : nodeIds) {
             final NodeHttpRequest callable = callableFactory.apply(nodeId);
             executorService.submit(callable);

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
index e5f171d..8c645a9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -268,8 +267,6 @@ public class HttpRequestReplicatorImpl implements 
HttpRequestReplicator {
         }
 
         // submit the requests to the nodes
-        final String requestId = UUID.randomUUID().toString();
-        headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId);
         for (final Map.Entry<NodeIdentifier, URI> entry : uriMap.entrySet()) {
             final NodeIdentifier nodeId = entry.getKey();
             final URI nodeUri = entry.getValue();
@@ -339,15 +336,15 @@ public class HttpRequestReplicatorImpl implements 
HttpRequestReplicator {
             }
 
             final StringBuilder sb = new StringBuilder();
-            sb.append("Node Responses for ").append(method).append(" ").append(path).append(" (Request ID ").append(requestId).append("):\n");
+            sb.append("Node Responses for ").append(method).append(" ").append(path).append(":\n");
             for (final NodeResponse response : result) {
                 sb.append(response).append("\n");
             }
 
             final long averageNanos = (nanosAdded == 0) ? -1L : nanosSum / nanosAdded;
             final long averageMillis = (averageNanos < 0) ? averageNanos : TimeUnit.MILLISECONDS.convert(averageNanos, TimeUnit.NANOSECONDS);
-            logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms",
-                    method, path, requestId, min, max, averageMillis);
+            logger.debug("For {} {}, minimum response time = {}, max = {}, average = {} ms",
+                method, path, min, max, averageMillis);
             logger.debug(sb.toString());
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 59d582b..f7b5745 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -19,7 +19,6 @@ package org.apache.nifi.cluster.manager.impl;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -54,13 +53,10 @@ import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.stream.StreamResult;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.AuditService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextImpl;
 import 
org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
@@ -76,7 +72,6 @@ import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.flow.ClusterDataFlow;
 import org.apache.nifi.cluster.flow.DaoException;
 import org.apache.nifi.cluster.flow.DataFlowManagementService;
-import org.apache.nifi.cluster.flow.PersistedFlowState;
 import org.apache.nifi.cluster.manager.HttpClusterManager;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.ConflictingNodeIdException;
@@ -165,11 +160,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.DomUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.ReflectionUtils;
-import org.apache.nifi.web.OptimisticLockingManager;
-import org.apache.nifi.web.Revision;
-import org.apache.nifi.web.UpdateRevision;
 import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.util.WebUtils;
 import org.slf4j.Logger;
@@ -197,58 +188,21 @@ import 
com.sun.jersey.api.client.config.DefaultClientConfig;
  */
 public class WebClusterManager implements HttpClusterManager, ProtocolHandler, 
ControllerServiceProvider, ReportingTaskProvider, RequestCompletionCallback {
 
-    public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String BULLETIN_CATEGORY = "Clustering";
 
     private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
 
-    /**
-     * The HTTP header to store a cluster context. An example of what may be 
stored in the context is a node's auditable actions in response to a cluster 
request. The cluster context is serialized
-     * using Java's serialization mechanism and hex encoded.
-     */
-    public static final String CLUSTER_CONTEXT_HTTP_HEADER = 
"X-ClusterContext";
-
-    /**
-     * HTTP Header that stores a unique ID for each request that is replicated 
to the nodes. This is used for logging purposes so that request information, 
such as timing, can be correlated between
-     * the NCM and the nodes
-     */
-    public static final String REQUEST_ID_HEADER = "X-RequestID";
-
-    /**
-     * The HTTP header that the NCM specifies to ask a node if they are able 
to process a given request. The value is always 150-NodeContinue. The node will 
respond with 150 CONTINUE if it is able to
-     * process the request, 417 EXPECTATION_FAILED otherwise.
-     */
-    public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
-    public static final int NODE_CONTINUE_STATUS_CODE = 150;
-
-    /**
-     * The HTTP header that the NCM specifies to indicate that a node should 
invalidate the specified user group. This is done to ensure that user cache is 
not stale when an administrator modifies a
-     * group through the UI.
-     */
-    public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = 
"X-ClusterInvalidateUserGroup";
-
-    /**
-     * The HTTP header that the NCM specifies to indicate that a node should 
invalidate the specified user. This is done to ensure that user cache is not 
stale when an administrator modifies a user
-     * through the UI.
-     */
-    public static final String CLUSTER_INVALIDATE_USER_HEADER = 
"X-ClusterInvalidateUser";
 
     /**
      * The default number of seconds to respond to a connecting node if the 
manager cannot provide it with a current data flow.
      */
     private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
 
-
-    public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = 
Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
-
-
-    public static final Pattern REPORTING_TASK_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
     public static final Pattern COUNTER_URI_PATTERN = 
Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
 
     private final NiFiProperties properties;
     private final DataFlowManagementService dataFlowManagementService;
     private final ClusterManagerProtocolSenderListener senderListener;
-    private final OptimisticLockingManager optimisticLockingManager;
     private final StringEncryptor encryptor;
     private final ReentrantReadWriteLock resourceRWLock = new 
ReentrantReadWriteLock(true);
     private final ClusterManagerLock readLock = new 
ClusterManagerLock(resourceRWLock.readLock(), "Read");
@@ -281,8 +235,8 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     private final RequestReplicator httpRequestReplicator;
 
     public WebClusterManager(
-            final DataFlowManagementService dataFlowManagementService, final 
ClusterManagerProtocolSenderListener senderListener,
-            final NiFiProperties properties, final StringEncryptor encryptor, 
final OptimisticLockingManager optimisticLockingManager) {
+        final DataFlowManagementService dataFlowManagementService, final 
ClusterManagerProtocolSenderListener senderListener,
+        final NiFiProperties properties, final StringEncryptor encryptor) {
 
         if (dataFlowManagementService == null) {
             throw new IllegalArgumentException("DataFlowManagementService may 
not be null.");
@@ -298,7 +252,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         this.instanceId = UUID.randomUUID().toString();
         this.senderListener = senderListener;
         this.encryptor = encryptor;
-        this.optimisticLockingManager = optimisticLockingManager;
         senderListener.addHandler(this);
         senderListener.setBulletinRepository(bulletinRepository);
 
@@ -356,7 +309,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
         final Client jerseyClient = WebUtils.createClient(new 
DefaultClientConfig(), SslContextFactory.createSslContext(properties));
         return new ThreadPoolRequestReplicator(numThreads, jerseyClient, 
clusterCoordinator, connectionTimeout, readTimeout, this,
-            eventReporter, this, optimisticLockingManager, 
dataFlowManagementService);
+            eventReporter, this, dataFlowManagementService);
     }
 
     private EventReporter createEventReporter() {
@@ -1734,85 +1687,21 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
 
         logger.debug("Applying prototype request " + uri + " to nodes.");
 
-        // the starting state of the flow (current, stale, unknown)
-        final PersistedFlowState originalPersistedFlowState = 
dataFlowManagementService.getPersistedFlowState();
-
-        // check if this request can change the flow
-        final boolean mutableRequest = canChangeNodeState(method, uri);
-
-        final ObjectHolder<NodeResponse> holder = new ObjectHolder<>(null);
-        final UpdateRevision federateRequest = new UpdateRevision() {
-            @Override
-            public Revision execute(Revision currentRevision) {
-                // update headers to contain cluster contextual information to 
send to the node
-                final Map<String, String> updatedHeaders = new 
HashMap<>(headers);
-                final ClusterContext clusterCtx = new ClusterContextImpl();
-                clusterCtx.setRequestSentByClusterManager(true);               
  // indicate request is sent from cluster manager
-                clusterCtx.setRevision(currentRevision);
-
-                // serialize cluster context and add to request header
-                final String serializedClusterCtx = 
WebUtils.serializeObjectToHex(clusterCtx);
-                updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, 
serializedClusterCtx);
-
-                // replicate request
-                final AsyncClusterResponse clusterResponse = 
httpRequestReplicator.replicate(nodeIds, method, uri, entity == null ? 
parameters : entity, updatedHeaders);
+        // replicate request
+        final AsyncClusterResponse clusterResponse = httpRequestReplicator.replicate(nodeIds, method, uri, entity == null ? parameters : entity, headers);
 
-                final NodeResponse clientResponse;
-                try {
-                    clientResponse = clusterResponse.awaitMergedResponse();
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    logger.warn("Thread was interrupted while waiting for a 
response from one or more nodes", e);
-                    final Set<NodeIdentifier> noResponses = 
clusterResponse.getNodesInvolved();
-                    
noResponses.removeAll(clusterResponse.getCompletedNodeIdentifiers());
-                    throw new IllegalClusterStateException("Interrupted while 
waiting for a response from the following nodes: " + noResponses, e);
-                }
-
-                holder.set(clientResponse);
-
-                // if we have a response get the updated cluster context for 
auditing and revision updating
-                Revision updatedRevision = null;
-                if (mutableRequest && clientResponse != null) {
-                    try {
-                        // get the cluster context from the response header
-                        final String serializedClusterContext = 
clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
-                        if (StringUtils.isNotBlank(serializedClusterContext)) {
-                            // deserialize object
-                            final Serializable clusterContextObj = 
WebUtils.deserializeHexToObject(serializedClusterContext);
-
-                            // if we have a valid object, audit the actions
-                            if (clusterContextObj instanceof ClusterContext) {
-                                final ClusterContext clusterContext = 
(ClusterContext) clusterContextObj;
-                                if (auditService != null) {
-                                    try {
-                                        
auditService.addActions(clusterContext.getActions());
-                                    } catch (final Throwable t) {
-                                        logger.warn("Unable to record actions: 
" + t.getMessage());
-                                        if (logger.isDebugEnabled()) {
-                                            logger.warn(StringUtils.EMPTY, t);
-                                        }
-                                    }
-                                }
-                                updatedRevision = clusterContext.getRevision();
-                            }
-                        }
-                    } catch (final ClassNotFoundException cnfe) {
-                        logger.warn("Classpath issue detected because failed 
to deserialize cluster context from node response due to: " + cnfe, cnfe);
-                    }
-                }
-
-                return updatedRevision;
-            }
-        };
-
-        // federate the request and lock on the revision
-        if (mutableRequest) {
-            optimisticLockingManager.setRevision(federateRequest);
-        } else {
-            federateRequest.execute(optimisticLockingManager.getLastModification().getRevision());
+        final NodeResponse clientResponse;
+        try {
+            clientResponse = clusterResponse.awaitMergedResponse();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Thread was interrupted while waiting for a response from one or more nodes", e);
+            final Set<NodeIdentifier> noResponses = clusterResponse.getNodesInvolved();
+            noResponses.removeAll(clusterResponse.getCompletedNodeIdentifiers());
+            throw new IllegalClusterStateException("Interrupted while waiting for a response from the following nodes: " + noResponses, e);
         }
 
-        return holder.get();
+        return clientResponse;
     }
 
 
@@ -2152,20 +2041,6 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         }
     }
 
-    private void notifyDataFlowManagmentServiceOfFlowStateChange(final 
PersistedFlowState newState) {
-        writeLock.lock();
-        try {
-            logger.debug("Notifying DataFlow Management Service that flow 
state is " + newState);
-            dataFlowManagementService.setPersistedFlowState(newState);
-            if (newState != PersistedFlowState.CURRENT) {
-                cachedDataFlow = null;
-                /* do not reset primary node ID  because only the data flow 
has changed */
-            }
-        } finally {
-            writeLock.unlock("notifyDataFlowManagementServiceOfFlowStateChange");
-        }
-    }
-
     public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) {
         return heartbeatMonitor.getLatestHeartbeat(nodeId);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
index a06bdaf..95263bd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java
@@ -27,7 +27,6 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
 import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.OptimisticLockingManager;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.FactoryBean;
 import org.springframework.context.ApplicationContext;
@@ -46,8 +45,6 @@ public class WebClusterManagerFactoryBean implements 
FactoryBean, ApplicationCon
 
     private StringEncryptor encryptor;
 
-    private OptimisticLockingManager optimisticLockingManager;
-
     @Override
     public Object getObject() throws Exception {
         if (properties.isClusterManager() && properties.isNode()) {
@@ -67,8 +64,7 @@ public class WebClusterManagerFactoryBean implements 
FactoryBean, ApplicationCon
                     dataFlowService,
                     senderListener,
                     properties,
-                    encryptor,
-                    optimisticLockingManager
+                encryptor
             );
 
             // set the service broadcaster
@@ -119,8 +115,4 @@ public class WebClusterManagerFactoryBean implements 
FactoryBean, ApplicationCon
     public void setEncryptor(final StringEncryptor encryptor) {
         this.encryptor = encryptor;
     }
-
-    public void setOptimisticLockingManager(OptimisticLockingManager 
optimisticLockingManager) {
-        this.optimisticLockingManager = optimisticLockingManager;
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index 1de15cf..559462e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -59,14 +59,10 @@
         <property name="properties" ref="nifiProperties"/>
     </bean>
 
-    <!-- cluster manager optimistic locking manager -->
-    <bean id="clusterManagerOptimisticLockingManager" 
class="org.apache.nifi.web.StandardOptimisticLockingManager"/>
-
     <!-- cluster manager -->
     <bean id="clusterManager" 
class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean">
         <property name="properties" ref="nifiProperties"/>
         <property name="encryptor" ref="stringEncryptor"/>
-        <property name="optimisticLockingManager" 
ref="clusterManagerOptimisticLockingManager"/>
     </bean>
     
     <!-- discoverable services -->

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
index 6d3571b..1cc210e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
@@ -40,7 +40,6 @@ import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.web.StandardOptimisticLockingManager;
 import org.apache.nifi.web.api.entity.Entity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.junit.Assert;
@@ -156,13 +155,12 @@ public class TestThreadPoolRequestReplicator {
 
         final AtomicInteger requestCount = new AtomicInteger(0);
         final DataFlowManagementService dfmService = 
Mockito.mock(DataFlowManagementService.class);
-        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator,
-            "1 sec", "1 sec", null, null, null, new 
StandardOptimisticLockingManager(), dfmService) {
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", 
null, null, null, dfmService) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
                 // the resource builder will not expose its headers to us, so 
we are using Mockito's Whitebox class to extract them.
                 final OutBoundHeaders headers = (OutBoundHeaders) 
Whitebox.getInternalState(resourceBuilder, "metadata");
-                final Object expectsHeader = 
headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER);
+                final Object expectsHeader = 
headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
 
                 final int statusCode;
                 if (requestCount.incrementAndGet() == 1) {
@@ -208,13 +206,12 @@ public class TestThreadPoolRequestReplicator {
 
         final AtomicInteger requestCount = new AtomicInteger(0);
         final DataFlowManagementService dfmService = 
Mockito.mock(DataFlowManagementService.class);
-        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator,
-            "1 sec", "1 sec", null, null, null, new 
StandardOptimisticLockingManager(), dfmService) {
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", 
null, null, null, dfmService) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
                 // the resource builder will not expose its headers to us, so 
we are using Mockito's Whitebox class to extract them.
                 final OutBoundHeaders headers = (OutBoundHeaders) 
Whitebox.getInternalState(resourceBuilder, "metadata");
-                final Object expectsHeader = 
headers.getFirst(ThreadPoolRequestReplicator.NCM_EXPECTS_HTTP_HEADER);
+                final Object expectsHeader = 
headers.getFirst(ThreadPoolRequestReplicator.REQUEST_VALIDATION_HTTP_HEADER);
 
                 final int requestIndex = requestCount.incrementAndGet();
                 assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, 
expectsHeader);
@@ -256,8 +253,7 @@ public class TestThreadPoolRequestReplicator {
         final ClusterCoordinator coordinator = 
Mockito.mock(ClusterCoordinator.class);
 
         final DataFlowManagementService dfmService = 
Mockito.mock(DataFlowManagementService.class);
-        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator,
-            "1 sec", "1 sec", null, null, null, new 
StandardOptimisticLockingManager(), dfmService) {
+        final ThreadPoolRequestReplicator replicator = new 
ThreadPoolRequestReplicator(2, new Client(), coordinator, "1 sec", "1 sec", 
null, null, null, dfmService) {
             @Override
             protected NodeResponse replicateRequest(final WebResource.Builder 
resourceBuilder, final NodeIdentifier nodeId, final String method, final URI 
uri, final String requestId) {
                 if (delayMillis > 0L) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index f7912a1..c5fd95c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -170,6 +170,9 @@ 
nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}
 nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout}
 nifi.zookeeper.root.node=${nifi.zookeeper.root.node}
 
+# How long a request should be allowed to hold a 'lock' on a component. #
+nifi.cluster.request.replication.claim.timeout=${nifi.cluster.request.replication.claim.timeout}
+
 # cluster manager properties (only configure for cluster manager) #
 nifi.cluster.is.manager=${nifi.cluster.is.manager}
 nifi.cluster.manager.address=${nifi.cluster.manager.address}

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
index 937f368..a6c17b1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/pom.xml
@@ -154,11 +154,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-framework-cluster-web</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
index 85f0b9f..14c4b64 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/NiFiAuditor.java
@@ -18,16 +18,15 @@ package org.apache.nifi.audit;
 
 import java.util.ArrayList;
 import java.util.Collection;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.details.FlowChangeMoveDetails;
 import org.apache.nifi.action.details.MoveDetails;
 import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 
 /**
@@ -58,13 +57,6 @@ public abstract class NiFiAuditor {
      * @param logger logger
      */
     protected void saveActions(Collection<Action> actions, Logger logger) {
-        ClusterContext ctx = ClusterContextThreadLocal.getContext();
-
-        // if we're a connected node, then put audit actions on threadlocal to 
propagate back to manager
-        if (ctx != null) {
-            ctx.getActions().addAll(actions);
-        }
-
         // always save the actions regardless of cluster or stand-alone
         // all nodes in a cluster will have their own local copy without 
batching
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index fa78b93..41b9867 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -81,6 +81,7 @@ import org.apache.nifi.web.api.entity.SnippetEntity;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -306,19 +307,21 @@ public interface NiFiServiceFacade {
      * @param description description
      * @param snippetId id
      * @param groupId id of the process group
+     * @param idGenerationSeed the seed to use for generating a UUID
      * @return template
      */
-    TemplateDTO createTemplate(String name, String description, String snippetId, String groupId);
+    TemplateDTO createTemplate(String name, String description, String snippetId, String groupId, Optional<String> idGenerationSeed);
 
     /**
      * Imports the specified Template.
      *
      * @param templateDTO The template dto
      * @param groupId id of the process group
+     * @param idGenerationSeed the seed to use for generating a UUID
      *
      * @return The new template dto
      */
-    TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId);
+    TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId, Optional<String> idGenerationSeed);
 
     /**
      * Instantiate the corresponding template.
@@ -327,9 +330,10 @@ public interface NiFiServiceFacade {
      * @param templateId template id
      * @param originX x
      * @param originY y
+     * @param idGenerationSeed the ID to use for generating UUID's. May be null.
      * @return snapshot
      */
-    FlowEntity createTemplateInstance(String groupId, Double originX, Double originY, String templateId);
+    FlowEntity createTemplateInstance(String groupId, Double originX, Double originY, String templateId, String idGenerationSeed);
 
     /**
      * Gets the template with the specified id.
@@ -1302,9 +1306,10 @@ public interface NiFiServiceFacade {
      * @param snippetId snippet id
      * @param originX x
      * @param originY y
+     * @param idGenerationSeed the seed to use for generating UUID's. May be null.
      * @return snapshot
      */
-    FlowEntity copySnippet(String groupId, String snippetId, Double originX, Double originY);
+    FlowEntity copySnippet(String groupId, String snippetId, Double originX, Double originY, String idGenerationSeed);
 
     /**
      * Creates a new snippet.

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b74e4de/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 2961ab3..0600cb6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -16,6 +16,30 @@
  */
 package org.apache.nifi.web;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import javax.ws.rs.WebApplicationException;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
@@ -30,8 +54,6 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
 import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.impl.WebClusterManager;
@@ -42,6 +64,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
@@ -168,28 +191,6 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 /**
  * Implementation of NiFiServiceFacade that performs revision checking.
  */
@@ -200,9 +201,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     private ControllerFacade controllerFacade;
     private SnippetUtils snippetUtils;
 
-    // optimistic locking manager
-//    private OptimisticLockingManager optimisticLockingManager;
-
     // revision manager
     private RevisionManager revisionManager;
 
@@ -239,7 +237,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     // -----------------------------------------
     @Override
     public void claimRevision(Revision revision) {
-        revisionManager.requestClaim(revision);
+        revisionManager.requestClaim(revision, NiFiUserUtils.getNiFiUser());
     }
 
     // -----------------------------------------
@@ -274,22 +272,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteConnection(String connectionId) {
-        try {
-            connectionDAO.verifyDelete(connectionId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(connectionId);
-            throw e;
-        }
+        connectionDAO.verifyDelete(connectionId);
     }
 
     @Override
     public void verifyDeleteFunnel(String funnelId) {
-        try {
-            funnelDAO.verifyDelete(funnelId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(funnelId);
-            throw e;
-        }
+        funnelDAO.verifyDelete(funnelId);
     }
 
     @Override
@@ -308,12 +296,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteInputPort(String inputPortId) {
-        try {
-            inputPortDAO.verifyDelete(inputPortId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(inputPortId);
-            throw e;
-        }
+        inputPortDAO.verifyDelete(inputPortId);
     }
 
     @Override
@@ -332,12 +315,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteOutputPort(String outputPortId) {
-        try {
-            outputPortDAO.verifyDelete(outputPortId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(outputPortId);
-            throw e;
-        }
+        outputPortDAO.verifyDelete(outputPortId);
     }
 
     @Override
@@ -356,12 +334,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteProcessor(String processorId) {
-        try {
-            processorDAO.verifyDelete(processorId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(processorId);
-            throw e;
-        }
+        processorDAO.verifyDelete(processorId);
     }
 
     @Override
@@ -380,12 +353,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteProcessGroup(String groupId) {
-        try {
-            processGroupDAO.verifyDelete(groupId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(groupId);
-            throw e;
-        }
+        processGroupDAO.verifyDelete(groupId);
     }
 
     @Override
@@ -424,12 +392,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteRemoteProcessGroup(String remoteProcessGroupId) {
-        try {
-            remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(remoteProcessGroupId);
-            throw e;
-        }
+        remoteProcessGroupDAO.verifyDelete(remoteProcessGroupId);
     }
 
     @Override
@@ -458,12 +421,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteControllerService(String controllerServiceId) {
-        try {
-            controllerServiceDAO.verifyDelete(controllerServiceId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(controllerServiceId);
-            throw e;
-        }
+        controllerServiceDAO.verifyDelete(controllerServiceId);
     }
 
     @Override
@@ -482,12 +440,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteReportingTask(String reportingTaskId) {
-        try {
-            reportingTaskDAO.verifyDelete(reportingTaskId);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(reportingTaskId);
-            throw e;
-        }
+        reportingTaskDAO.verifyDelete(reportingTaskId);
     }
 
     // -----------------------------------------
@@ -581,9 +534,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
      * @return A ConfigurationSnapshot that represents the new configuration
      */
     private <D, C> RevisionUpdate<D> updateComponent(final Revision revision, final Authorizable authorizable, final Supplier<C> daoUpdate, final Function<C, D> dtoCreation) {
-        final String modifier = NiFiUserUtils.getNiFiUserName();
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final String modifier = user.getUserName();
         try {
-            final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<D>() {
+            final RevisionUpdate<D> updatedComponent = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<D>() {
                 @Override
                 public RevisionUpdate<D> update() {
                     // ensure write access to the flow
@@ -706,12 +660,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             requestProcessGroup.authorize(authorizer, RequestAction.WRITE);
         }
 
-        final String modifier = NiFiUserUtils.getNiFiUserName();
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final String modifier = user.getUserName();
         final RevisionClaim revisionClaim = new StandardRevisionClaim(requiredRevisions);
 
         RevisionUpdate<SnippetDTO> versionedSnippet;
         try {
-            versionedSnippet = revisionManager.updateRevision(revisionClaim, modifier, new UpdateRevisionTask<SnippetDTO>() {
+            versionedSnippet = revisionManager.updateRevision(revisionClaim, user, new UpdateRevisionTask<SnippetDTO>() {
                 @Override
                 public RevisionUpdate<SnippetDTO> update() {
                     // get the updated component
@@ -1081,7 +1036,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
      */
     private <D, C> D deleteComponent(final Revision revision, final Authorizable authorizable, final Runnable deleteAction, final D dto) {
         final RevisionClaim claim = new StandardRevisionClaim(revision);
-        return revisionManager.deleteRevision(claim, new DeleteRevisionTask<D>() {
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        return revisionManager.deleteRevision(claim, user, new DeleteRevisionTask<D>() {
             @Override
             public D performTask() {
                 logger.debug("Attempting to delete component {} with claim {}", authorizable, claim);
@@ -1089,6 +1046,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                 // ensure access to the component
                 authorizable.authorize(authorizer, RequestAction.WRITE);
 
+                // If the component has outgoing connections, ensure that we can delete them all.
+                if (authorizable instanceof Connectable) {
+                    final Connectable connectable = (Connectable) authorizable;
+                    for (final Connection connection : connectable.getConnections()) {
+                        connection.authorize(authorizer, RequestAction.WRITE);
+                    }
+                }
+
                 deleteAction.run();
 
                 // save the flow
@@ -1102,12 +1067,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public void verifyDeleteSnippet(String id) {
-        try {
-            snippetDAO.verifyDelete(id);
-        } catch (final Exception e) {
-            revisionManager.cancelClaim(id);
-            throw e;
-        }
+        snippetDAO.verifyDelete(id);
     }
 
     @Override
@@ -1356,7 +1316,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY) {
+    public FlowEntity copySnippet(final String groupId, final String snippetId, final Double originX, final Double originY, final String idGenerationSeed) {
         final FlowDTO flowDto = revisionManager.get(groupId,
             rev -> {
                 // ensure access to process group
@@ -1364,7 +1324,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                 processGroup.authorize(authorizer, RequestAction.WRITE);
 
                 // create the new snippet
-                final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY);
+                final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed);
 
                 // TODO - READ access to all components in snippet
 
@@ -1505,7 +1465,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public TemplateDTO createTemplate(String name, String description, String snippetId, String groupId) {
+    public TemplateDTO createTemplate(String name, String description, String snippetId, String groupId, Optional<String> idGenerationSeed) {
         // get the specified snippet
         Snippet snippet = snippetDAO.getSnippet(snippetId);
 
@@ -1517,12 +1477,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         templateDTO.setSnippet(snippetUtils.populateFlowSnippet(snippet, true, true));
 
         // set the id based on the specified seed
-        final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
-        if (clusterContext != null) {
-            templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
-        } else {
-            templateDTO.setId(UUID.randomUUID().toString());
-        }
+        final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
+        templateDTO.setId(uuid);
 
         // create the template
         Template template = templateDAO.createTemplate(templateDTO, groupId);
@@ -1531,14 +1487,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId) {
+    public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId, Optional<String> idGenerationSeed) {
         // ensure id is set
-        final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
-        if (clusterContext != null) {
-            templateDTO.setId(UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString());
-        } else {
-            templateDTO.setId(UUID.randomUUID().toString());
-        }
+        final String uuid = idGenerationSeed.isPresent() ? (UUID.nameUUIDFromBytes(idGenerationSeed.get().getBytes(StandardCharsets.UTF_8))).toString() : UUID.randomUUID().toString();
+        templateDTO.setId(uuid);
 
         // mark the timestamp
         templateDTO.setTimestamp(new Date());
@@ -1551,7 +1503,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId) {
+    public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId, final String idGenerationSeed) {
         final FlowDTO flowDto = revisionManager.get(groupId, rev -> {
             // ensure access to process group
             final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
@@ -1559,7 +1511,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
             // instantiate the template - there is no need to make another copy of the flow snippet since the actual template
             // was copied and this dto is only used to instantiate it's components (which as already completed)
-            final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId);
+            final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed);
 
             // TODO - READ access to all components in snippet
 
@@ -1624,9 +1576,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
     @Override
     public ProcessorEntity setProcessorAnnotationData(final Revision revision, final String processorId, final String annotationData) {
-        final String modifier = NiFiUserUtils.getNiFiUserName();
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final String modifier = user.getUserName();
 
-        final RevisionUpdate<ProcessorEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), modifier, new UpdateRevisionTask<ProcessorEntity>() {
+        final RevisionUpdate<ProcessorEntity> update = revisionManager.updateRevision(new StandardRevisionClaim(revision), user, new UpdateRevisionTask<ProcessorEntity>() {
             @Override
             public RevisionUpdate<ProcessorEntity> update() {
                 // create the processor config
@@ -1701,9 +1654,9 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         final Map<String, Revision> referenceRevisions, final String controllerServiceId, final ScheduledState scheduledState, final ControllerServiceState controllerServiceState) {
 
         final RevisionClaim claim = new StandardRevisionClaim(referenceRevisions.values());
-        final String modifier = NiFiUserUtils.getNiFiUserName();
 
-        final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, modifier,
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+        final RevisionUpdate<ControllerServiceReferencingComponentsEntity> update = revisionManager.updateRevision(claim, user,
             new UpdateRevisionTask<ControllerServiceReferencingComponentsEntity>() {
                 @Override
                 public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {

Reply via email to