NIFI-250: Merge develop into NIFI-250

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c28d9f57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c28d9f57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c28d9f57

Branch: refs/heads/NIFI-250
Commit: c28d9f57287b5b95d6451a5365dbc2b3d4552422
Parents: 5a6723c 761e64a
Author: Matt Gilman <[email protected]>
Authored: Tue Mar 17 08:16:36 2015 -0400
Committer: Matt Gilman <[email protected]>
Committed: Tue Mar 17 08:16:36 2015 -0400

----------------------------------------------------------------------
 .../nifi-cluster-protocol/.gitignore            |    1 -
 .../nifi-cluster-protocol/pom.xml               |   67 -
 .../protocol/ClusterManagerProtocolSender.java  |   69 -
 .../cluster/protocol/ConnectionRequest.java     |   44 -
 .../cluster/protocol/ConnectionResponse.java    |  141 -
 .../apache/nifi/cluster/protocol/Heartbeat.java |   68 -
 .../nifi/cluster/protocol/NodeBulletins.java    |   44 -
 .../nifi/cluster/protocol/NodeIdentifier.java   |  172 -
 .../cluster/protocol/NodeProtocolSender.java    |   73 -
 .../nifi/cluster/protocol/ProtocolContext.java  |   39 -
 .../cluster/protocol/ProtocolException.java     |   40 -
 .../nifi/cluster/protocol/ProtocolHandler.java  |   44 -
 .../nifi/cluster/protocol/ProtocolListener.java |   72 -
 .../protocol/ProtocolMessageMarshaller.java     |   38 -
 .../protocol/ProtocolMessageUnmarshaller.java   |   38 -
 .../nifi/cluster/protocol/StandardDataFlow.java |  105 -
 .../UnknownServiceAddressException.java         |   39 -
 .../impl/ClusterManagerProtocolSenderImpl.java  |  245 -
 .../ClusterManagerProtocolSenderListener.java   |  118 -
 .../protocol/impl/ClusterServiceDiscovery.java  |  181 -
 .../protocol/impl/ClusterServiceLocator.java    |  229 -
 .../impl/ClusterServicesBroadcaster.java        |  182 -
 .../protocol/impl/CopyingInputStream.java       |   77 -
 .../impl/MulticastProtocolListener.java         |  204 -
 .../protocol/impl/NodeProtocolSenderImpl.java   |  171 -
 .../impl/NodeProtocolSenderListener.java        |  115 -
 .../protocol/impl/SocketProtocolListener.java   |  205 -
 .../protocol/jaxb/JaxbProtocolContext.java      |  148 -
 .../jaxb/message/AdaptedConnectionRequest.java  |   40 -
 .../jaxb/message/AdaptedConnectionResponse.java |  109 -
 .../protocol/jaxb/message/AdaptedCounter.java   |   56 -
 .../protocol/jaxb/message/AdaptedDataFlow.java  |   64 -
 .../protocol/jaxb/message/AdaptedHeartbeat.java |   66 -
 .../jaxb/message/AdaptedNodeBulletins.java      |   50 -
 .../jaxb/message/AdaptedNodeIdentifier.java     |   76 -
 .../jaxb/message/ConnectionRequestAdapter.java  |   41 -
 .../jaxb/message/ConnectionResponseAdapter.java |   55 -
 .../protocol/jaxb/message/DataFlowAdapter.java  |   50 -
 .../protocol/jaxb/message/HeartbeatAdapter.java |   54 -
 .../jaxb/message/JaxbProtocolUtils.java         |   42 -
 .../jaxb/message/NodeBulletinsAdapter.java      |   48 -
 .../jaxb/message/NodeIdentifierAdapter.java     |   51 -
 .../protocol/jaxb/message/ObjectFactory.java    |  104 -
 .../message/ConnectionRequestMessage.java       |   46 -
 .../message/ConnectionResponseMessage.java      |   66 -
 .../ControllerStartupFailureMessage.java        |   49 -
 .../protocol/message/DisconnectMessage.java     |   55 -
 .../protocol/message/ExceptionMessage.java      |   44 -
 .../protocol/message/FlowRequestMessage.java    |   46 -
 .../protocol/message/FlowResponseMessage.java   |   44 -
 .../protocol/message/HeartbeatMessage.java      |   43 -
 .../message/MulticastProtocolMessage.java       |   66 -
 .../protocol/message/NodeBulletinsMessage.java  |   43 -
 .../cluster/protocol/message/PingMessage.java   |   55 -
 .../message/PrimaryRoleAssignmentMessage.java   |   56 -
 .../protocol/message/ProtocolMessage.java       |   61 -
 .../message/ReconnectionFailureMessage.java     |   45 -
 .../message/ReconnectionRequestMessage.java     |   94 -
 .../message/ReconnectionResponseMessage.java    |   32 -
 .../message/ServiceBroadcastMessage.java        |   64 -
 .../MulticastConfigurationFactoryBean.java      |   60 -
 .../ServerSocketConfigurationFactoryBean.java   |   65 -
 .../spring/SocketConfigurationFactoryBean.java  |   66 -
 .../resources/nifi-cluster-protocol-context.xml |  110 -
 .../ClusterManagerProtocolSenderImplTest.java   |  134 -
 .../impl/ClusterServiceDiscoveryTest.java       |  135 -
 .../impl/ClusterServiceLocatorTest.java         |  121 -
 .../impl/ClusterServicesBroadcasterTest.java    |  133 -
 .../impl/MulticastProtocolListenerTest.java     |  171 -
 .../impl/NodeProtocolSenderImplTest.java        |  203 -
 .../impl/testutils/DelayedProtocolHandler.java  |   57 -
 .../testutils/ReflexiveProtocolHandler.java     |   47 -
 .../nifi-framework/nifi-cluster-web/.gitignore  |    1 -
 .../nifi-framework/nifi-cluster-web/pom.xml     |   44 -
 .../nifi/cluster/context/ClusterContext.java    |   59 -
 .../cluster/context/ClusterContextImpl.java     |   69 -
 .../context/ClusterContextThreadLocal.java      |   42 -
 .../nifi-framework/nifi-cluster/.gitignore      |    1 -
 .../nifi-framework/nifi-cluster/pom.xml         |  134 -
 .../cluster/client/MulticastTestClient.java     |  151 -
 .../org/apache/nifi/cluster/event/Event.java    |  122 -
 .../apache/nifi/cluster/event/EventManager.java |   65 -
 .../cluster/event/impl/EventManagerImpl.java    |  143 -
 .../cluster/firewall/ClusterNodeFirewall.java   |   35 -
 .../impl/FileBasedClusterNodeFirewall.java      |  207 -
 .../nifi/cluster/flow/ClusterDataFlow.java      |   56 -
 .../apache/nifi/cluster/flow/DaoException.java  |   40 -
 .../apache/nifi/cluster/flow/DataFlowDao.java   |   62 -
 .../cluster/flow/DataFlowManagementService.java |  132 -
 .../nifi/cluster/flow/PersistedFlowState.java   |   37 -
 .../nifi/cluster/flow/StaleFlowException.java   |   42 -
 .../nifi/cluster/flow/impl/DataFlowDaoImpl.java |  615 ---
 .../impl/DataFlowManagementServiceImpl.java     |  413 --
 .../nifi/cluster/manager/ClusterManager.java    |  225 -
 .../cluster/manager/HttpClusterManager.java     |  169 -
 .../cluster/manager/HttpRequestReplicator.java  |   99 -
 .../cluster/manager/HttpResponseMapper.java     |   42 -
 .../nifi/cluster/manager/NodeResponse.java      |  329 --
 .../exception/BlockedByFirewallException.java   |   60 -
 .../manager/exception/ClusterException.java     |   40 -
 .../ConnectingNodeMutableRequestException.java  |   41 -
 ...DisconnectedNodeMutableRequestException.java |   41 -
 .../exception/IllegalClusterStateException.java |   41 -
 .../exception/IllegalNodeDeletionException.java |   41 -
 .../IllegalNodeDisconnectionException.java      |   42 -
 .../IllegalNodeReconnectionException.java       |   41 -
 .../IneligiblePrimaryNodeException.java         |   41 -
 .../exception/MutableRequestException.java      |   42 -
 .../exception/NoConnectedNodesException.java    |   41 -
 .../exception/NoResponseFromNodesException.java |   42 -
 .../exception/NodeDisconnectionException.java   |   41 -
 .../exception/NodeReconnectionException.java    |   40 -
 .../PrimaryRoleAssignmentException.java         |   41 -
 .../SafeModeMutableRequestException.java        |   41 -
 .../manager/exception/UnknownNodeException.java |   41 -
 .../exception/UriConstructionException.java     |   42 -
 .../manager/impl/ClusteredEventAccess.java      |  135 -
 .../manager/impl/ClusteredReportingContext.java |  165 -
 .../manager/impl/HttpRequestReplicatorImpl.java |  531 ---
 .../manager/impl/HttpResponseMapperImpl.java    |   85 -
 .../cluster/manager/impl/WebClusterManager.java | 4187 ------------------
 .../java/org/apache/nifi/cluster/node/Node.java |  252 --
 ...anagerProtocolServiceLocatorFactoryBean.java |  116 -
 ...FileBasedClusterNodeFirewallFactoryBean.java |   58 -
 .../spring/WebClusterManagerFactoryBean.java    |  141 -
 .../reporting/ClusteredReportingTaskNode.java   |   49 -
 .../resources/nifi-cluster-manager-context.xml  |  128 -
 .../event/impl/EventManagerImplTest.java        |  119 -
 .../impl/FileBasedClusterNodeFirewallTest.java  |   98 -
 .../impl/DataFlowManagementServiceImplTest.java |  343 --
 .../impl/HttpRequestReplicatorImplTest.java     |  368 --
 .../impl/HttpResponseMapperImplTest.java        |  126 -
 .../manager/impl/TestWebClusterManager.java     |   54 -
 .../cluster/manager/testutils/HttpRequest.java  |  239 -
 .../cluster/manager/testutils/HttpResponse.java |   93 -
 .../manager/testutils/HttpResponseAction.java   |   60 -
 .../cluster/manager/testutils/HttpServer.java   |  240 -
 .../ClusterManagerProtocolSenderImplTest.java   |  133 -
 .../impl/ClusterServiceLocatorTest.java         |  119 -
 .../impl/ClusterServicesBroadcasterTest.java    |  131 -
 .../impl/MulticastProtocolListenerTest.java     |  171 -
 .../impl/NodeProtocolSenderImplTest.java        |  201 -
 .../impl/SocketProtocolListenerTest.java        |  132 -
 .../testutils/DelayedProtocolHandler.java       |   57 -
 .../testutils/ReflexiveProtocolHandler.java     |   47 -
 .../src/test/resources/logback-test.xml         |   48 -
 .../apache/nifi/cluster/firewall/impl/empty.txt |    0
 .../apache/nifi/cluster/firewall/impl/ips.txt   |   12 -
 .../nifi-framework-cluster-protocol/.gitignore  |    1 +
 .../nifi-framework-cluster-protocol/pom.xml     |   67 +
 .../protocol/ClusterManagerProtocolSender.java  |   69 +
 .../cluster/protocol/ConnectionRequest.java     |   44 +
 .../cluster/protocol/ConnectionResponse.java    |  141 +
 .../apache/nifi/cluster/protocol/Heartbeat.java |   68 +
 .../nifi/cluster/protocol/NodeBulletins.java    |   44 +
 .../nifi/cluster/protocol/NodeIdentifier.java   |  172 +
 .../cluster/protocol/NodeProtocolSender.java    |   73 +
 .../nifi/cluster/protocol/ProtocolContext.java  |   39 +
 .../cluster/protocol/ProtocolException.java     |   40 +
 .../nifi/cluster/protocol/ProtocolHandler.java  |   44 +
 .../nifi/cluster/protocol/ProtocolListener.java |   72 +
 .../protocol/ProtocolMessageMarshaller.java     |   38 +
 .../protocol/ProtocolMessageUnmarshaller.java   |   38 +
 .../nifi/cluster/protocol/StandardDataFlow.java |  105 +
 .../UnknownServiceAddressException.java         |   39 +
 .../impl/ClusterManagerProtocolSenderImpl.java  |  245 +
 .../ClusterManagerProtocolSenderListener.java   |  118 +
 .../protocol/impl/ClusterServiceDiscovery.java  |  181 +
 .../protocol/impl/ClusterServiceLocator.java    |  229 +
 .../impl/ClusterServicesBroadcaster.java        |  182 +
 .../protocol/impl/CopyingInputStream.java       |   77 +
 .../impl/MulticastProtocolListener.java         |  204 +
 .../protocol/impl/NodeProtocolSenderImpl.java   |  171 +
 .../impl/NodeProtocolSenderListener.java        |  115 +
 .../protocol/impl/SocketProtocolListener.java   |  205 +
 .../protocol/jaxb/JaxbProtocolContext.java      |  148 +
 .../jaxb/message/AdaptedConnectionRequest.java  |   40 +
 .../jaxb/message/AdaptedConnectionResponse.java |  109 +
 .../protocol/jaxb/message/AdaptedCounter.java   |   56 +
 .../protocol/jaxb/message/AdaptedDataFlow.java  |   64 +
 .../protocol/jaxb/message/AdaptedHeartbeat.java |   66 +
 .../jaxb/message/AdaptedNodeBulletins.java      |   50 +
 .../jaxb/message/AdaptedNodeIdentifier.java     |   76 +
 .../jaxb/message/ConnectionRequestAdapter.java  |   41 +
 .../jaxb/message/ConnectionResponseAdapter.java |   55 +
 .../protocol/jaxb/message/DataFlowAdapter.java  |   50 +
 .../protocol/jaxb/message/HeartbeatAdapter.java |   54 +
 .../jaxb/message/JaxbProtocolUtils.java         |   42 +
 .../jaxb/message/NodeBulletinsAdapter.java      |   48 +
 .../jaxb/message/NodeIdentifierAdapter.java     |   51 +
 .../protocol/jaxb/message/ObjectFactory.java    |  104 +
 .../message/ConnectionRequestMessage.java       |   46 +
 .../message/ConnectionResponseMessage.java      |   66 +
 .../ControllerStartupFailureMessage.java        |   49 +
 .../protocol/message/DisconnectMessage.java     |   55 +
 .../protocol/message/ExceptionMessage.java      |   44 +
 .../protocol/message/FlowRequestMessage.java    |   46 +
 .../protocol/message/FlowResponseMessage.java   |   44 +
 .../protocol/message/HeartbeatMessage.java      |   43 +
 .../message/MulticastProtocolMessage.java       |   66 +
 .../protocol/message/NodeBulletinsMessage.java  |   43 +
 .../cluster/protocol/message/PingMessage.java   |   55 +
 .../message/PrimaryRoleAssignmentMessage.java   |   56 +
 .../protocol/message/ProtocolMessage.java       |   61 +
 .../message/ReconnectionFailureMessage.java     |   45 +
 .../message/ReconnectionRequestMessage.java     |   94 +
 .../message/ReconnectionResponseMessage.java    |   32 +
 .../message/ServiceBroadcastMessage.java        |   64 +
 .../MulticastConfigurationFactoryBean.java      |   60 +
 .../ServerSocketConfigurationFactoryBean.java   |   65 +
 .../spring/SocketConfigurationFactoryBean.java  |   66 +
 .../resources/nifi-cluster-protocol-context.xml |  110 +
 .../ClusterManagerProtocolSenderImplTest.java   |  134 +
 .../impl/ClusterServiceDiscoveryTest.java       |  135 +
 .../impl/ClusterServiceLocatorTest.java         |  121 +
 .../impl/ClusterServicesBroadcasterTest.java    |  133 +
 .../impl/MulticastProtocolListenerTest.java     |  171 +
 .../impl/NodeProtocolSenderImplTest.java        |  203 +
 .../impl/testutils/DelayedProtocolHandler.java  |   57 +
 .../testutils/ReflexiveProtocolHandler.java     |   47 +
 .../nifi-framework-cluster-web/.gitignore       |    1 +
 .../nifi-framework-cluster-web/pom.xml          |   44 +
 .../nifi/cluster/context/ClusterContext.java    |   59 +
 .../cluster/context/ClusterContextImpl.java     |   69 +
 .../context/ClusterContextThreadLocal.java      |   42 +
 .../ClusterAwareOptimisticLockingManager.java   |   96 +
 .../nifi-framework-cluster/.gitignore           |    1 +
 .../nifi-framework-cluster/pom.xml              |  148 +
 .../cluster/client/MulticastTestClient.java     |  151 +
 .../org/apache/nifi/cluster/event/Event.java    |  122 +
 .../apache/nifi/cluster/event/EventManager.java |   65 +
 .../cluster/event/impl/EventManagerImpl.java    |  143 +
 .../cluster/firewall/ClusterNodeFirewall.java   |   35 +
 .../impl/FileBasedClusterNodeFirewall.java      |  207 +
 .../nifi/cluster/flow/ClusterDataFlow.java      |   56 +
 .../apache/nifi/cluster/flow/DaoException.java  |   40 +
 .../apache/nifi/cluster/flow/DataFlowDao.java   |   62 +
 .../cluster/flow/DataFlowManagementService.java |  132 +
 .../nifi/cluster/flow/PersistedFlowState.java   |   37 +
 .../nifi/cluster/flow/StaleFlowException.java   |   42 +
 .../nifi/cluster/flow/impl/DataFlowDaoImpl.java |  615 +++
 .../impl/DataFlowManagementServiceImpl.java     |  413 ++
 .../nifi/cluster/manager/ClusterManager.java    |  225 +
 .../cluster/manager/HttpClusterManager.java     |  169 +
 .../cluster/manager/HttpRequestReplicator.java  |   99 +
 .../cluster/manager/HttpResponseMapper.java     |   42 +
 .../nifi/cluster/manager/NodeResponse.java      |  329 ++
 .../exception/BlockedByFirewallException.java   |   60 +
 .../manager/exception/ClusterException.java     |   40 +
 .../ConnectingNodeMutableRequestException.java  |   41 +
 ...DisconnectedNodeMutableRequestException.java |   41 +
 .../exception/IllegalClusterStateException.java |   41 +
 .../exception/IllegalNodeDeletionException.java |   41 +
 .../IllegalNodeDisconnectionException.java      |   42 +
 .../IllegalNodeReconnectionException.java       |   41 +
 .../IneligiblePrimaryNodeException.java         |   41 +
 .../exception/MutableRequestException.java      |   42 +
 .../exception/NoConnectedNodesException.java    |   41 +
 .../exception/NoResponseFromNodesException.java |   42 +
 .../exception/NodeDisconnectionException.java   |   41 +
 .../exception/NodeReconnectionException.java    |   40 +
 .../PrimaryRoleAssignmentException.java         |   41 +
 .../SafeModeMutableRequestException.java        |   41 +
 .../manager/exception/UnknownNodeException.java |   41 +
 .../exception/UriConstructionException.java     |   42 +
 .../manager/impl/ClusteredEventAccess.java      |  135 +
 .../manager/impl/ClusteredReportingContext.java |  165 +
 .../manager/impl/HttpRequestReplicatorImpl.java |  531 +++
 .../manager/impl/HttpResponseMapperImpl.java    |   85 +
 .../cluster/manager/impl/WebClusterManager.java | 4187 ++++++++++++++++++
 .../java/org/apache/nifi/cluster/node/Node.java |  252 ++
 ...anagerProtocolServiceLocatorFactoryBean.java |  116 +
 ...FileBasedClusterNodeFirewallFactoryBean.java |   58 +
 .../spring/WebClusterManagerFactoryBean.java    |  141 +
 .../reporting/ClusteredReportingTaskNode.java   |   49 +
 .../resources/nifi-cluster-manager-context.xml  |  128 +
 .../event/impl/EventManagerImplTest.java        |  119 +
 .../impl/FileBasedClusterNodeFirewallTest.java  |   98 +
 .../impl/DataFlowManagementServiceImplTest.java |  343 ++
 .../impl/HttpRequestReplicatorImplTest.java     |  368 ++
 .../impl/HttpResponseMapperImplTest.java        |  126 +
 .../manager/impl/TestWebClusterManager.java     |   54 +
 .../cluster/manager/testutils/HttpRequest.java  |  239 +
 .../cluster/manager/testutils/HttpResponse.java |   93 +
 .../manager/testutils/HttpResponseAction.java   |   60 +
 .../cluster/manager/testutils/HttpServer.java   |  240 +
 .../ClusterManagerProtocolSenderImplTest.java   |  133 +
 .../impl/ClusterServiceLocatorTest.java         |  119 +
 .../impl/ClusterServicesBroadcasterTest.java    |  131 +
 .../impl/MulticastProtocolListenerTest.java     |  171 +
 .../impl/NodeProtocolSenderImplTest.java        |  201 +
 .../impl/SocketProtocolListenerTest.java        |  132 +
 .../testutils/DelayedProtocolHandler.java       |   57 +
 .../testutils/ReflexiveProtocolHandler.java     |   47 +
 .../src/test/resources/logback-test.xml         |   48 +
 .../apache/nifi/cluster/firewall/impl/empty.txt |    0
 .../apache/nifi/cluster/firewall/impl/ips.txt   |   12 +
 .../nifi-framework/nifi-framework-core/pom.xml  |   19 +-
 .../nifi-framework/nifi-web/nifi-web-ui/pom.xml |   13 +
 .../nifi-framework/pom.xml                      |    6 +-
 .../nifi-standard-processors/pom.xml            |   86 +-
 nifi/pom.xml                                    |   36 +-
 302 files changed, 19060 insertions(+), 18844 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
index 0000000,18e4ba4..e08a9bf
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
@@@ -1,0 -1,48 +1,44 @@@
+ <?xml version="1.0" encoding="UTF-8"?>
+ <!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ -->
+ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+     <modelVersion>4.0.0</modelVersion>
+     <parent>
+         <groupId>org.apache.nifi</groupId>
+         <artifactId>nifi-framework</artifactId>
+         <version>0.0.3-incubating-SNAPSHOT</version>
+     </parent>
+     <artifactId>nifi-framework-cluster-web</artifactId>
+     <packaging>jar</packaging>
+     <description>The clustering software for communicating with the NiFi web api.</description>
+     <dependencies>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-api</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-properties</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
 -            <artifactId>nifi-web-optimistic-locking</artifactId>
 -        </dependency>
 -        <dependency>
 -            <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-administration</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-user-actions</artifactId>
+         </dependency>
+     </dependencies>
+ </project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
index 0000000,012e7c7..c8c7206
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
@@@ -1,0 -1,47 +1,42 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.cluster.context;
+ 
+ /**
+  * Manages a cluster context on a threadlocal.
+  */
+ public class ClusterContextThreadLocal {
+     
+     private static final ThreadLocal<ClusterContext> contextHolder = new ThreadLocal<>();
+     
+     public static void removeContext() {
+         contextHolder.remove();
+     }
+     
+     public static ClusterContext createEmptyContext() {
+         return new ClusterContextImpl();
+     }
+     
+     public static ClusterContext getContext() {
 -        ClusterContext ctx = contextHolder.get();
 -        if(ctx == null) {
 -            ctx = createEmptyContext();
 -            contextHolder.set(ctx);
 -        }
 -        return ctx;
++        return contextHolder.get();
+     }
+     
+     public static void setContext(final ClusterContext context) {
+         contextHolder.set(context);
+     }
+     
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index 0000000,ef927a3..d069d72
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@@ -1,0 -1,144 +1,148 @@@
+ <?xml version="1.0" encoding="UTF-8"?>
+ <!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ -->
+ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+     <modelVersion>4.0.0</modelVersion>
+     <parent>
+         <groupId>org.apache.nifi</groupId>
+         <artifactId>nifi-framework</artifactId>
+         <version>0.0.3-incubating-SNAPSHOT</version>
+     </parent>
+     <artifactId>nifi-framework-cluster</artifactId>
+     <packaging>jar</packaging>
+     <description>The clustering software for NiFi.</description>
+     <dependencies>
+         <!-- application core dependencies -->
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-api</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-properties</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-logging-utils</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-utils</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-client-dto</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
++            <artifactId>nifi-web-optimistic-locking</artifactId>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-framework-core</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-framework-core-api</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-framework-cluster-protocol</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-framework-cluster-web</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-web-utils</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-administration</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-site-to-site</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.nifi</groupId>
+             <artifactId>nifi-site-to-site-client</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.apache.commons</groupId>
+             <artifactId>commons-compress</artifactId>
+         </dependency>
+         <!-- third party dependencies -->
+         
+         <!-- sun dependencies -->
+         <dependency>
+             <groupId>javax.servlet</groupId>
+             <artifactId>javax.servlet-api</artifactId>
+         </dependency>
+         
+         <!-- commons dependencies -->
+         <dependency>
+             <groupId>commons-io</groupId>
+             <artifactId>commons-io</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>commons-net</groupId>
+             <artifactId>commons-net</artifactId>
+         </dependency>
+         
+         <!-- jersey dependencies -->
+         <dependency>
+             <groupId>com.sun.jersey</groupId>
+             <artifactId>jersey-client</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>com.sun.jersey</groupId>
+             <artifactId>jersey-server</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>com.sun.jersey</groupId>
+             <artifactId>jersey-json</artifactId>
+         </dependency>
+         
+         <!-- spring dependencies -->
+         <dependency>
+             <groupId>org.springframework</groupId>
+             <artifactId>spring-core</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.springframework</groupId>
+             <artifactId>spring-beans</artifactId>
+         </dependency>
+         <dependency>
+             <groupId>org.springframework</groupId>
+             <artifactId>spring-context</artifactId>
+         </dependency>
+     </dependencies>
+     <build>
+         <plugins>
+             <plugin>
+                 <groupId>org.apache.rat</groupId>
+                 <artifactId>apache-rat-plugin</artifactId>
+                 <configuration>
+                     <excludes>
+                         <exclude>src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt</exclude>
+                         <exclude>src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt</exclude>
+                     </excludes>
+                 </configuration>
+             </plugin>
+         </plugins>
+     </build>  
+ </project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
index 0000000,eedb88f..c17b429
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
@@@ -1,0 -1,45 +1,56 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.cluster.flow;
+ 
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ import org.apache.nifi.cluster.protocol.StandardDataFlow;
+ 
+ /**
+  * A dataflow with additional information about the cluster.
+  *
+  * @author unattributed
+  */
+ public class ClusterDataFlow {
+ 
+     private final StandardDataFlow dataFlow;
 -
+     private final NodeIdentifier primaryNodeId;
++    private final byte[] controllerServices;
++    private final byte[] reportingTasks;
+ 
 -    public ClusterDataFlow(final StandardDataFlow dataFlow, final 
NodeIdentifier primaryNodeId) {
++    public ClusterDataFlow(final StandardDataFlow dataFlow, final 
NodeIdentifier primaryNodeId, final byte[] controllerServices, final byte[] 
reportingTasks) {
+         this.dataFlow = dataFlow;
+         this.primaryNodeId = primaryNodeId;
++        this.controllerServices = controllerServices;
++        this.reportingTasks = reportingTasks;
+     }
+ 
++    public byte[] getControllerServices() {
++      return controllerServices;
++    }
++    
++    public byte[] getReportingTasks() {
++      return reportingTasks;
++    }
++    
+     public NodeIdentifier getPrimaryNodeId() {
+         return primaryNodeId;
+     }
+ 
+     public StandardDataFlow getDataFlow() {
+         return dataFlow;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
index 0000000,339d904..082d65e
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
@@@ -1,0 -1,115 +1,132 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.cluster.flow;
+ 
+ import java.util.Set;
++
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ 
+ /**
+  * A service for managing the cluster's flow. The service will attempt to keep
+  * the cluster's dataflow current while respecting the value of the configured
+  * retrieval delay.
+  *
+  * The eligible retrieval time is reset with the configured delay every time 
the
+  * flow state is set to STALE. If the state is set to UNKNOWN or CURRENT, then
+  * the flow will not be retrieved.
+  *
+  * Clients must call start() and stop() to initialize and stop the instance.
+  *
+  * @author unattributed
+  */
+ public interface DataFlowManagementService {
+ 
+     /**
+      * Starts the instance. Start may only be called if the instance is not
+      * running.
+      */
+     void start();
+ 
+     /**
+      * Stops the instance. Stop may only be called if the instance is running.
+      */
+     void stop();
+ 
+     /**
+      * @return true if the instance is started; false otherwise.
+      */
+     boolean isRunning();
+ 
+     /**
+      * Loads the dataflow.
+      *
+      * @return the dataflow or null if no dataflow exists
+      */
+     ClusterDataFlow loadDataFlow();
+ 
+     /**
+      * Updates the dataflow with the given primary node identifier.
+      *
+      * @param nodeId the node identifier
+      *
+      * @throws DaoException if the update failed
+      */
+     void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
+ 
+     /**
++     * Updates the dataflow with the given serialized form of the Controller 
Services that are to exist on the NCM.
++     * 
++     * @param serializedControllerServices the serialized form of the Controller Services
++     * @throws DaoException
++     */
++    void updateControllerServices(byte[] serializedControllerServices) throws 
DaoException;
++    
++    /**
++     * Updates the dataflow with the given serialized form of Reporting Tasks 
that are to exist on the NCM.
++     * 
++     * @param serializedReportingTasks the serialized form of the Reporting Tasks
++     * @throws DaoException
++     */
++    void updateReportingTasks(byte[] serializedReportingTasks) throws 
DaoException;
++    
++    /**
+      * Sets the state of the flow.
+      *
+      * @param flowState the state
+      *
+      * @see PersistedFlowState
+      */
+     void setPersistedFlowState(PersistedFlowState flowState);
+ 
+     /**
+      * @return the state of the flow
+      */
+     PersistedFlowState getPersistedFlowState();
+ 
+     /**
+      * @return true if the flow is current; false otherwise.
+      */
+     boolean isFlowCurrent();
+ 
+     /**
+      * Sets the node identifiers to use when attempting to retrieve the flow.
+      *
+      * @param nodeIds the node identifiers
+      */
+     void setNodeIds(Set<NodeIdentifier> nodeIds);
+ 
+     /**
+      * Returns the set of node identifiers the service is using to retrieve 
the
+      * flow.
+      *
+      * @return the set of node identifiers the service is using to retrieve 
the
+      * flow.
+      */
+     Set<NodeIdentifier> getNodeIds();
+ 
+     /**
+      * @return the retrieval delay in seconds
+      */
+     int getRetrievalDelaySeconds();
+ 
+     /**
+      * Sets the retrieval delay.
+      *
+      * @param delay the retrieval delay in seconds
+      */
+     void setRetrievalDelay(String delay);
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
index 0000000,72b594a..dd9d2a3
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@@ -1,0 -1,600 +1,615 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.cluster.flow.impl;
+ 
+ import java.io.ByteArrayOutputStream;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileOutputStream;
+ import java.io.FilenameFilter;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.security.MessageDigest;
+ import java.security.NoSuchAlgorithmException;
+ import java.util.Arrays;
+ import java.util.UUID;
+ 
+ import javax.xml.bind.JAXBContext;
+ import javax.xml.bind.JAXBException;
+ import javax.xml.bind.Marshaller;
+ import javax.xml.bind.Unmarshaller;
+ import javax.xml.bind.annotation.XmlRootElement;
+ import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+ import javax.xml.parsers.DocumentBuilder;
+ import javax.xml.parsers.DocumentBuilderFactory;
+ import javax.xml.transform.OutputKeys;
+ import javax.xml.transform.Transformer;
+ import javax.xml.transform.TransformerFactory;
+ import javax.xml.transform.dom.DOMSource;
+ import javax.xml.transform.stream.StreamResult;
+ 
+ import org.apache.commons.compress.archivers.ArchiveEntry;
+ import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+ import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+ import org.apache.nifi.cluster.flow.ClusterDataFlow;
+ import org.apache.nifi.cluster.flow.DaoException;
+ import org.apache.nifi.cluster.flow.DataFlowDao;
+ import org.apache.nifi.cluster.flow.PersistedFlowState;
+ import org.apache.nifi.cluster.protocol.DataFlow;
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ import org.apache.nifi.cluster.protocol.StandardDataFlow;
+ import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+ import org.apache.nifi.logging.NiFiLog;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.ByteArrayInputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.util.file.FileUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.w3c.dom.Document;
+ import org.w3c.dom.Element;
+ 
+ /**
+  * Implements the DataFlowDao interface. The implementation tracks the state of 
the
+  * dataflow by annotating the filename of the flow state file. Specifically, 
the
+  * implementation correlates PersistedFlowState states to filename extensions.
+  * The correlation is as follows:
+  * <ul>
+  * <li> CURRENT maps to flow.tar </li>
+  * <li> STALE maps to flow.tar.stale </li>
+  * <li> UNKNOWN maps to flow.tar.unknown </li>
+  * </ul>
+  * Whenever the flow state changes, the flow state file's name is updated to
+  * denote its state.
+  *
+  * The implementation also provides for a restore directory that may be
+  * configured for higher availability. At instance creation, if the primary or
+  * restore directories have multiple flow state files, an exception is thrown.
+  * If the primary directory has a current flow state file, but the restore
+  * directory does not, then the primary flow state file is copied to the 
restore
+  * directory. If the restore directory has a current flow state file, but the
+  * primary directory does not, then the restore flow state file is copied to 
the
+  * primary directory. If both the primary and restore directories have a 
current
+  * flow state file and the files are different, then an exception is thrown.
+  *
+  * When the flow state file is saved, it is always saved first to the restore
+  * directory followed by a save to the primary directory. When the flow state
+  * file is loaded, a check is made to verify that the primary and restore flow
+  * state files are both current. If either is not current, then an exception 
is
+  * thrown. The primary flow state file is always read when the load method is
+  * called.
+  *
+  * @author unattributed
+  */
+ public class DataFlowDaoImpl implements DataFlowDao {
+ 
+     private final File primaryDirectory;
+     private final File restoreDirectory;
+     private final boolean autoStart;
+     private final String generatedRootGroupId = UUID.randomUUID().toString();
+ 
+     public static final String STALE_EXT = ".stale";
+     public static final String UNKNOWN_EXT = ".unknown";
+     public static final String FLOW_PACKAGE = "flow.tar";
+     public static final String FLOW_XML_FILENAME = "flow.xml";
+     public static final String TEMPLATES_FILENAME = "templates.xml";
+     public static final String SNIPPETS_FILENAME = "snippets.xml";
++    public static final String CONTROLLER_SERVICES_FILENAME = 
"controller-services.xml";
++    public static final String REPORTING_TASKS_FILENAME = 
"reporting-tasks.xml";
+     public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
+ 
+     private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
+ 
+     public DataFlowDaoImpl(final File primaryDirectory) throws DaoException {
+         this(primaryDirectory, null, false);
+     }
+ 
+     public DataFlowDaoImpl(final File primaryDirectory, final File 
restoreDirectory, final boolean autoStart) throws DaoException {
+ 
+         // sanity check that primary directory is a directory, creating it if 
necessary
+         if (primaryDirectory == null) {
+             throw new IllegalArgumentException("Primary directory may not be 
null.");
+         } else if (!primaryDirectory.exists()) {
+             if (!primaryDirectory.mkdir()) {
+                 throw new DaoException(String.format("Failed to create 
primary directory '%s'", primaryDirectory.getAbsolutePath()));
+             }
+         } else if (!primaryDirectory.isDirectory()) {
+             throw new IllegalArgumentException("Primary directory must be a 
directory.");
+         }
+ 
+         this.autoStart = autoStart;
+ 
+         try {
+             this.primaryDirectory = primaryDirectory;
+             this.restoreDirectory = restoreDirectory;
+ 
+             if (restoreDirectory == null) {
+                 // check that we have exactly one current flow state file
+                 ensureSingleCurrentStateFile(primaryDirectory);
+             } else {
+ 
+                 // check that restore directory is a directory, creating it 
if necessary
+                 FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
+ 
+                 // check that restore directory is not the same as the 
primary directory
+                 if 
(primaryDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) 
{
+                     throw new IllegalArgumentException(String.format("Primary 
directory '%s' is the same as restore directory '%s' ",
+                             primaryDirectory.getAbsolutePath(), 
restoreDirectory.getAbsolutePath()));
+                 }
+ 
+                 final File[] primaryFlowStateFiles = 
getFlowStateFiles(primaryDirectory);
+                 final File[] restoreFlowStateFiles = 
getFlowStateFiles(restoreDirectory);
+ 
+                 // if more than one state file in either primary or restore, 
then throw exception
+                 if (primaryFlowStateFiles.length > 1) {
+                     throw new IllegalStateException(String.format("Found 
multiple dataflow state files in primary directory '%s'", primaryDirectory));
+                 } else if (restoreFlowStateFiles.length > 1) {
+                     throw new IllegalStateException(String.format("Found 
multiple dataflow state files in restore directory '%s'", restoreDirectory));
+                 }
+ 
+                 // check that the single primary state file we found is 
current or create a new one
+                 final File primaryFlowStateFile = 
ensureSingleCurrentStateFile(primaryDirectory);
+ 
+                 // check that the single restore state file we found is 
current or create a new one
+                 final File restoreFlowStateFile = 
ensureSingleCurrentStateFile(restoreDirectory);
+ 
+                 // if there was a difference in flow state file directories, 
then copy the appropriate files
+                 if (restoreFlowStateFiles.length == 0 && 
primaryFlowStateFiles.length != 0) {
+                     // copy primary state file to restore
+                     FileUtils.copyFile(primaryFlowStateFile, 
restoreFlowStateFile, false, false, logger);
+                 } else if (primaryFlowStateFiles.length == 0 && 
restoreFlowStateFiles.length != 0) {
+                     // copy restore state file to primary
+                     FileUtils.copyFile(restoreFlowStateFile, 
primaryFlowStateFile, false, false, logger);
+                 } else {
+                     // sync the primary copy with the restore copy
+                     syncWithRestore(primaryFlowStateFile, 
restoreFlowStateFile);
+                 }
+ 
+             }
+         } catch (final IOException | IllegalArgumentException | 
IllegalStateException | JAXBException ex) {
+             throw new DaoException(ex);
+         }
+     }
+     
+     
+     private void syncWithRestore(final File primaryFile, final File 
restoreFile) throws IOException {
+         try (final FileInputStream primaryFis = new 
FileInputStream(primaryFile);
+              final TarArchiveInputStream primaryIn = new 
TarArchiveInputStream(primaryFis);
+              final FileInputStream restoreFis = new 
FileInputStream(restoreFile);
+              final TarArchiveInputStream restoreIn = new 
TarArchiveInputStream(restoreFis)) {
+             
+             final ArchiveEntry primaryEntry = primaryIn.getNextEntry();
+             final ArchiveEntry restoreEntry = restoreIn.getNextEntry();
+ 
+             if ( primaryEntry == null && restoreEntry == null ) {
+                 return;
+             }
+ 
+             if ( (primaryEntry == null && restoreEntry != null) || 
(primaryEntry != null && restoreEntry == null) ) {
+                 throw new IllegalStateException(String.format("Primary file 
'%s' is different than restore file '%s'",
+                         primaryFile.getAbsoluteFile(), 
restoreFile.getAbsolutePath()));
+             }
+             
+             final byte[] primaryMd5 = calculateMd5(primaryIn);
+             final byte[] restoreMd5 = calculateMd5(restoreIn);
+             
+             if ( !Arrays.equals(primaryMd5, restoreMd5) ) {
+                 throw new IllegalStateException(String.format("Primary file 
'%s' is different than restore file '%s'",
+                         primaryFile.getAbsoluteFile(), 
restoreFile.getAbsolutePath()));
+             }
+         }
+     }
+     
+     private byte[] calculateMd5(final InputStream in) throws IOException {
+         final MessageDigest digest;
+         try {
+             digest = MessageDigest.getInstance("MD5");
+         } catch (final NoSuchAlgorithmException nsae) {
+             throw new IOException(nsae);
+         }
+         
+         int len;
+         final byte[] buffer = new byte[8192];
+         while ((len = in.read(buffer)) > -1) {
+             if (len > 0) {
+                 digest.update(buffer, 0, len);
+             }
+         }
+         return digest.digest();
+     }
+ 
+     @Override
+     public ClusterDataFlow loadDataFlow() throws DaoException {
+         try {
+             return parseDataFlow(getExistingFlowStateFile(primaryDirectory));
+         } catch (final IOException | JAXBException ex) {
+             throw new DaoException(ex);
+         }
+     }
+ 
+     @Override
+     public void saveDataFlow(final ClusterDataFlow dataFlow) throws 
DaoException {
+         try {
+ 
+             final File primaryStateFile = getFlowStateFile(primaryDirectory);
+ 
+             // write to restore before writing to primary in case primary 
experiences problems
+             if (restoreDirectory != null) {
+                 final File restoreStateFile = 
getFlowStateFile(restoreDirectory);
+                 if (restoreStateFile == null) {
+                     if (primaryStateFile == null) {
+                         
writeDataFlow(createNewFlowStateFile(restoreDirectory), dataFlow);
+                     } else {
+                         throw new DaoException(String.format("Unable to save 
dataflow because dataflow state file in primary directory '%s' exists, but it 
does not exist in the restore directory '%s'",
+                                 primaryDirectory.getAbsolutePath(), 
restoreDirectory.getAbsolutePath()));
+                     }
+                 } else {
+                     if (primaryStateFile == null) {
+                         throw new DaoException(String.format("Unable to save 
dataflow because dataflow state file in restore directory '%s' exists, but it 
does not exist in the primary directory '%s'",
+                                 restoreDirectory.getAbsolutePath(), 
primaryDirectory.getAbsolutePath()));
+                     } else {
+                         final PersistedFlowState primaryFlowState = 
getPersistedFlowState(primaryStateFile);
+                         final PersistedFlowState restoreFlowState = 
getPersistedFlowState(restoreStateFile);
+                         if (primaryFlowState == restoreFlowState) {
+                             writeDataFlow(restoreStateFile, dataFlow);
+                         } else {
+                             throw new DaoException(String.format("Unable to 
save dataflow because state file in primary directory '%s' has state '%s', but 
the state file in the restore directory '%s' has state '%s'",
+                                     primaryDirectory.getAbsolutePath(), 
primaryFlowState, restoreDirectory.getAbsolutePath(), restoreFlowState));
+                         }
+                     }
+                 }
+             }
+ 
+             // write dataflow to primary 
+             if (primaryStateFile == null) {
+                 writeDataFlow(createNewFlowStateFile(primaryDirectory), 
dataFlow);
+             } else {
+                 writeDataFlow(primaryStateFile, dataFlow);
+             }
+ 
+         } catch (final IOException | JAXBException ex) {
+             throw new DaoException(ex);
+         }
+     }
+ 
+     @Override
+     public PersistedFlowState getPersistedFlowState() {
+         // trust restore over primary if configured for restore
+         if (restoreDirectory == null) {
+             return 
getPersistedFlowState(getExistingFlowStateFile(primaryDirectory));
+         } else {
+             return 
getPersistedFlowState(getExistingFlowStateFile(restoreDirectory));
+         }
+     }
+ 
+     @Override
+     public void setPersistedFlowState(final PersistedFlowState flowState) 
throws DaoException {
+         // rename restore before primary if configured for restore
+         if (restoreDirectory != null) {
+             renameFlowStateFile(getExistingFlowStateFile(restoreDirectory), 
flowState);
+         }
+         renameFlowStateFile(getExistingFlowStateFile(primaryDirectory), 
flowState);
+     }
+ 
+     private File ensureSingleCurrentStateFile(final File dir) throws 
IOException, JAXBException {
+ 
+         // ensure that we have at most one state file and if we have one, it 
is current
+         final File[] directoryFlowStateFiles = getFlowStateFiles(dir);
+         if (directoryFlowStateFiles.length > 1) {
+             throw new DaoException(String.format("Found multiple dataflow 
state files in directory '%s'", dir));
+         } else if (directoryFlowStateFiles.length == 0) {
+             // create a new file if none exist
+             return createNewFlowStateFile(dir);
+         } else {
+             // check that the single flow state file is current
+             final PersistedFlowState flowState = 
getPersistedFlowState(directoryFlowStateFiles[0]);
+             if (PersistedFlowState.CURRENT == flowState) {
+                 return directoryFlowStateFiles[0];
+             } else {
+                 throw new DaoException(String.format("Dataflow state file 
'%s' must be current.", directoryFlowStateFiles[0].getAbsolutePath()));
+             }
+         }
+ 
+     }
+ 
+     private PersistedFlowState getPersistedFlowState(final File file) {
+         final String path = file.getAbsolutePath();
+         if (path.endsWith(STALE_EXT)) {
+             return PersistedFlowState.STALE;
+         } else if (path.endsWith(UNKNOWN_EXT)) {
+             return PersistedFlowState.UNKNOWN;
+         } else {
+             return PersistedFlowState.CURRENT;
+         }
+     }
+ 
+     private File getFlowStateFile(final File dir) {
+         final File[] files = getFlowStateFiles(dir);
+         if (files.length > 1) {
+             throw new IllegalStateException(String.format("Expected at most 
one dataflow state file, but found %s files.", files.length));
+         } else if (files.length == 0) {
+             return null;
+         } else {
+             return files[0];
+         }
+     }
+ 
+     private File getExistingFlowStateFile(final File dir) {
+         final File file = getFlowStateFile(dir);
+         if (file == null) {
+             throw new IllegalStateException(String.format("Expected a 
dataflow state file, but none existed in directory '%s'", 
dir.getAbsolutePath()));
+         }
+         return file;
+     }
+ 
+     private File[] getFlowStateFiles(final File dir) {
+         final File[] files = dir.listFiles(new FilenameFilter() {
+             @Override
+             public boolean accept(File dir, String name) {
+                 return (name.equals(FLOW_PACKAGE) || name.endsWith(STALE_EXT) 
|| name.endsWith(UNKNOWN_EXT));
+             }
+         });
+ 
+         if (files == null) {
+             return new File[0];
+         } else {
+             return files;
+         }
+     }
+ 
+     private File removeStateFileExtension(final File file) {
+ 
+         final String path = file.getAbsolutePath();
+         final int stateFileExtIndex;
+         if (path.endsWith(STALE_EXT)) {
+             stateFileExtIndex = path.lastIndexOf(STALE_EXT);
+         } else if (path.endsWith(UNKNOWN_EXT)) {
+             stateFileExtIndex = path.lastIndexOf(UNKNOWN_EXT);
+         } else {
+             stateFileExtIndex = path.length();
+         }
+ 
+         return new File(path.substring(0, stateFileExtIndex));
+     }
+ 
+     private File addStateFileExtension(final File file, final 
PersistedFlowState state) {
+         switch (state) {
+             case CURRENT: {
+                 return file;
+             }
+             case STALE: {
+                 return new File(file.getAbsolutePath() + STALE_EXT);
+             }
+             case UNKNOWN: {
+                 return new File(file.getAbsolutePath() + UNKNOWN_EXT);
+             }
+             default: {
+                 throw new RuntimeException("Unsupported PersistedFlowState 
Enum value: " + state);
+             }
+         }
+     }
+ 
+     private File createNewFlowStateFile(final File dir) throws IOException, 
JAXBException {
+         final File stateFile = new File(dir, FLOW_PACKAGE);
+         stateFile.createNewFile();
+ 
 -        final byte[] flowBytes = getEmptyFlowBytes();
 -        final byte[] templateBytes = new byte[0];
 -        final byte[] snippetBytes = new byte[0];
 -        final DataFlow dataFlow = new StandardDataFlow(flowBytes, 
templateBytes, snippetBytes);
 -
 -        final ClusterMetadata clusterMetadata = new ClusterMetadata();
 -        writeDataFlow(stateFile, dataFlow, clusterMetadata);
++        writeDataFlow(stateFile, new ClusterDataFlow(null, null, new byte[0], 
new byte[0]), new ClusterMetadata());
+ 
+         return stateFile;
+     }
+ 
+     private byte[] getEmptyFlowBytes() throws IOException {
+         try {
+             final DocumentBuilder docBuilder = 
DocumentBuilderFactory.newInstance().newDocumentBuilder();
+             final Document document = docBuilder.newDocument();
+ 
+             final Element controller = 
document.createElement("flowController");
+             document.appendChild(controller);
+ 
+             controller.appendChild(createTextElement(document, 
"maxThreadCount", "15"));
+ 
+             final Element rootGroup = document.createElement("rootGroup");
+             rootGroup.appendChild(createTextElement(document, "id", 
generatedRootGroupId));
+             rootGroup.appendChild(createTextElement(document, "name", "NiFi 
Flow"));
+ 
+             // create the position element
+             final Element positionElement = createTextElement(document, 
"position", "");
+             positionElement.setAttribute("x", "0.0");
+             positionElement.setAttribute("y", "0.0");
+             rootGroup.appendChild(positionElement);
+ 
+             rootGroup.appendChild(createTextElement(document, "comment", ""));
+             controller.appendChild(rootGroup);
+ 
+             final Transformer transformer = 
TransformerFactory.newInstance().newTransformer();
+             
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ 
+             final DOMSource source = new DOMSource(document);
+             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final StreamResult result = new StreamResult(baos);
+             transformer.transform(source, result);
+ 
+             return baos.toByteArray();
+         } catch (final Exception e) {
+             throw new IOException(e);
+         }
+     }
+ 
+     private Element createTextElement(final Document document, final String 
elementName, final String value) {
+         final Element element = document.createElement(elementName);
+         element.setTextContent(value);
+         return element;
+     }
+ 
+     private void renameFlowStateFile(final File flowStateFile, final 
PersistedFlowState newState) throws DaoException {
+         final PersistedFlowState existingState = 
getPersistedFlowState(flowStateFile);
+         if (existingState != newState) {
+             final File newFlowStateFile = 
addStateFileExtension(removeStateFileExtension(flowStateFile), newState);
+             if (flowStateFile.renameTo(newFlowStateFile) == false) {
+                 throw new DaoException(
+                         String.format("Failed to rename flow state file '%s' 
to new name '%s'", flowStateFile.getAbsolutePath(), 
newFlowStateFile.getAbsolutePath()));
+             }
+         }
+     }
+ 
+     private ClusterDataFlow parseDataFlow(final File file) throws 
IOException, JAXBException, DaoException {
+         byte[] flowBytes = new byte[0];
+         byte[] templateBytes = new byte[0];
+         byte[] snippetBytes = new byte[0];
+         byte[] clusterInfoBytes = new byte[0];
 -
++        byte[] controllerServiceBytes = new byte[0];
++        byte[] reportingTaskBytes = new byte[0];
++        
+         try (final InputStream inStream = new FileInputStream(file);
+                 final TarArchiveInputStream tarIn = new 
TarArchiveInputStream(new BufferedInputStream(inStream))) {
+             TarArchiveEntry tarEntry;
+             while ((tarEntry = tarIn.getNextTarEntry()) != null) {
+                 switch (tarEntry.getName()) {
+                     case FLOW_XML_FILENAME:
+                         flowBytes = new byte[(int) tarEntry.getSize()];
+                         StreamUtils.fillBuffer(tarIn, flowBytes, true);
+                         break;
+                     case TEMPLATES_FILENAME:
+                         templateBytes = new byte[(int) tarEntry.getSize()];
+                         StreamUtils.fillBuffer(tarIn, templateBytes, true);
+                         break;
+                     case SNIPPETS_FILENAME:
+                         snippetBytes = new byte[(int) tarEntry.getSize()];
+                         StreamUtils.fillBuffer(tarIn, snippetBytes, true);
+                         break;
+                     case CLUSTER_INFO_FILENAME:
+                         clusterInfoBytes = new byte[(int) tarEntry.getSize()];
+                         StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
+                         break;
++                    case CONTROLLER_SERVICES_FILENAME:
++                      controllerServiceBytes = new byte[(int) 
tarEntry.getSize()];
++                      StreamUtils.fillBuffer(tarIn, controllerServiceBytes, 
true);
++                      break;
++                    case REPORTING_TASKS_FILENAME:
++                      reportingTaskBytes = new byte[(int) tarEntry.getSize()];
++                      StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true);
++                      break;
+                     default:
+                         throw new DaoException("Found Unexpected file in 
dataflow configuration: " + tarEntry.getName());
+                 }
+             }
+         }
+ 
+         final ClusterMetadata clusterMetadata;
+         if (clusterInfoBytes.length == 0) {
+             clusterMetadata = null;
+         } else {
+             final Unmarshaller clusterMetadataUnmarshaller = 
ClusterMetadata.jaxbCtx.createUnmarshaller();
+             clusterMetadata = (ClusterMetadata) 
clusterMetadataUnmarshaller.unmarshal(new 
ByteArrayInputStream(clusterInfoBytes));
+         }
+ 
+         final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, 
templateBytes, snippetBytes);
+         dataFlow.setAutoStartProcessors(autoStart);
+ 
 -        return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null 
: clusterMetadata.getPrimaryNodeId());
++        return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null 
: clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, 
reportingTaskBytes);
+     }
+ 
+     /**
+      * Writes the given cluster data flow to the given file, pairing it with
+      * cluster metadata that carries the flow's primary node identifier.
+      *
+      * @param file the file to write to
+      * @param clusterDataFlow the cluster data flow to persist
+      * @throws IOException if the file cannot be written
+      * @throws JAXBException if the cluster metadata cannot be marshalled
+      */
+     private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
+ 
+         // The three-argument overload reads the DataFlow from clusterDataFlow and
+         // writes placeholder entries when it is null, so no local copy is built here.
+ 
+         // setup the cluster metadata
+         final ClusterMetadata clusterMetadata = new ClusterMetadata();
+         clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId());
+ 
+         // write to disk
 -        writeDataFlow(file, dataFlow, clusterMetadata);
++        writeDataFlow(file, clusterDataFlow, clusterMetadata);
+     }
+ 
+     /**
+      * Appends a single entry with the given name and content to the tar stream.
+      *
+      * @param tarOut the archive stream to append to
+      * @param filename the name of the entry
+      * @param bytes the entry content; null is written as an empty entry
+      * @throws IOException if the entry cannot be written
+      */
+     private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException {
+         // a null payload would otherwise fail on bytes.length after the archive was partially written
+         final byte[] content = (bytes == null) ? new byte[0] : bytes;
+ 
+         final TarArchiveEntry entry = new TarArchiveEntry(filename);
+         entry.setSize(content.length);
+         tarOut.putArchiveEntry(entry);
+         tarOut.write(content);
+         tarOut.closeArchiveEntry();
+     }
+ 
 -    private void writeDataFlow(final File file, final DataFlow dataFlow, 
final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
++    private void writeDataFlow(final File file, final ClusterDataFlow 
clusterDataFlow, final ClusterMetadata clusterMetadata) throws IOException, 
JAXBException { // writes flow, templates, snippets, controller services, reporting tasks and cluster info as tar entries of 'file'
+ 
+         try (final OutputStream fos = new FileOutputStream(file);
+                 final TarArchiveOutputStream tarOut = new 
TarArchiveOutputStream(new BufferedOutputStream(fos))) { // both streams are closed by try-with-resources
+ 
 -            writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
 -            writeTarEntry(tarOut, TEMPLATES_FILENAME, 
dataFlow.getTemplates());
 -            writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
++            final DataFlow dataFlow = clusterDataFlow.getDataFlow();
++            if ( dataFlow == null ) { // no flow available: write getEmptyFlowBytes() plus empty templates and snippets
++                writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes());
++                writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]);
++                writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]);
++            } else {
++                writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
++                writeTarEntry(tarOut, TEMPLATES_FILENAME, 
dataFlow.getTemplates());
++                writeTarEntry(tarOut, SNIPPETS_FILENAME, 
dataFlow.getSnippets());
++            }
++            writeTarEntry(tarOut, CONTROLLER_SERVICES_FILENAME, 
clusterDataFlow.getControllerServices()); // written regardless of whether a DataFlow was present
++            writeTarEntry(tarOut, REPORTING_TASKS_FILENAME, 
clusterDataFlow.getReportingTasks());
+ 
+             final ByteArrayOutputStream baos = new ByteArrayOutputStream(256); // metadata is marshalled in memory first so the entry size is known
+             writeClusterMetadata(clusterMetadata, baos);
+             final byte[] clusterInfoBytes = baos.toByteArray();
+ 
+             writeTarEntry(tarOut, CLUSTER_INFO_FILENAME, clusterInfoBytes);
+         }
+     }
+ 
+     /**
+      * Marshals the given cluster metadata onto the given stream as a formatted,
+      * UTF-8 encoded XML fragment.
+      *
+      * @param clusterMetadata the metadata to marshal
+      * @param os the stream that receives the XML
+      * @throws IOException declared for callers; not thrown directly here
+      * @throws JAXBException if the marshaller cannot be created or marshalling fails
+      */
+     private void writeClusterMetadata(final ClusterMetadata clusterMetadata, final OutputStream os) throws IOException, JAXBException {
+         final Marshaller xmlWriter = ClusterMetadata.jaxbCtx.createMarshaller();
+ 
+         // output settings: pretty-printed fragment in UTF-8
+         xmlWriter.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+         xmlWriter.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
+         xmlWriter.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
+ 
+         xmlWriter.marshal(clusterMetadata, os);
+     }
+ 
+     /**
+      * JAXB-mapped holder for the cluster information stored alongside the flow;
+      * currently it carries only the primary node identifier.
+      */
+     @XmlRootElement(name = "clusterMetadata")
+     private static class ClusterMetadata {
+ 
+         /** Shared JAXB context for this class, created once when the class is initialized. */
+         private static final JAXBContext jaxbCtx;
+ 
+         static {
+             try {
+                 jaxbCtx = JAXBContext.newInstance(ClusterMetadata.class);
+             } catch (final JAXBException cause) {
+                 // the context cannot be built: surface it as an unchecked failure
+                 throw new RuntimeException(cause);
+             }
+         }
+ 
+         private NodeIdentifier primaryNodeId;
+ 
+         /**
+          * @return the primary node identifier, converted for XML by NodeIdentifierAdapter
+          */
+         @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+         public NodeIdentifier getPrimaryNodeId() {
+             return this.primaryNodeId;
+         }
+ 
+         /**
+          * @param primaryNodeId the primary node identifier to store
+          */
+         public void setPrimaryNodeId(final NodeIdentifier primaryNodeId) {
+             this.primaryNodeId = primaryNodeId;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
index 0000000,e135af3..1bb8ca3
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
@@@ -1,0 -1,356 +1,413 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.cluster.flow.impl;
+ 
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.Set;
+ import java.util.Timer;
+ import java.util.TimerTask;
+ import java.util.concurrent.CopyOnWriteArraySet;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ 
+ import org.apache.nifi.cluster.flow.ClusterDataFlow;
+ import org.apache.nifi.cluster.flow.DaoException;
+ import org.apache.nifi.cluster.flow.DataFlowDao;
+ import org.apache.nifi.cluster.flow.DataFlowManagementService;
+ import org.apache.nifi.cluster.flow.PersistedFlowState;
+ import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+ import org.apache.nifi.cluster.protocol.NodeIdentifier;
+ import org.apache.nifi.cluster.protocol.StandardDataFlow;
+ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+ import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+ import org.apache.nifi.logging.NiFiLog;
+ import org.apache.nifi.util.FormatUtils;
 -
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Implements the DataFlowManagementService interface. The service tries to
+  * keep the cluster's flow current with regards to the available nodes.
+  *
+  * The instance may be configured with a retrieval delay, which will reduce the
+  * number of retrievals performed by the service at the expense of increasing
+  * the chances that the service will not be able to provide a current flow to
+  * the caller.
+  *
+  * By default, the service will try to update the flow as quickly as possible.
+  * Configuring a delay enables a less aggressive retrieval strategy.
+  * Specifically, the eligible retrieval time is reset every time the flow state
+  * is set to STALE. If the state is set to UNKNOWN or CURRENT, then the flow
+  * will not be retrieved.
+  *
+  * @author unattributed
+  */
+ public class DataFlowManagementServiceImpl implements 
DataFlowManagementService {
+ 
+     /*
+      * Developer Note:
+      *
+      * This class maintains a Timer and a TimerTask.
+      * Although the class is not externally threadsafe, its internals are
+      * protected by a lock to accommodate multithreaded access between
+      * callers of this service and the TimerTask.
+      *
+      */
+     private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));
+ 
+     private final DataFlowDao flowDao; // loads and saves the cluster flow and its persisted state
+ 
+     private final ClusterManagerProtocolSender sender; // used by the timer task to request the flow from a node
+ 
+     private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<>(); // nodes the timer task asks for the flow
+ 
+     private final AtomicBoolean stopRequested = new AtomicBoolean(false); // set by stop(), cleared by start(), checked by the timer task
+ 
+     private final AtomicLong lastRetrievalTime = new AtomicLong(-1); // time (ms) of the last saved retrieval; -1 until one happens
+ 
+     private Timer flowRetriever; // non-null only between start() and stop(); see isRunning()
+ 
+     private long retrievableAfterTime = 0L; // time (ms) compared against request time; written in setPersistedFlowState under resourceLock
+ 
+     private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0); // delay added to "now" when the state becomes STALE
+ 
+     private final TimingReentrantLock resourceLock = new 
TimingReentrantLock(new ReentrantLock()); // guards DAO access and retrievableAfterTime; logs long hold times
+ 
+     /**
+      * Creates a service backed by the given DAO and protocol sender.
+      *
+      * @param flowDao the DAO used to load and save the cluster flow
+      * @param sender the sender used to request flows from nodes
+      * @throws IllegalArgumentException if either argument is null
+      */
+     public DataFlowManagementServiceImpl(final DataFlowDao flowDao, final ClusterManagerProtocolSender sender) {
+         // guard clauses: the DAO is checked first, then the sender
+         if (flowDao == null) {
+             throw new IllegalArgumentException("Flow DAO may not be null.");
+         }
+         if (sender == null) {
+             throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null.");
+         }
+ 
+         this.sender = sender;
+         this.flowDao = flowDao;
+     }
+ 
+     /**
+      * Starts the daemon timer that periodically runs the flow retrieval task.
+      *
+      * @throws IllegalArgumentException if the service is already running
+      */
+     @Override
+     public void start() {
+         if (isRunning()) {
+             throw new IllegalArgumentException("Instance is already running.");
+         }
+ 
+         // clear any stop request left over from a previous run
+         stopRequested.set(false);
+ 
+         // set up the flow retriever timer: first run immediately, then every 500 ms
+         final boolean daemon = true;
+         final long initialDelayMillis = 0L;
+         final long periodMillis = 500L;
+         flowRetriever = new Timer("Flow Management Service", daemon);
+         flowRetriever.schedule(new FlowRetrieverTimerTask(), initialDelayMillis, periodMillis);
+     }
+ 
+     /**
+      * @return true if the retrieval timer exists, i.e. start() has been called
+      * without a subsequent stop()
+      */
+     @Override
+     public boolean isRunning() {
+         final boolean timerPresent = flowRetriever != null;
+         return timerPresent;
+     }
+ 
+     /**
+      * Records the stop request, cancels the retrieval timer and discards it.
+      *
+      * @throws IllegalArgumentException if the service is not running
+      */
+     @Override
+     public void stop() {
+         if (!isRunning()) {
+             throw new IllegalArgumentException("Instance is already stopped.");
+         }
+ 
+         // let an in-flight timer task see the stop request
+         stopRequested.set(true);
+ 
+         // cancel the timer, then drop the reference so isRunning() reports false
+         flowRetriever.cancel();
+         flowRetriever = null;
+     }
+ 
+     /**
+      * Loads the cluster data flow from the DAO while holding the resource lock.
+      *
+      * @return whatever the DAO returns for the persisted flow
+      * @throws DaoException if the DAO fails to load the flow
+      */
+     @Override
+     public ClusterDataFlow loadDataFlow() throws DaoException {
+         resourceLock.lock();
+         try {
+             final ClusterDataFlow loaded = flowDao.loadDataFlow();
+             return loaded;
+         } finally {
+             resourceLock.unlock("loadDataFlow");
+         }
+     }
+ 
+     @Override
+     public void updatePrimaryNode(final NodeIdentifier nodeId) { // re-saves the persisted cluster flow with nodeId as its primary node
+         resourceLock.lock(); // load and save happen under one lock hold
+         try {
+             final ClusterDataFlow existingClusterDataFlow = 
flowDao.loadDataFlow();
+ 
+             final StandardDataFlow dataFlow;
++            final byte[] controllerServiceBytes;
++            final byte[] reportingTaskBytes;
+             if (existingClusterDataFlow == null) { // nothing persisted: no flow, empty controller-service and reporting-task content
+                 dataFlow = null;
++                controllerServiceBytes = new byte[0];
++                reportingTaskBytes = new byte[0];
+             } else { // carry over everything except the primary node
+                 dataFlow = existingClusterDataFlow.getDataFlow();
++                controllerServiceBytes = 
existingClusterDataFlow.getControllerServices();
++                reportingTaskBytes = 
existingClusterDataFlow.getReportingTasks();
+             }
+ 
 -            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
++            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, 
controllerServiceBytes, reportingTaskBytes));
+         } finally {
+             resourceLock.unlock("updatePrimaryNode");
+         }
+     }
++    
++    
++    /**
++     * Re-saves the persisted cluster flow with the given controller service
++     * content, carrying over the flow, primary node and reporting tasks.
++     *
++     * @param controllerServiceBytes the new controller service content
++     * @throws DaoException if the DAO fails to load or save the flow
++     */
++    @Override
++    public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException {
++        resourceLock.lock();
++        try {
++            final ClusterDataFlow persisted = flowDao.loadDataFlow();
++            final boolean hasPersisted = persisted != null;
++
++            // with nothing persisted: no flow, no primary node, empty reporting tasks
++            final StandardDataFlow dataFlow = hasPersisted ? persisted.getDataFlow() : null;
++            final NodeIdentifier nodeId = hasPersisted ? persisted.getPrimaryNodeId() : null;
++            final byte[] reportingTaskBytes = hasPersisted ? persisted.getReportingTasks() : new byte[0];
++
++            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
++        } finally {
++            resourceLock.unlock("updateControllerServices");
++        }
++    }
++    
++    /**
++     * Re-saves the persisted cluster flow with the given reporting task
++     * content, carrying over the flow, primary node and controller services.
++     *
++     * @param reportingTaskBytes the new reporting task content
++     * @throws DaoException if the DAO fails to load or save the flow
++     */
++    @Override
++    public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException {
++        resourceLock.lock();
++        try {
++            final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
++
++            final StandardDataFlow dataFlow;
++            final byte[] controllerServiceBytes;
++            final NodeIdentifier nodeId;
++            if (existingClusterDataFlow == null) {
++                dataFlow = null;
++                nodeId = null;
++                // empty rather than null, matching updatePrimaryNode and updateControllerServices
++                controllerServiceBytes = new byte[0];
++            } else {
++                dataFlow = existingClusterDataFlow.getDataFlow();
++                nodeId = existingClusterDataFlow.getPrimaryNodeId();
++                controllerServiceBytes = existingClusterDataFlow.getControllerServices();
++            }
++
++            flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
++        } finally {
++            // label the lock timing with this method's own name
++            resourceLock.unlock("updateReportingTasks");
++        }
++    }
+ 
+     /**
+      * Reads the persisted flow state from the DAO while holding the resource lock.
+      *
+      * @return the state reported by the DAO
+      */
+     @Override
+     public PersistedFlowState getPersistedFlowState() {
+         resourceLock.lock();
+         try {
+             final PersistedFlowState state = flowDao.getPersistedFlowState();
+             return state;
+         } finally {
+             resourceLock.unlock("getPersistedFlowState");
+         }
+     }
+ 
+     /**
+      * @return true if the persisted flow state is CURRENT
+      */
+     @Override
+     public boolean isFlowCurrent() {
+         final PersistedFlowState state = getPersistedFlowState();
+         return state == PersistedFlowState.CURRENT;
+     }
+ 
+     /**
+      * Persists the given flow state and updates the time after which the flow
+      * may be retrieved: STALE makes it eligible once the retrieval delay has
+      * elapsed, while UNKNOWN and CURRENT push the eligible time to
+      * Long.MAX_VALUE.
+      *
+      * @param flowState the state to persist
+      */
+     @Override
+     public void setPersistedFlowState(final PersistedFlowState flowState) {
+         // lock to ensure state change and retrievable time update are atomic
+         resourceLock.lock();
+         try {
+             flowDao.setPersistedFlowState(flowState);
+             if (PersistedFlowState.STALE == flowState) {
+                 // convert in long arithmetic: int seconds * 1000 overflowed for large delays
+                 final long delayMillis = TimeUnit.SECONDS.toMillis(getRetrievalDelaySeconds());
+                 retrievableAfterTime = new Date().getTime() + delayMillis;
+             } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) {
+                 retrievableAfterTime = Long.MAX_VALUE;
+             }
+         } finally {
+             resourceLock.unlock("setPersistedFlowState");
+         }
+     }
+ 
+     /**
+      * @return an unmodifiable view of the node identifiers held by this service
+      */
+     @Override
+     public Set<NodeIdentifier> getNodeIds() {
+         final Set<NodeIdentifier> readOnlyView = Collections.unmodifiableSet(nodeIds);
+         return readOnlyView;
+     }
+ 
+     /**
+      * Replaces the held node identifiers with the given set; nothing is
+      * changed when the given set equals the current one.
+      *
+      * @param nodeIds the new node identifiers
+      * @throws IllegalArgumentException if nodeIds is null
+      */
+     @Override
+     public void setNodeIds(final Set<NodeIdentifier> nodeIds) {
+         if (nodeIds == null) {
+             throw new IllegalArgumentException("Node IDs may not be null.");
+         }
+ 
+         resourceLock.lock();
+         try {
+             final boolean changed = !this.nodeIds.equals(nodeIds);
+             if (changed) {
+                 this.nodeIds.clear();
+                 this.nodeIds.addAll(nodeIds);
+             }
+         } finally {
+             resourceLock.unlock("setNodeIds");
+         }
+     }
+ 
+     /**
+      * @return the configured retrieval delay, in seconds
+      */
+     @Override
+     public int getRetrievalDelaySeconds() {
+         final int delaySeconds = retrievalDelaySeconds.get();
+         return delaySeconds;
+     }
+ 
+     /**
+      * Sets the retrieval delay from a duration string, converted to whole
+      * seconds by FormatUtils.getTimeDuration and narrowed to an int.
+      *
+      * @param retrievalDelay the delay as a duration string
+      */
+     @Override
+     public void setRetrievalDelay(final String retrievalDelay) {
+         retrievalDelaySeconds.set((int) FormatUtils.getTimeDuration(retrievalDelay, TimeUnit.SECONDS));
+     }
+ 
+     /**
+      * @return the protocol sender supplied at construction
+      */
+     public ClusterManagerProtocolSender getSender() {
+         return this.sender;
+     }
+ 
+     /**
+      * @return the time, in milliseconds, at which a retrieved flow was last
+      * saved, or -1 if none has been saved yet
+      */
+     public long getLastRetrievalTime() {
+         final long lastSavedMillis = lastRetrievalTime.get();
+         return lastSavedMillis;
+     }
+ 
+     /**
+      * A timer task for issuing FlowRequestMessage messages to nodes to retrieve
+      * an updated flow.
+      *
+      * Each run returns early when the persisted flow is current. Otherwise it
+      * iterates over the known node identifiers and, per node: checks under the
+      * lock that no stop was requested and that the request time is not before
+      * retrievableAfterTime; requests the flow without holding the lock; then
+      * re-acquires the lock and saves the response only if the request time is
+      * still after retrievableAfterTime. A saved flow keeps the primary node,
+      * controller services and reporting tasks of the previously persisted
+      * flow, and the persisted state is set to CURRENT.
+      */
+     private class FlowRetrieverTimerTask extends TimerTask {
+ 
+         @Override
+         public void run() {
+ 
+             resourceLock.lock();
+             try {
+                 // if flow is current, then we're done
+                 if (isFlowCurrent()) {
+                     return;
+                 }
+             } catch (final Exception ex) { // logged only: execution continues into the node loop below
+                 logger.info("Encountered exception checking if flow is 
current caused by " + ex, ex);
+             } finally {
+                 resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent");
+             }
+ 
+             final FlowRequestMessage request = new FlowRequestMessage(); // one message instance reused for every node
+             for (final NodeIdentifier nodeId : getNodeIds()) {
+                 try {
+                     // setup request
+                     request.setNodeId(nodeId);
+ 
+                     // record request time
+                     final long requestSentTime = new Date().getTime();
+ 
+                     resourceLock.lock();
+                     try {
+                         // sanity checks before making request
+                         if (stopRequested.get()) {  // did we receive a stop 
request
+                             logger.debug("Stopping runnable prematurely 
because a request to stop was issued.");
+                             return;
+                         } else if (requestSentTime < retrievableAfterTime) {
+                             /*
+                              * The request time is before the retrievable after
+                              * time (it may have been updated while obtaining
+                              * the lock), so try again later
+                              */
+                             return;
+                         }
+                     } finally {
+                         resourceLock.unlock("FlowRetrieverTimerTask - check 
stopRequested");
+                     }
+ 
+                     // send request (the resource lock is not held during this call)
+                     final FlowResponseMessage response = 
sender.requestFlow(request);
+ 
+                     resourceLock.lock();
+                     try {
+                         // check if the retrieved flow is still valid
+                         if (requestSentTime > retrievableAfterTime) {
+                             logger.info("Saving retrieved flow.");
+ 
+                             final StandardDataFlow dataFlow = 
response.getDataFlow();
+                             final ClusterDataFlow existingClusterDataFlow = 
flowDao.loadDataFlow();
+                             final ClusterDataFlow currentClusterDataFlow;
+                             if (existingClusterDataFlow == null) { // nothing persisted yet: no primary node, empty service/task content
 -                                currentClusterDataFlow = new 
ClusterDataFlow(dataFlow, null);
++                                currentClusterDataFlow = new 
ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]);
+                             } else { // keep primary node, controller services and reporting tasks from the persisted flow
 -                                currentClusterDataFlow = new 
ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
++                                currentClusterDataFlow = new 
ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(), 
++                                              
existingClusterDataFlow.getControllerServices(), 
existingClusterDataFlow.getReportingTasks());
+                             }
+                             flowDao.saveDataFlow(currentClusterDataFlow);
+                             
flowDao.setPersistedFlowState(PersistedFlowState.CURRENT); // set on the DAO directly, so retrievableAfterTime is not changed here
+                             lastRetrievalTime.set(new Date().getTime());
+                         }
+ 
+                         /*
+                          * Otherwise the retrievable after time was updated
+                          * while requesting the flow; the response is
+                          * discarded, so try again later.
+                          * NOTE(review): the loop does not exit after a
+                          * successful save, so remaining nodes are still
+                          * queried in this run - confirm this is intended.
+                          */
+                     } finally {
+                         resourceLock.unlock("FlowRetrieverTimerTask - 
saveDataFlow");
+                     }
+ 
+                 } catch (final Throwable t) { // a failure for one node is logged and the loop moves on to the next node
+                     logger.info("Encountered exception retrieving flow from 
node " + nodeId + " caused by " + t, t);
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Wraps a Lock and records, per thread, when the lock was acquired so that
+      * holds longer than 100 milliseconds can be reported at debug level on
+      * release. The recorded time is overwritten by every call to lock() on the
+      * same thread.
+      */
+     private static class TimingReentrantLock {
+ 
+         private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock");
+ 
+         private final Lock lock;
+         private final ThreadLocal<Long> lockTime = new ThreadLocal<>();
+ 
+         public TimingReentrantLock(final Lock lock) {
+             this.lock = lock;
+         }
+ 
+         /** Acquires the lock, then stamps the acquisition time for the calling thread. */
+         public void lock() {
+             this.lock.lock();
+             this.lockTime.set(System.nanoTime());
+         }
+ 
+         /**
+          * Releases the lock and logs the hold time when it exceeded 100 ms.
+          *
+          * @param task label identifying the caller in the log message
+          */
+         public void unlock(final String task) {
+             final long heldNanos = System.nanoTime() - this.lockTime.get();
+             this.lock.unlock();
+ 
+             final long heldMillis = TimeUnit.NANOSECONDS.toMillis(heldNanos);
+             if (heldMillis <= 100L) {
+                 return;
+             }
+             logger.debug("Lock held for {} milliseconds for task: {}", heldMillis, task);
+         }
+     }
+ }

Reply via email to