NIFI-1801: Scope Templates to Process Groups. This closes #446.

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/270944ec
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/270944ec
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/270944ec

Branch: refs/heads/master
Commit: 270944ec692e12c221cdff202bdab56309dfcfd7
Parents: 347b281
Author: Mark Payne <[email protected]>
Authored: Thu May 12 13:09:36 2016 -0400
Committer: Matt Gilman <[email protected]>
Committed: Mon May 16 16:12:43 2016 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/web/Revision.java |   5 -
 .../apache/nifi/web/api/dto/TemplateDTO.java    |  15 +-
 .../nifi/cluster/protocol/StandardDataFlow.java |  11 +-
 .../protocol/jaxb/message/AdaptedDataFlow.java  |   9 -
 .../protocol/jaxb/message/DataFlowAdapter.java  |   3 +-
 .../impl/NodeProtocolSenderImplTest.java        |   2 +-
 .../ClusterProtocolHeartbeatMonitor.java        |  17 -
 .../endpoints/FlowSnippetEndpointMerger.java    |  44 +-
 .../http/endpoints/ProcessorEndpointMerger.java |  14 +
 .../RemoteProcessGroupEndpointMerger.java       |  13 +
 .../http/replication/ResponseUtils.java         |   2 +-
 .../ThreadPoolRequestReplicator.java            |  30 +-
 .../nifi/cluster/flow/impl/DataFlowDaoImpl.java |   4 +-
 .../cluster/manager/impl/WebClusterManager.java |   6 +-
 .../impl/DataFlowManagementServiceImplTest.java |  14 +-
 .../apache/nifi/cluster/protocol/DataFlow.java  |   5 -
 .../org/apache/nifi/controller/Template.java    | 191 +++++++
 .../org/apache/nifi/groups/ProcessGroup.java    |  42 ++
 .../apache/nifi/controller/FlowController.java  |  75 +--
 .../nifi/controller/StandardFlowService.java    | 100 +++-
 .../controller/StandardFlowSynchronizer.java    |  99 +---
 .../org/apache/nifi/controller/Template.java    |  37 --
 .../apache/nifi/controller/TemplateManager.java | 524 -------------------
 .../apache/nifi/controller/TemplateUtils.java   | 287 ++++++++++
 .../serialization/StandardFlowSerializer.java   |  26 +
 .../nifi/fingerprint/FingerprintFactory.java    |   5 +-
 .../nifi/groups/StandardProcessGroup.java       |  95 ++++
 .../nifi/persistence/TemplateSerializer.java    |   1 +
 .../controller/StandardFlowServiceTest.java     |  16 +-
 .../service/mock/MockProcessGroup.java          |  30 ++
 .../org/apache/nifi/web/NiFiServiceFacade.java  |   7 +-
 .../nifi/web/StandardNiFiServiceFacade.java     | 300 ++++++++---
 .../nifi/web/api/ProcessGroupResource.java      |  11 +-
 .../apache/nifi/web/api/ProcessorResource.java  |  23 +-
 .../config/IllegalArgumentExceptionMapper.java  |   5 +-
 .../nifi/web/controller/ControllerFacade.java   |   2 +-
 .../org/apache/nifi/web/dao/TemplateDAO.java    |   6 +-
 .../nifi/web/dao/impl/StandardTemplateDAO.java  |  45 +-
 .../nifi/web/revision/NaiveRevisionManager.java | 131 +++--
 .../nifi/web/revision/RevisionManager.java      |   8 +
 .../web/revision/TestNaiveRevisionManager.java  |  65 ++-
 41 files changed, 1325 insertions(+), 1000 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-api/src/main/java/org/apache/nifi/web/Revision.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/web/Revision.java 
b/nifi-api/src/main/java/org/apache/nifi/web/Revision.java
index 0533307..4c47dde 100644
--- a/nifi-api/src/main/java/org/apache/nifi/web/Revision.java
+++ b/nifi-api/src/main/java/org/apache/nifi/web/Revision.java
@@ -81,11 +81,6 @@ public class Revision implements Serializable {
             return false;
         }
 
-        // TODO: THIS IS FOR TESTING PURPOSES! DO NOT LET THIS GET CHECKED IN 
THIS WAY!!!!!!!!!!!!
-        if (true) {
-            return true;
-        }
-
         Revision thatRevision = (Revision) obj;
         // ensure that component ID's are the same (including null)
         if (thatRevision.getComponentId() == null && getComponentId() != null) 
{

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java
index 6fa9daf..4f54923 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/TemplateDTO.java
@@ -31,6 +31,7 @@ public class TemplateDTO {
     private String uri;
 
     private String id;
+    private String groupId;
     private String name;
     private String description;
     private Date timestamp;
@@ -40,9 +41,7 @@ public class TemplateDTO {
     /**
      * @return id for this template
      */
-    @ApiModelProperty(
-            value = "The id of the template."
-    )
+    @ApiModelProperty("The id of the template.")
     public String getId() {
         return id;
     }
@@ -51,6 +50,16 @@ public class TemplateDTO {
         this.id = id;
     }
 
+    @ApiModelProperty("The id of the Process Group that the template belongs 
to.")
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+
     /**
      * @return uri for this template
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
index 3b6d110..fc3558a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
@@ -29,9 +29,9 @@ import 
org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter;
  */
 @XmlJavaTypeAdapter(DataFlowAdapter.class)
 public class StandardDataFlow implements Serializable, DataFlow {
+    private static final long serialVersionUID = 1L;
 
     private final byte[] flow;
-    private final byte[] templateBytes;
     private final byte[] snippetBytes;
 
     private boolean autoStartProcessors;
@@ -40,23 +40,20 @@ public class StandardDataFlow implements Serializable, 
DataFlow {
      * Constructs an instance.
      *
      * @param flow a valid flow as bytes, which cannot be null
-     * @param templateBytes an XML representation of templates.  May be null.
      * @param snippetBytes an XML representation of snippets.  May be null.
      *
      * @throws NullPointerException if flow is null
      */
-    public StandardDataFlow(final byte[] flow, final byte[] templateBytes, 
final byte[] snippetBytes) {
+    public StandardDataFlow(final byte[] flow, final byte[] snippetBytes) {
         if(flow == null){
             throw new NullPointerException("Flow cannot be null");
         }
         this.flow = flow;
-        this.templateBytes = templateBytes;
         this.snippetBytes = snippetBytes;
     }
 
     public StandardDataFlow(final DataFlow toCopy) {
         this.flow = copy(toCopy.getFlow());
-        this.templateBytes = copy(toCopy.getTemplates());
         this.snippetBytes = copy(toCopy.getSnippets());
         this.autoStartProcessors = toCopy.isAutoStartProcessors();
     }
@@ -70,10 +67,6 @@ public class StandardDataFlow implements Serializable, 
DataFlow {
         return flow;
     }
 
-    @Override
-    public byte[] getTemplates() {
-        return templateBytes;
-    }
 
     @Override
     public byte[] getSnippets() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
index 683fdf5..62796d7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedDataFlow.java
@@ -21,7 +21,6 @@ package org.apache.nifi.cluster.protocol.jaxb.message;
 public class AdaptedDataFlow {
 
     private byte[] flow;
-    private byte[] templates;
     private byte[] snippets;
 
     private boolean autoStartProcessors;
@@ -37,14 +36,6 @@ public class AdaptedDataFlow {
         this.flow = flow;
     }
 
-    public byte[] getTemplates() {
-        return templates;
-    }
-
-    public void setTemplates(byte[] templates) {
-        this.templates = templates;
-    }
-
     public byte[] getSnippets() {
         return snippets;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
index 520b8eb..bd3e69e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/DataFlowAdapter.java
@@ -31,7 +31,6 @@ public class DataFlowAdapter extends 
XmlAdapter<AdaptedDataFlow, StandardDataFlo
 
         if (df != null) {
             aDf.setFlow(df.getFlow());
-            aDf.setTemplates(df.getTemplates());
             aDf.setSnippets(df.getSnippets());
             aDf.setAutoStartProcessors(df.isAutoStartProcessors());
         }
@@ -41,7 +40,7 @@ public class DataFlowAdapter extends 
XmlAdapter<AdaptedDataFlow, StandardDataFlo
 
     @Override
     public StandardDataFlow unmarshal(final AdaptedDataFlow aDf) {
-        final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), 
aDf.getTemplates(), aDf.getSnippets());
+        final StandardDataFlow dataFlow = new StandardDataFlow(aDf.getFlow(), 
aDf.getSnippets());
         dataFlow.setAutoStartProcessors(aDf.isAutoStartProcessors());
         return dataFlow;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
index f9a986f..bd57fe4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
@@ -106,7 +106,7 @@ public class NodeProtocolSenderImplTest {
         
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
         ConnectionResponseMessage mockMessage = new 
ConnectionResponseMessage();
         mockMessage.setConnectionResponse(new 
ConnectionResponse(nodeIdentifier,
-            new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new 
byte[0]), null, null, UUID.randomUUID().toString()));
+            new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0]), null, 
null, UUID.randomUUID().toString()));
         
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
 
         ConnectionRequestMessage request = new ConnectionRequestMessage();

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index d9ef0be..f172915 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -47,7 +47,6 @@ import 
org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
-import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -183,23 +182,7 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
     @Override
     public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
         logger.debug("Deleting heartbeat for node {}", nodeId);
-        final String nodeInfoPath = clusterNodesPath + "/" + nodeId.getId();
-
         heartbeatMessages.remove(nodeId);
-
-        try {
-            getClient().delete().forPath(nodeInfoPath);
-            logger.info("Removed heartbeat from ZooKeeper for Node {}", 
nodeId);
-        } catch (final NoNodeException e) {
-            // node did not exist. Just return.
-            logger.debug("Attempted to remove heartbeat for Node with ID {} 
but no ZNode existed at {}", nodeId, nodeInfoPath);
-            return;
-        } catch (final Exception e) {
-            logger.warn("Failed to remove heartbeat from ZooKeeper for Node {} 
due to {}", nodeId, e);
-            logger.warn("", e);
-
-            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Failed 
to remove node's heartbeat from ZooKeeper due to " + e);
-        }
     }
 
     protected Set<NodeIdentifier> getClusterNodeIds() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
index d7f6948..2063de4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowSnippetEndpointMerger.java
@@ -26,10 +26,10 @@ import java.util.regex.Pattern;
 import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-import org.apache.nifi.web.api.entity.FlowSnippetEntity;
+import org.apache.nifi.web.api.dto.flow.FlowDTO;
+import org.apache.nifi.web.api.entity.FlowEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 
 public class FlowSnippetEndpointMerger implements EndpointResponseMerger {
     public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = 
Pattern.compile("/nifi-api/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
@@ -43,21 +43,21 @@ public class FlowSnippetEndpointMerger implements 
EndpointResponseMerger {
 
     @Override
     public NodeResponse merge(final URI uri, final String method, 
Set<NodeResponse> successfulResponses, final Set<NodeResponse> 
problematicResponses, final NodeResponse clientResponse) {
-        final FlowSnippetEntity responseEntity = 
clientResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
-        final FlowSnippetDTO contents = responseEntity.getContents();
+        final FlowEntity responseEntity = 
clientResponse.getClientResponse().getEntity(FlowEntity.class);
+        final FlowDTO flowDto = responseEntity.getFlow();
 
-        if (contents == null) {
+        if (flowDto == null) {
             return clientResponse;
         } else {
-            final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap 
= new HashMap<>();
-            final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> 
remoteProcessGroupMap = new HashMap<>();
+            final Map<String, Map<NodeIdentifier, ProcessorEntity>> 
processorMap = new HashMap<>();
+            final Map<String, Map<NodeIdentifier, RemoteProcessGroupEntity>> 
remoteProcessGroupMap = new HashMap<>();
 
             for (final NodeResponse nodeResponse : successfulResponses) {
-                final FlowSnippetEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
-                final FlowSnippetDTO nodeContents = 
nodeResponseEntity.getContents();
+                final FlowEntity nodeResponseEntity = nodeResponse == 
clientResponse ? responseEntity : 
nodeResponse.getClientResponse().getEntity(FlowEntity.class);
+                final FlowDTO nodeContents = nodeResponseEntity.getFlow();
 
-                for (final ProcessorDTO nodeProcessor : 
nodeContents.getProcessors()) {
-                    Map<NodeIdentifier, ProcessorDTO> innerMap = 
processorMap.get(nodeProcessor.getId());
+                for (final ProcessorEntity nodeProcessor : 
nodeContents.getProcessors()) {
+                    Map<NodeIdentifier, ProcessorEntity> innerMap = 
processorMap.get(nodeProcessor.getId());
                     if (innerMap == null) {
                         innerMap = new HashMap<>();
                         processorMap.put(nodeProcessor.getId(), innerMap);
@@ -66,8 +66,8 @@ public class FlowSnippetEndpointMerger implements 
EndpointResponseMerger {
                     innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
                 }
 
-                for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : 
nodeContents.getRemoteProcessGroups()) {
-                    Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = 
remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
+                for (final RemoteProcessGroupEntity nodeRemoteProcessGroup : 
nodeContents.getRemoteProcessGroups()) {
+                    Map<NodeIdentifier, RemoteProcessGroupEntity> innerMap = 
remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
                     if (innerMap == null) {
                         innerMap = new HashMap<>();
                         
remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap);
@@ -78,21 +78,19 @@ public class FlowSnippetEndpointMerger implements 
EndpointResponseMerger {
             }
 
             final ProcessorEndpointMerger procMerger = new 
ProcessorEndpointMerger();
-            for (final ProcessorDTO processor : contents.getProcessors()) {
+            for (final ProcessorEntity processor : flowDto.getProcessors()) {
                 final String procId = processor.getId();
-                final Map<NodeIdentifier, ProcessorDTO> mergeMap = 
processorMap.get(procId);
+                final Map<NodeIdentifier, ProcessorEntity> mergeMap = 
processorMap.get(procId);
 
                 procMerger.mergeResponses(processor, mergeMap, 
successfulResponses, problematicResponses);
             }
 
             final RemoteProcessGroupEndpointMerger rpgMerger = new 
RemoteProcessGroupEndpointMerger();
-            for (final RemoteProcessGroupDTO remoteProcessGroup : 
contents.getRemoteProcessGroups()) {
-                if (remoteProcessGroup.getContents() != null) {
-                    final String remoteProcessGroupId = 
remoteProcessGroup.getId();
-                    final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap 
= remoteProcessGroupMap.get(remoteProcessGroupId);
+            for (final RemoteProcessGroupEntity remoteProcessGroup : 
flowDto.getRemoteProcessGroups()) {
+                final String remoteProcessGroupId = remoteProcessGroup.getId();
+                final Map<NodeIdentifier, RemoteProcessGroupEntity> mergeMap = 
remoteProcessGroupMap.get(remoteProcessGroupId);
 
-                    rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, 
successfulResponses, problematicResponses);
-                }
+                rpgMerger.mergeResponses(remoteProcessGroup, mergeMap, 
successfulResponses, problematicResponses);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
index 6a040fa..a892e8a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ProcessorEndpointMerger.java
@@ -72,4 +72,18 @@ public class ProcessorEndpointMerger extends 
AbstractSingleEntityEndpoint<Proces
         // set the merged the validation errors
         
clientDto.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap,
 dtoMap.size()));
     }
+
+    protected void mergeResponses(final ProcessorEntity clientEntity, final 
Map<NodeIdentifier, ProcessorEntity> entityMap, final Set<NodeResponse> 
successfulResponses,
+        final Set<NodeResponse> problematicResponses) {
+
+        final ProcessorDTO clientDto = clientEntity.getComponent();
+        final Map<NodeIdentifier, ProcessorDTO> dtoMap = new HashMap<>();
+        for (final Map.Entry<NodeIdentifier, ProcessorEntity> entry : 
entityMap.entrySet()) {
+            final ProcessorEntity nodeProcEntity = entry.getValue();
+            final ProcessorDTO nodeProcDto = nodeProcEntity.getComponent();
+            dtoMap.put(entry.getKey(), nodeProcDto);
+        }
+
+        mergeResponses(clientDto, dtoMap, successfulResponses, 
problematicResponses);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
index 94383de..56636fb 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/RemoteProcessGroupEndpointMerger.java
@@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.http.endpoints;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -113,4 +114,16 @@ public class RemoteProcessGroupEndpointMerger extends 
AbstractSingleEntityEndpoi
             clientDto.setAuthorizationIssues(mergedAuthorizationIssues);
         }
     }
+
+    protected void mergeResponses(RemoteProcessGroupEntity clientEntity, 
Map<NodeIdentifier, RemoteProcessGroupEntity> entityMap,
+        Set<NodeResponse> successfulResponses, Set<NodeResponse> 
problematicResponses) {
+
+        final RemoteProcessGroupDTO clientDto = clientEntity.getComponent();
+        final Map<NodeIdentifier, RemoteProcessGroupDTO> dtoMap = new 
HashMap<>();
+        for (final Map.Entry<NodeIdentifier, RemoteProcessGroupEntity> entry : 
entityMap.entrySet()) {
+            dtoMap.put(entry.getKey(), entry.getValue().getComponent());
+        }
+
+        mergeResponses(clientDto, dtoMap, successfulResponses, 
problematicResponses);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
index 8435c60..2fac89e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ResponseUtils.java
@@ -39,7 +39,7 @@ public class ResponseUtils {
     public static Set<NodeIdentifier> findLongResponseTimes(final 
AsyncClusterResponse response, final double stdDeviationMultiple) {
         final Set<NodeIdentifier> slowResponses = new HashSet<>();
 
-        if (response.isOlderThan(2, TimeUnit.SECONDS)) {
+        if (response.isOlderThan(1, TimeUnit.SECONDS)) {
             // If the response is older than 2 seconds, determines if any node 
took a long time to respond.
             final Set<NodeIdentifier> completedIds = 
response.getCompletedNodeIdentifiers();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 3b71654..d218af2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -228,17 +228,24 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             throw new IllegalArgumentException("Cannot replicate request to 0 
nodes");
         }
 
+        logger.debug("Replicating request {} {} with entity {} to {}; response 
is {}", method, uri, entity, nodeIds, response);
+
+        // Update headers to indicate the current revision so that we can
+        // prevent multiple users changing the flow at the same time
+        final Map<String, String> updatedHeaders = new HashMap<>(headers);
+        final String requestId = 
updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID, key -> 
UUID.randomUUID().toString());
+
         if (performVerification) {
             verifyState(method, uri.getPath());
         }
 
         final int numRequests = responseMap.size();
         if (numRequests >= MAX_CONCURRENT_REQUESTS) {
+            logger.debug("Cannot replicate request because there are {} 
outstanding HTTP Requests already", numRequests);
             throw new IllegalStateException("There are too many outstanding 
HTTP requests with a total " + numRequests + " outstanding requests");
         }
 
         // create the request objects and replicate to all nodes
-        final String requestId = UUID.randomUUID().toString();
         final CompletionCallback completionCallback = clusterResponse -> 
onCompletedResponse(requestId);
         final Runnable responseConsumedCallback = () -> 
onResponseConsumed(requestId);
 
@@ -249,10 +256,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             responseMap.put(requestId, response);
         }
 
-        // Update headers to indicate the current revision so that we can
-        // prevent multiple users changing the flow at the same time
-        final Map<String, String> updatedHeaders = new HashMap<>(headers);
-        updatedHeaders.put(REQUEST_TRANSACTION_ID, 
UUID.randomUUID().toString());
+        logger.debug("For Request ID {}, response object is {}", requestId, 
response);
         // setRevision(updatedHeaders);
 
         // if mutable request, we have to do a two-phase commit where we ask 
each node to verify
@@ -262,6 +266,7 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
         // replicate the actual request.
         final boolean mutableRequest = isMutableRequest(method, uri.getPath());
         if (mutableRequest && performVerification) {
+            logger.debug("Performing verification (first phase of two-phase 
commit) for Request ID {}", requestId);
             performVerification(nodeIds, method, uri, entity, updatedHeaders, 
response);
             return response;
         }
@@ -309,13 +314,22 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
 
             @Override
             public void onCompletion(final NodeResponse nodeResponse) {
-                // Add the node response to our collection.
-                nodeResponses.add(nodeResponse);
+                // Add the node response to our collection. We later need to 
know whether or
+                // not this is the last node response, so we add the response 
and then check
+                // the size within a synchronized block to ensure that those 
two things happen
+                // atomically. Otherwise, we could have multiple threads 
checking the sizes of
+                // the sets at the same time, which could result in multiple 
threads performing
+                // the 'all nodes are complete' logic.
+                final boolean allNodesResponded;
+                synchronized (nodeResponses) {
+                    nodeResponses.add(nodeResponse);
+                    allNodesResponded = nodeResponses.size() == numNodes;
+                }
 
                 try {
                     // If we have all of the node responses, then we can 
verify the responses
                     // and if good replicate the original request to all of 
the nodes.
-                    if (nodeResponses.size() == numNodes) {
+                    if (allNodesResponded) {
                         // Check if we have any requests that do not have a 
150-Continue status code.
                         final long dissentingCount = 
nodeResponses.stream().filter(p -> p.getStatus() != 
NODE_CONTINUE_STATUS_CODE).count();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
index 335c0ef..7c99c93 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -512,7 +512,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
             clusterMetadata = (ClusterMetadata) 
clusterMetadataUnmarshaller.unmarshal(new 
ByteArrayInputStream(clusterInfoBytes));
         }
 
-        final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, 
templateBytes, snippetBytes);
+        final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, 
snippetBytes);
         dataFlow.setAutoStartProcessors(autoStart);
 
         return new ClusterDataFlow(dataFlow, clusterMetadata == null ? null : 
clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes);
@@ -543,11 +543,9 @@ public class DataFlowDaoImpl implements DataFlowDao {
             final DataFlow dataFlow = clusterDataFlow.getDataFlow();
             if (dataFlow == null) {
                 writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes());
-                writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]);
                 writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]);
             } else {
                 writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
-                writeTarEntry(tarOut, TEMPLATES_FILENAME, 
dataFlow.getTemplates());
                 writeTarEntry(tarOut, SNIPPETS_FILENAME, 
dataFlow.getSnippets());
             }
             writeTarEntry(tarOut, CONTROLLER_SERVICES_FILENAME, 
clusterDataFlow.getControllerServices());

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 7c2eabf..59d582b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -1957,10 +1957,10 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
             // disconnect problematic nodes
             if (!problematicNodeResponses.isEmpty()) {
                 if (problematicNodeResponses.size() < nodeResponses.size()) {
-                    logger.warn(String.format("The following nodes failed to 
process URI '%s'.  Requesting each node to disconnect from cluster: ", uriPath, 
problematicNodeResponses));
-                    disconnectNodes(problematicNodeResponses, "Failed to 
process URI " + uriPath);
+                    logger.warn(String.format("The following nodes failed to 
process URI %s '%s'.  Requesting each node to disconnect from cluster: ", 
uriPath, problematicNodeResponses));
+                    disconnectNodes(problematicNodeResponses, "Failed to 
process URI " + method + " " + uriPath);
                 } else {
-                    logger.warn("All nodes failed to process URI {}. As a 
result, no node will be disconnected from cluster", uriPath);
+                    logger.warn("All nodes failed to process URI {} {}. As a 
result, no node will be disconnected from cluster", method, uriPath);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
index e526ea3..34189ac 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImplTest.java
@@ -145,7 +145,7 @@ public class DataFlowManagementServiceImplTest {
     public void testLoadFlowSingleNode() throws Exception {
         String flowStr = "<rootGroup />";
         byte[] flowBytes = flowStr.getBytes();
-        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
 
         NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 
apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
         service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));
@@ -165,7 +165,7 @@ public class DataFlowManagementServiceImplTest {
     public void testLoadFlowWithSameNodeIds() throws Exception {
 
         String flowStr = "<rootGroup />";
-        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
 
         NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 
apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
         NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 
apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
@@ -193,7 +193,7 @@ public class DataFlowManagementServiceImplTest {
 
         String flowStr = "<rootGroup />";
         byte[] flowBytes = flowStr.getBytes();
-        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
 
         NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 
apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false);
         NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 
apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
@@ -214,7 +214,7 @@ public class DataFlowManagementServiceImplTest {
     public void testLoadFlowWithConstantNodeIdChanging() throws Exception {
         String flowStr = "<rootGroup />";
         byte[] flowBytes = flowStr.getBytes();
-        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
 
         NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 
apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false);
         NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 
apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
@@ -236,7 +236,7 @@ public class DataFlowManagementServiceImplTest {
     public void testLoadFlowWithConstantNodeIdChangingWithRetrievalDelay() 
throws Exception {
 
         String flowStr = "<rootGroup />";
-        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
 
         NodeIdentifier nodeId1 = new NodeIdentifier("1", "localhost", 
apiDummyPort, "localhost", socketPort + 1, "localhost", 1234, false);
         NodeIdentifier nodeId2 = new NodeIdentifier("2", "localhost", 
apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
@@ -259,7 +259,7 @@ public class DataFlowManagementServiceImplTest {
     public void testStopRequestedWhileRetrieving() throws Exception {
 
         String flowStr = "<rootGroup />";
-        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
         Set<NodeIdentifier> nodeIds = new HashSet<>();
         for (int i = 0; i < 1000; i++) {
             nodeIds.add(new NodeIdentifier("1", "localhost", apiDummyPort, 
"localhost", socketPort + 1, "localhost", 1234, false));
@@ -289,7 +289,7 @@ public class DataFlowManagementServiceImplTest {
 
         String flowStr = "<rootGroup />";
         byte[] flowBytes = flowStr.getBytes();
-        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0], new byte[0])));
+        listener.addHandler(new FlowRequestProtocolHandler(new 
StandardDataFlow(flowStr.getBytes("UTF-8"), new byte[0])));
         NodeIdentifier nodeId = new NodeIdentifier("1", "localhost", 
apiDummyPort, "localhost", socketPort, "localhost", 1234, false);
 
         service.setNodeIds(new HashSet<>(Arrays.asList(nodeId)));

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
index 57c1c30..312e3b0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/protocol/DataFlow.java
@@ -24,11 +24,6 @@ public interface DataFlow {
     public byte[] getFlow();
 
     /**
-     * @return the raw byte array of the templates
-     */
-    public byte[] getTemplates();
-
-    /**
      * @return the raw byte array of the snippets
      */
     public byte[] getSnippets();

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java
new file mode 100644
index 0000000..b9fd0cb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.authorization.AccessDeniedException;
+import org.apache.nifi.authorization.AuthorizationRequest;
+import org.apache.nifi.authorization.AuthorizationResult;
+import org.apache.nifi.authorization.AuthorizationResult.Result;
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.RequestAction;
+import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.authorization.resource.ResourceFactory;
+import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.TemplateDTO;
+
+public class Template implements Authorizable {
+
+    private final TemplateDTO dto;
+    private volatile ProcessGroup processGroup;
+
+    public Template(final TemplateDTO dto) {
+        this.dto = dto;
+    }
+
+    public String getIdentifier() {
+        return dto.getId();
+    }
+
+    /**
+     * Returns a TemplateDTO object that describes the contents of this 
Template
+     *
+     * @return template dto
+     */
+    public TemplateDTO getDetails() {
+        return dto;
+    }
+
+    public void setProcessGroup(final ProcessGroup group) {
+        this.processGroup = group;
+    }
+
+    public ProcessGroup getProcessGroup() {
+        return processGroup;
+    }
+
+
+    @Override
+    public Authorizable getParentAuthorizable() {
+        return null;
+    }
+
+    @Override
+    public Resource getResource() {
+        return ResourceFactory.getComponentResource(ResourceType.Template, 
dto.getId(), dto.getName());
+    }
+
+    private Set<Authorizable> getAuthorizableComponents() {
+        return getAuthorizableComponents(processGroup);
+    }
+
+    private Set<Authorizable> getAuthorizableComponents(final ProcessGroup 
processGroup) {
+        final Set<Authorizable> authComponents = new HashSet<>();
+        final FlowSnippetDTO snippet = dto.getSnippet();
+
+        authComponents.add(processGroup);
+
+        // If there is any component in the DTO that still exists in the flow, 
check its authorizations
+        for (final ConnectionDTO connectionDto : snippet.getConnections()) {
+            final Connection connection = 
processGroup.getConnection(connectionDto.getId());
+            if (connection != null) {
+                authComponents.add(connection);
+            }
+        }
+
+        // TODO: Authorize Controller Services
+        for (final ControllerServiceDTO service : 
snippet.getControllerServices()) {
+        }
+
+        for (final LabelDTO labelDto : snippet.getLabels()) {
+            final Label label = processGroup.getLabel(labelDto.getId());
+            if (label != null) {
+                authComponents.add(label);
+            }
+        }
+
+        for (final ProcessorDTO processorDto : snippet.getProcessors()) {
+            final ProcessorNode procNode = 
processGroup.getProcessor(processorDto.getId());
+            if (procNode != null) {
+                authComponents.add(procNode);
+            }
+        }
+
+        for (final RemoteProcessGroupDTO groupDto : 
snippet.getRemoteProcessGroups()) {
+            final RemoteProcessGroup rpg = 
processGroup.getRemoteProcessGroup(groupDto.getId());
+            if (rpg != null) {
+                authComponents.add(rpg);
+            }
+        }
+
+        for (final ProcessGroupDTO groupDto : snippet.getProcessGroups()) {
+            final ProcessGroup group = 
processGroup.getProcessGroup(groupDto.getId());
+            if (group != null) {
+                authComponents.addAll(getAuthorizableComponents(processGroup));
+            }
+        }
+
+        return authComponents;
+    }
+
+    @Override
+    public void authorize(final Authorizer authorizer, final RequestAction 
action) throws AccessDeniedException {
+        final AuthorizationResult result = checkAuthorization(authorizer, 
action, true);
+        if (Result.Denied.equals(result)) {
+            final String explanation = result.getExplanation() == null ? 
"Access is denied" : result.getExplanation();
+            throw new AccessDeniedException(explanation);
+        }
+    }
+
+    @Override
+    public AuthorizationResult checkAuthorization(final Authorizer authorizer, 
final RequestAction action) {
+        return checkAuthorization(authorizer, action, false);
+    }
+
+    private AuthorizationResult checkAuthorization(final Authorizer 
authorizer, final RequestAction action, final boolean accessAttempt) {
+        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+
+        // TODO - include user details context
+
+        // build the request
+        final AuthorizationRequest request = new AuthorizationRequest.Builder()
+            .identity(user.getIdentity())
+            .anonymous(user.isAnonymous())
+            .accessAttempt(accessAttempt)
+            .action(action)
+            .resource(getResource())
+            .build();
+
+        // perform the authorization
+        final AuthorizationResult result = authorizer.authorize(request);
+
+        // verify the results
+        if (Result.ResourceNotFound.equals(result.getResult())) {
+            for (final Authorizable child : getAuthorizableComponents()) {
+                final AuthorizationResult childResult = 
child.checkAuthorization(authorizer, action);
+                if (Result.Denied.equals(childResult)) {
+                    return childResult;
+                }
+            }
+
+            return AuthorizationResult.denied();
+        } else {
+            return result;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Template[id=" + getIdentifier() + ", Name=" + dto.getName() + 
"]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index e1f66ee..c3a4c8e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -28,6 +28,7 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.Snippet;
+import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flowfile.FlowFile;
@@ -768,4 +769,45 @@ public interface ProcessGroup extends Authorizable {
      * @throws IllegalStateException if the move is not valid at this time
      */
     void verifyCanMove(Snippet snippet, ProcessGroup newProcessGroup);
+
+    /**
+     * Adds the given template to this Process Group
+     *
+     * @param template the template to add
+     */
+    void addTemplate(Template template);
+
+    /**
+     * Removes the given template from the Process Group
+     *
+     * @param template the template to remove
+     */
+    void removeTemplate(Template template);
+
+    /**
+     * Returns the template with the given ID
+     *
+     * @param id the ID of the template
+     * @return the template with the given ID or <code>null</code> if no 
template
+     *         exists in this Process Group with the given ID
+     */
+    Template getTemplate(String id);
+
+    /**
+     * @param id of the template
+     * @return the Template with the given ID, if it exists as a child or
+     *         descendant of this ProcessGroup. This performs a recursive 
search of all
+     *         descendant ProcessGroups
+     */
+    Template findTemplate(String id);
+
+    /**
+     * @return a Set of all Templates that belong to this Process Group
+     */
+    Set<Template> getTemplates();
+
+    /**
+     * @return a Set of all Templates that belong to this Process Group and 
any descendant Process Groups
+     */
+    Set<Template> findAllTemplates();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4243712..73d36a5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -82,8 +82,8 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.connectable.StandardConnection;
-import org.apache.nifi.controller.cluster.Heartbeater;
 import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
+import org.apache.nifi.controller.cluster.Heartbeater;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
@@ -220,7 +220,6 @@ import org.apache.nifi.web.api.dto.RelationshipDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
-import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
@@ -257,7 +256,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final ProvenanceEventRepository provenanceEventRepository;
     private final VolatileBulletinRepository bulletinRepository;
     private final StandardProcessScheduler processScheduler;
-    private final TemplateManager templateManager;
     private final SnippetManager snippetManager;
     private final long gracefulShutdownSeconds;
     private final ExtensionManager extensionManager;
@@ -478,11 +476,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         this.configuredForClustering = configuredForClustering;
         this.heartbeatDelaySeconds = (int) 
FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), 
TimeUnit.SECONDS);
-        try {
-            this.templateManager = new 
TemplateManager(properties.getTemplateDirectory());
-        } catch (final IOException e) {
-            throw new RuntimeException(e);
-        }
 
         this.snippetManager = new SnippetManager();
 
@@ -1478,72 +1471,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
-    //
-    // Template access
-    //
-    /**
-     * Adds a template to this controller. The contents of this template must 
be part of the current flow. This is going create a template based on a snippet 
of this flow.
-     *
-     * @param dto template
-     * @return a copy of the given DTO
-     * @throws IOException if an I/O error occurs when persisting the Template
-     * @throws NullPointerException if the DTO is null
-     * @throws IllegalArgumentException if does not contain all required 
information, such as the template name or a processor's configuration element
-     */
-    public Template addTemplate(final TemplateDTO dto) throws IOException {
-        return templateManager.addTemplate(dto);
-    }
-
-    /**
-     * Removes all templates from this controller
-     *
-     * @throws IOException ioe
-     */
-    public void clearTemplates() throws IOException {
-        templateManager.clear();
-    }
-
-    /**
-     * Imports the specified template into this controller. The contents of 
this template may have come from another NiFi instance.
-     *
-     * @param dto dto
-     * @return template
-     * @throws IOException ioe
-     */
-    public Template importTemplate(final TemplateDTO dto) throws IOException {
-        return templateManager.importTemplate(dto);
-    }
-
-    /**
-     * @param id identifier
-     * @return the template with the given ID, or <code>null</code> if no 
template exists with the given ID
-     */
-    public Template getTemplate(final String id) {
-        return templateManager.getTemplate(id);
-    }
-
-    public TemplateManager getTemplateManager() {
-        return templateManager;
-    }
-
-    /**
-     * @return all templates that this controller knows about
-     */
-    public Collection<Template> getTemplates() {
-        return templateManager.getTemplates();
-    }
-
-    /**
-     * Removes the template with the given ID.
-     *
-     * @param id the ID of the template to remove
-     * @throws NullPointerException if the argument is null
-     * @throws IllegalStateException if no template exists with the given ID
-     * @throws IOException if template could not be removed
-     */
-    public void removeTemplate(final String id) throws IOException, 
IllegalStateException {
-        templateManager.removeTemplate(id);
-    }
 
     private Position toPosition(final PositionDTO dto) {
         return new Position(dto.getX(), dto.getY());

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index f8b3262..296659c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -16,20 +16,25 @@
  */
 package org.apache.nifi.controller;
 
+import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
@@ -76,13 +81,16 @@ import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.lifecycle.LifeCycleStartException;
 import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.nar.NarClassLoaders;
 import org.apache.nifi.persistence.FlowConfigurationDAO;
 import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
+import org.apache.nifi.persistence.TemplateDeserializer;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -522,16 +530,14 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             final byte[] flowBytes = baos.toByteArray();
             baos.reset();
 
-            final byte[] templateBytes = 
controller.getTemplateManager().export();
             final byte[] snippetBytes = 
controller.getSnippetManager().export();
 
             // create the response
             final FlowResponseMessage response = new FlowResponseMessage();
 
-            response.setDataFlow(new StandardDataFlow(flowBytes, 
templateBytes, snippetBytes));
+            response.setDataFlow(new StandardDataFlow(flowBytes, 
snippetBytes));
 
             return response;
-
         } catch (final Exception ex) {
             throw new ProtocolException("Failed serializing flow controller 
state for flow request due to: " + ex, ex);
         } finally {
@@ -606,27 +612,21 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
     private void loadFromBytes(final DataFlow proposedFlow, final boolean 
allowEmptyFlow)
             throws IOException, FlowSerializationException, 
FlowSynchronizationException, UninheritableFlowException {
         logger.trace("Loading flow from bytes");
-        final TemplateManager templateManager = 
controller.getTemplateManager();
-        templateManager.loadTemplates();
-        logger.trace("Finished loading templates");
 
         // resolve the given flow (null means load flow from disk)
         final DataFlow actualProposedFlow;
         final byte[] flowBytes;
-        final byte[] templateBytes;
         if (proposedFlow == null) {
             final ByteArrayOutputStream flowOnDisk = new 
ByteArrayOutputStream();
             copyCurrentFlow(flowOnDisk);
             flowBytes = flowOnDisk.toByteArray();
-            templateBytes = templateManager.export();
             logger.debug("Loaded Flow from bytes");
         } else {
             flowBytes = proposedFlow.getFlow();
-            templateBytes = proposedFlow.getTemplates();
             logger.debug("Loaded flow from proposed flow");
         }
 
-        actualProposedFlow = new StandardDataFlow(flowBytes, templateBytes, 
null);
+        actualProposedFlow = new StandardDataFlow(flowBytes, null);
 
         if (firstControllerInitialization) {
             // load the controller services
@@ -642,6 +642,17 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             throw new FlowSynchronizationException("Failed to load flow 
because unable to connect to cluster and local flow is empty");
         }
 
+        final List<Template> templates = loadTemplates();
+        for (final Template template : templates) {
+            final Template existing = 
rootGroup.getTemplate(template.getIdentifier());
+            if (existing == null) {
+                logger.info("Imported Template '{}' to Root Group", 
template.getDetails().getName());
+                rootGroup.addTemplate(template);
+            } else {
+                logger.info("Template '{}' was already present in Root Group 
so will not import from file", template.getDetails().getName());
+            }
+        }
+
         // lazy initialization of controller tasks and flow
         if (firstControllerInitialization) {
             logger.debug("First controller initialization. Loading reporting 
tasks and initializing controller.");
@@ -653,6 +664,56 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
         }
     }
 
+    /**
+     * In NiFi 0.x, templates were stored in a templates directory as separate 
files. They are
+     * now stored in the flow itself. If there already are templates in that 
directory, though,
+     * we want to restore them.
+     *
+     * @return the templates found in the templates directory
+     * @throws IOException if unable to read from the file system
+     */
+    public List<Template> loadTemplates() throws IOException {
+        final NiFiProperties properties = NiFiProperties.getInstance();
+        final Path templatePath = properties.getTemplateDirectory();
+
+        final File[] files = templatePath.toFile().listFiles(pathname -> {
+            final String lowerName = pathname.getName().toLowerCase();
+            return lowerName.endsWith(".template") || 
lowerName.endsWith(".xml");
+        });
+
+        if (files == null) {
+            return Collections.emptyList();
+        }
+
+        final List<Template> templates = new ArrayList<>();
+        for (final File file : files) {
+            try (final FileInputStream fis = new FileInputStream(file);
+                final BufferedInputStream bis = new BufferedInputStream(fis)) {
+
+                final TemplateDTO templateDto;
+                try {
+                    templateDto = TemplateDeserializer.deserialize(bis);
+                } catch (final Exception e) {
+                    logger.error("Unable to interpret " + file + " as a 
Template. Skipping file.");
+                    continue;
+                }
+
+                if (templateDto.getId() == null) {
+                    // If there is no ID assigned, we need to assign one. We 
do this by generating
+                    // an ID from the name. This is because we know that 
Template Names are unique
+                    // and are consistent across all nodes in the cluster.
+                    final String uuid = 
UUID.nameUUIDFromBytes(templateDto.getName().getBytes(StandardCharsets.UTF_8)).toString();
+                    templateDto.setId(uuid);
+                }
+
+                final Template template = new Template(templateDto);
+                templates.add(template);
+            }
+        }
+
+        return templates;
+    }
+
     private ConnectionResponse connect(final boolean retryOnCommsFailure, 
final boolean retryIndefinitely) throws ConnectionException {
         writeLock.lock();
         try {
@@ -759,7 +820,6 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             // start the processors as indicated by the dataflow
             controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
 
-            loadTemplates(dataFlow.getTemplates());
             loadSnippets(dataFlow.getSnippets());
             controller.startHeartbeating();
         } catch (final UninheritableFlowException ufe) {
@@ -794,17 +854,6 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
         }
     }
 
-    public void loadTemplates(final byte[] bytes) throws IOException {
-        if (bytes.length == 0) {
-            return;
-        }
-
-        controller.clearTemplates();
-
-        for (final Template template : TemplateManager.parseBytes(bytes)) {
-            controller.addTemplate(template.getDetails());
-        }
-    }
 
     public void loadSnippets(final byte[] bytes) throws IOException {
         if (bytes.length == 0) {
@@ -828,6 +877,9 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
         @Override
         public void run() {
+            final ClassLoader currentCl = 
Thread.currentThread().getContextClassLoader();
+            final ClassLoader cl = NarClassLoaders.getFrameworkClassLoader();
+            Thread.currentThread().setContextClassLoader(cl);
             try {
                 //Hang onto the SaveHolder here rather than setting it to null 
because if the save fails we will try again
                 final SaveHolder holder = 
StandardFlowService.this.saveHolder.get();
@@ -864,6 +916,10 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                 // record the failed save as a bulletin
                 final Bulletin saveFailureBulletin = 
BulletinFactory.createBulletin(EVENT_CATEGORY, LogLevel.ERROR.name(), "Unable 
to save flow controller configuration.");
                 
controller.getBulletinRepository().addBulletin(saveFailureBulletin);
+            } finally {
+                if (currentCl != null) {
+                    Thread.currentThread().setContextClassLoader(currentCl);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 97ed3a9..fe3e59b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -24,7 +24,6 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -98,6 +97,7 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
@@ -138,6 +138,8 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
     @Override
     public void sync(final FlowController controller, final DataFlow 
proposedFlow, final StringEncryptor encryptor)
             throws FlowSerializationException, UninheritableFlowException, 
FlowSynchronizationException {
+        // TODO - Include templates
+
         // handle corner cases involving no proposed flow
         if (proposedFlow == null) {
             if (controller.getGroup(controller.getRootGroupId()).isEmpty()) {
@@ -204,14 +206,10 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             throw new FlowSerializationException(e);
         }
 
-        logger.trace("Exporting templates from controller");
-        final byte[] existingTemplates = 
controller.getTemplateManager().export();
         logger.trace("Exporting snippets from controller");
         final byte[] existingSnippets = 
controller.getSnippetManager().export();
 
-        final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, 
existingTemplates, existingSnippets);
-
-        final boolean existingTemplatesEmpty = existingTemplates == null || 
existingTemplates.length == 0;
+        final DataFlow existingDataFlow = new StandardDataFlow(existingFlow, 
existingSnippets);
 
         // check that the proposed flow is inheritable by the controller
         try {
@@ -222,13 +220,6 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                     throw new UninheritableFlowException("Proposed 
configuration is not inheritable by the flow controller because of flow 
differences: " + problemInheriting);
                 }
             }
-            if (!existingTemplatesEmpty) {
-                logger.trace("Checking template inheritability");
-                final String problemInheriting = 
checkTemplateInheritability(existingDataFlow, proposedFlow);
-                if (problemInheriting != null) {
-                    throw new UninheritableFlowException("Proposed 
configuration is not inheritable by the flow controller because of flow 
differences: " + problemInheriting);
-                }
-            }
         } catch (final FingerprintException fe) {
             throw new FlowSerializationException("Failed to generate flow 
fingerprints", fe);
         }
@@ -301,16 +292,6 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                 }
             }
 
-            logger.trace("Synching templates");
-            if ((existingTemplates == null || existingTemplates.length == 0) 
&& proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 
0) {
-                // need to load templates
-                final TemplateManager templateManager = 
controller.getTemplateManager();
-                final List<Template> proposedTemplateList = 
TemplateManager.parseBytes(proposedFlow.getTemplates());
-                for (final Template template : proposedTemplateList) {
-                    templateManager.addTemplate(template.getDetails());
-                }
-            }
-
             // clear the snippets that are currently in memory
             logger.trace("Clearing existing snippets");
             final SnippetManager snippetManager = 
controller.getSnippetManager();
@@ -711,6 +692,17 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             updateControllerService(controller, serviceNodeElement, encryptor);
         }
 
+        // Replace the templates with those from the proposed flow
+        final List<Element> templateNodeList = 
getChildrenByTagName(processGroupElement, "template");
+        for (final Template template : processGroup.getTemplates()) {
+            processGroup.removeTemplate(template);
+        }
+        for (final Element templateElement : templateNodeList) {
+            final TemplateDTO templateDto = 
TemplateUtils.parseDto(templateElement);
+            final Template template = new Template(templateDto);
+            processGroup.addTemplate(template);
+        }
+
         return processGroup;
     }
 
@@ -1052,6 +1044,13 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             ControllerServiceLoader.loadControllerServices(serviceNodeList, 
controller, processGroup, encryptor, controller.getBulletinRepository(), 
autoResumeState);
         }
 
+        final List<Element> templateNodeList = 
getChildrenByTagName(processGroupElement, "template");
+        for (final Element templateNode : templateNodeList) {
+            final TemplateDTO templateDTO = 
TemplateUtils.parseDto(templateNode);
+            final Template template = new Template(templateDTO);
+            processGroup.addTemplate(template);
+        }
+
         return processGroup;
     }
 
@@ -1103,60 +1102,6 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         return null;
     }
 
-    /**
-     * Returns true if the given controller can inherit the proposed flow 
without orphaning flow files.
-     *
-     * @param existingFlow flow
-     * @param proposedFlow the flow to inherit
-     *
-     * @return null if the controller can inherit the specified flow, an 
explanation of why it cannot be inherited otherwise
-     *
-     * @throws FingerprintException if flow fingerprints could not be generated
-     */
-    public String checkTemplateInheritability(final DataFlow existingFlow, 
final DataFlow proposedFlow) throws FingerprintException {
-        if (existingFlow == null) {
-            return null;  // no existing flow, so equivalent to proposed flow
-        }
-
-        // check if the Flow is inheritable
-        final FingerprintFactory fingerprintFactory = new 
FingerprintFactory(encryptor);
-        // check if the Templates are inheritable
-        final byte[] existingTemplateBytes = existingFlow.getTemplates();
-        if (existingTemplateBytes == null || existingTemplateBytes.length == 
0) {
-            return null;
-        }
-
-        final List<Template> existingTemplates = 
TemplateManager.parseBytes(existingTemplateBytes);
-        final String existingTemplateFingerprint = 
fingerprintFactory.createFingerprint(existingTemplates);
-        if (existingTemplateFingerprint.trim().isEmpty()) {
-            return null;
-        }
-
-        final byte[] proposedTemplateBytes = proposedFlow.getTemplates();
-        if (proposedTemplateBytes == null || proposedTemplateBytes.length == 
0) {
-            return "Proposed Flow does not contain any Templates but Current 
Flow does";
-        }
-
-        final List<Template> proposedTemplates = 
TemplateManager.parseBytes(proposedTemplateBytes);
-        final String proposedTemplateFingerprint = 
fingerprintFactory.createFingerprint(proposedTemplates);
-        if (proposedTemplateFingerprint.trim().isEmpty()) {
-            return "Proposed Flow does not contain any Templates but Current 
Flow does";
-        }
-
-        try {
-            final String existingTemplateMd5 = 
fingerprintFactory.md5Hash(existingTemplateFingerprint);
-            final String proposedTemplateMd5 = 
fingerprintFactory.md5Hash(proposedTemplateFingerprint);
-
-            if (!existingTemplateMd5.equals(proposedTemplateMd5)) {
-                return findFirstDiscrepancy(existingTemplateFingerprint, 
proposedTemplateFingerprint, "Templates");
-            }
-        } catch (final NoSuchAlgorithmException e) {
-            throw new FingerprintException(e);
-        }
-
-        return null;
-    }
-
     private String findFirstDiscrepancy(final String existing, final String 
proposed, final String comparisonDescription) {
         final int shortestFileLength = Math.min(existing.length(), 
proposed.length());
         for (int i = 0; i < shortestFileLength; i++) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/270944ec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java
deleted file mode 100644
index c2e8e04..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/Template.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import org.apache.nifi.web.api.dto.TemplateDTO;
-
-public class Template {
-
-    private final TemplateDTO dto;
-
-    public Template(final TemplateDTO dto) {
-        this.dto = dto;
-    }
-
-    /**
-     * Returns a TemplateDTO object that describes the contents of this 
Template
-     *
-     * @return template dto
-     */
-    public TemplateDTO getDetails() {
-        return dto;
-    }
-}

Reply via email to