Repository: nifi
Updated Branches:
  refs/heads/master a2ed0f061 -> c2ae7a6d7


NIFI-2605: - Fixing a regression bug where nodes would potentially be elected 
leader for Cluster Coordinator role when they do not have the correct flow

 -  Ensure that we log which node is the cluster coordinator on startup instead 
of just indicating that there is one. If we later determine that there is none, 
ensure that we register for the role

This closes #900

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2ae7a6d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2ae7a6d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2ae7a6d

Branch: refs/heads/master
Commit: c2ae7a6d7cf94b817e288c28038d18db51fcb457
Parents: a2ed0f0
Author: Mark Payne <[email protected]>
Authored: Fri Aug 19 13:41:58 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Wed Aug 24 12:30:48 2016 -0400

----------------------------------------------------------------------
 .../node/CuratorNodeProtocolSender.java         |   2 +-
 .../node/LeaderElectionNodeProtocolSender.java  |   2 +-
 .../node/NodeClusterCoordinator.java            |   2 +-
 .../exception/BlockedByFirewallException.java   |   1 +
 .../manager/exception/ClusterException.java     |  39 -----
 .../exception/IllegalClusterStateException.java |   2 +
 .../NoClusterCoordinatorException.java          |  31 ----
 .../exception/NoConnectedNodesException.java    |   2 +
 .../exception/NoResponseFromNodesException.java |   2 +
 .../exception/NodeDisconnectionException.java   |   2 +
 .../exception/NodeReconnectionException.java    |   2 +
 .../manager/exception/UnknownNodeException.java |   2 +
 .../nifi/cluster/integration/Cluster.java       |   6 +-
 .../integration/ClusterConnectionIT.java        |  15 +-
 .../apache/nifi/cluster/integration/Node.java   |   8 +-
 .../cluster/exception/ClusterException.java     |  39 +++++
 .../NoClusterCoordinatorException.java          |  33 ++++
 .../apache/nifi/controller/FlowController.java  |  29 ++--
 .../nifi/controller/StandardFlowService.java    |  13 ++
 .../election/CuratorLeaderElectionManager.java  | 150 +++++++++++++++----
 .../leader/election/LeaderElectionManager.java  |  22 +--
 .../StandaloneLeaderElectionManager.java        |   9 +-
 .../java/org/apache/nifi/nar/NarCloseable.java  |   2 +-
 .../src/main/resources/conf/logback.xml         |   1 +
 .../nifi/web/StandardNiFiContentAccess.java     |   2 +-
 .../StandardNiFiWebConfigurationContext.java    |   2 +-
 .../nifi/web/api/ApplicationResource.java       |   2 +-
 .../web/api/config/ClusterExceptionMapper.java  |   3 +-
 .../NoClusterCoordinatorExceptionMapper.java    |   2 +-
 29 files changed, 288 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
index 0806959..ee6e930 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java
@@ -23,7 +23,7 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
-import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java
index 03de329..c5ee671 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index b1a088e..e50d8fa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -39,10 +39,10 @@ import 
org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
 import 
org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
 import org.apache.nifi.cluster.event.Event;
 import org.apache.nifi.cluster.event.NodeEvent;
+import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import 
org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.protocol.ComponentRevision;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
index c13e9d9..bf0138e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.cluster.manager.exception;
 
+import org.apache.nifi.cluster.exception.ClusterException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
deleted file mode 100644
index e93acca..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.exception;
-
-/**
- * The base exception class for cluster related exceptions.
- *
- */
-public class ClusterException extends RuntimeException {
-
-    public ClusterException() {
-    }
-
-    public ClusterException(String msg) {
-        super(msg);
-    }
-
-    public ClusterException(Throwable cause) {
-        super(cause);
-    }
-
-    public ClusterException(String msg, Throwable cause) {
-        super(msg, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
index e10e9c3..1ba9c8f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.manager.exception;
 
+import org.apache.nifi.cluster.exception.ClusterException;
+
 /**
  * Signals that an operation to be performed on a cluster has been invoked at 
an illegal or inappropriate time.
  *

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
deleted file mode 100644
index 89b6722..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.cluster.manager.exception;
-
-public class NoClusterCoordinatorException extends ClusterException {
-    private static final long serialVersionUID = -1782098541351698293L;
-
-    public NoClusterCoordinatorException() {
-        super("Action cannot be performed because there is currently no 
Cluster Coordinator elected. "
-            + "The request should be tried again after a moment, after a 
Cluster Coordinator has been automatically elected.");
-    }
-
-    public NoClusterCoordinatorException(final String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
index 7a828e3..8694878 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.manager.exception;
 
+import org.apache.nifi.cluster.exception.ClusterException;
+
 /**
  * Represents the exceptional case when the cluster is unable to service a 
request because no nodes are connected.
  *

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
index 9616ad3..9198c5e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.manager.exception;
 
+import org.apache.nifi.cluster.exception.ClusterException;
+
 /**
  * Represents the exceptional case when the cluster is unable to service a 
request because no nodes returned a response. When the given request is not 
mutable the nodes are left in their previous
  * state.

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
index 2f59eed..c788f16 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.manager.exception;
 
+import org.apache.nifi.cluster.exception.ClusterException;
+
 /**
  * Represents the exceptional case when a disconnection request to a node 
failed.
  *

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
index be3edf4..e7c2baa 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.manager.exception;
 
+import org.apache.nifi.cluster.exception.ClusterException;
+
 /**
  * Represents the exceptional case when a reconnection request to a node 
failed.
  *

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
index 2d43e8a..21843cd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.cluster.manager.exception;
 
+import org.apache.nifi.cluster.exception.ClusterException;
+
 /**
  * Represents the exceptional case when a request is made for a node that does 
not exist.
  *

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
index a0c8648..ce2017e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.cluster.integration;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,7 +41,7 @@ public class Cluster {
     private final Set<Node> nodes = new HashSet<>();
     private final TestingServer zookeeperServer;
 
-    public Cluster() {
+    public Cluster() throws IOException {
         try {
             zookeeperServer = new TestingServer();
         } catch (final Exception e) {
@@ -114,7 +115,8 @@ public class Cluster {
         addProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, 
getZooKeeperConnectString());
         addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true");
 
-        final Node node = new 
Node(NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties",
 addProps));
+        final NiFiProperties nifiProperties = 
NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties",
 addProps);
+        final Node node = new Node(nifiProperties);
         node.start();
         nodes.add(node);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
index 3439263..45a2e42 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java
@@ -46,7 +46,7 @@ public class ClusterConnectionIT {
     }
 
     @Before
-    public void createCluster() {
+    public void createCluster() throws IOException {
         cluster = new Cluster();
         cluster.start();
     }
@@ -140,9 +140,8 @@ public class ClusterConnectionIT {
         cluster.waitForPrimaryNode(10, TimeUnit.SECONDS);
     }
 
-
     @Test(timeout = 60000)
-    public void testRestartAllNodes() throws IOException {
+    public void testRestartAllNodes() throws IOException, InterruptedException 
{
         final Node firstNode = cluster.createNode();
         final Node secondNode = cluster.createNode();
         final Node thirdNode = cluster.createNode();
@@ -164,7 +163,13 @@ public class ClusterConnectionIT {
         firstNode.start();
         secondNode.start();
 
-        cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS);
+
+        firstNode.waitUntilConnected(20, TimeUnit.SECONDS);
+        System.out.println("\n\n\n**** Node 1 Re-Connected ****\n\n\n");
+        secondNode.waitUntilConnected(10, TimeUnit.SECONDS);
+        System.out.println("**** Node 2 Re-Connected ****");
+        thirdNode.waitUntilConnected(10, TimeUnit.SECONDS);
+        System.out.println("**** Node 3 Re-Connected ****");
 
         // wait for all 3 nodes to agree that node 2 is connected
         Stream.of(firstNode, secondNode, thirdNode).forEach(node -> {
@@ -205,7 +210,7 @@ public class ClusterConnectionIT {
         otherNode.assertNodeConnects(nodeToSuspend.getIdentifier(), 10, 
TimeUnit.SECONDS);
     }
 
-    @Test
+    @Test(timeout = 60000)
     public void testNodeInheritsClusterTopologyOnHeartbeat() throws 
InterruptedException {
         final Node node1 = cluster.createNode();
         final Node node2 = cluster.createNode();

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index b9372a6..7ba718b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -87,8 +87,9 @@ public class Node {
 
     private ScheduledExecutorService executor = new FlowEngine(8, "Node 
tasks", true);
 
+
     public Node(final NiFiProperties properties) {
-        this(new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
createPort(), "localhost", createPort(), "localhost", null, null, false, null), 
properties);
+        this(createNodeId(), properties);
     }
 
     public Node(final NodeIdentifier nodeId, final NiFiProperties properties) {
@@ -121,6 +122,10 @@ public class Node {
     }
 
 
+    private static NodeIdentifier createNodeId() {
+        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
createPort(), "localhost", createPort(), "localhost", null, null, false, null);
+    }
+
     public synchronized void start() {
         running = true;
 
@@ -148,6 +153,7 @@ public class Node {
                 StringEncryptor.createEncryptor(nodeProperties), 
revisionManager, Mockito.mock(Authorizer.class));
 
             flowService.start();
+
             flowService.load(null);
         } catch (Exception e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java
new file mode 100644
index 0000000..22de7c6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.exception;
+
+/**
+ * The base exception class for cluster related exceptions.
+ *
+ */
+public class ClusterException extends RuntimeException {
+
+    public ClusterException() {
+    }
+
+    public ClusterException(String msg) {
+        super(msg);
+    }
+
+    public ClusterException(Throwable cause) {
+        super(cause);
+    }
+
+    public ClusterException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java
new file mode 100644
index 0000000..10e8457
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.exception;
+
+import org.apache.nifi.cluster.exception.ClusterException;
+
+public class NoClusterCoordinatorException extends ClusterException {
+    private static final long serialVersionUID = -1782098541351698293L;
+
+    public NoClusterCoordinatorException() {
+        super("Action cannot be performed because there is currently no 
Cluster Coordinator elected. "
+            + "The request should be tried again after a moment, after a 
Cluster Coordinator has been automatically elected.");
+    }
+
+    public NoClusterCoordinatorException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4e67558..3acba94 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -591,16 +591,26 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // kicking everyone out. This way, we instead inherit the cluster 
flow before we attempt to be
             // the coordinator.
             LOG.info("Checking if there is already a Cluster Coordinator 
Elected...");
-            final NodeIdentifier electedCoordinatorNodeId = 
clusterCoordinator.getElectedActiveCoordinatorNode();
-            if (electedCoordinatorNodeId == null) {
+            final String clusterCoordinatorAddress = 
leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
+            if (StringUtils.isEmpty(clusterCoordinatorAddress)) {
                 LOG.info("It appears that no Cluster Coordinator has been 
Elected yet. Registering for Cluster Coordinator Role.");
-                registerForClusterCoordinator();
+                registerForClusterCoordinator(true);
             } else {
-                LOG.info("The Elected Cluster Coordinator is {}. Will not 
register to be elected for this role until after connecting "
-                        + "to the cluster and inheriting the cluster's flow.", 
electedCoordinatorNodeId);
+                // At this point, we have determined that there is a Cluster 
Coordinator elected. It is important to note, though,
+                // that if we are running an embedded ZooKeeper, and we have 
just restarted the cluster (at least the nodes that run the
+                // embedded ZooKeeper), that we could possibly determine that 
the Cluster Coordinator is at an address that is not really
+                // valid. This is because the latest stable ZooKeeper does not 
support "Container ZNodes" and as a result the ZNodes that
+                // are created are persistent, not ephemeral. Upon restart, we 
can get this persisted value, even though the node that belongs
+                // to that address has not started. ZooKeeper/Curator will 
recognize this after a while and delete the ZNode. As a result,
+                // we may later determine that there is in fact no Cluster 
Coordinator. If this happens, we will automatically register for
+                // Cluster Coordinator through the StandardFlowService.
+                LOG.info("The Election for Cluster Coordinator has already 
begun (Leader is {}). Will not register to be elected for this role until after 
connecting "
+                    + "to the cluster and inheriting the cluster's flow.", 
clusterCoordinatorAddress);
+                registerForClusterCoordinator(false);
             }
 
             leaderElectionManager.start();
+            heartbeatMonitor.start();
         } else {
             heartbeater = null;
         }
@@ -3321,8 +3331,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return configuredForClustering;
     }
 
-    private void registerForClusterCoordinator() {
-        final String participantId = heartbeatMonitor.getHeartbeatAddress();
+    void registerForClusterCoordinator(final boolean participate) {
+        final String participantId = participate ? 
heartbeatMonitor.getHeartbeatAddress() : null;
 
         leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new 
LeaderElectionStateChangeListener() {
             @Override
@@ -3342,12 +3352,11 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public synchronized void onLeaderElection() {
                 LOG.info("This node elected Active Cluster Coordinator");
-                heartbeatMonitor.start();   // ensure heartbeat monitor is 
started
             }
         }, participantId);
     }
 
-    private void registerForPrimaryNode() {
+    void registerForPrimaryNode() {
         final String participantId = heartbeatMonitor.getHeartbeatAddress();
 
         leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new 
LeaderElectionStateChangeListener() {
@@ -3401,7 +3410,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
                     // Participate in Leader Election for Heartbeat Monitor. 
Start the heartbeat monitor
                     // if/when we become leader and stop it when we lose 
leader role
-                    registerForClusterCoordinator();
+                    registerForClusterCoordinator(true);
 
                     leaderElectionManager.start();
                     stateManagerProvider.enableClusterProvider();

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index c5f9684..af96cfd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -54,6 +54,7 @@ import 
org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -787,6 +788,17 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                         // we received a successful connection response from 
manager
                         break;
                     }
+                } catch (final NoClusterCoordinatorException ncce) {
+                    logger.warn("There is currently no Cluster Coordinator. 
This often happens upon restart of NiFi when running an embedded ZooKeeper. 
Will register this node "
+                        + "to become the active Cluster Coordinator and will 
attempt to connect to cluster again");
+                    controller.registerForClusterCoordinator(true);
+
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
                 } catch (final Exception pe) {
                     // could not create a socket and communicate with manager
                     logger.warn("Failed to connect to cluster due to: " + pe);
@@ -798,6 +810,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                         try {
                             Thread.sleep(response == null ? 5000 : 
response.getTryLaterSeconds());
                         } catch (final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
                             break;
                         }
                     } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index dfe456b..efc7366 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -31,6 +31,7 @@ import org.apache.curator.retry.RetryNTimes;
 import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.common.PathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,16 +63,7 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
 
         stopped = false;
 
-        final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
-        curatorClient = CuratorFrameworkFactory.builder()
-                .connectString(zkConfig.getConnectString())
-                .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
-                .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
-                .retryPolicy(retryPolicy)
-                .defaultData(new byte[0])
-                .build();
-
-        curatorClient.start();
+        curatorClient = createClient();
 
         // Call #register for each already-registered role. This will
         // cause us to start listening for leader elections for that
@@ -85,51 +77,59 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
     }
 
     @Override
-    public synchronized void register(final String roleName) {
-        register(roleName, null);
-    }
-
-    @Override
     public void register(String roleName, LeaderElectionStateChangeListener 
listener) {
         register(roleName, listener, null);
     }
 
+    private String getElectionPath(final String roleName) {
+        final String rootPath = zkConfig.getRootPath();
+        final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : 
"/") + "leaders/" + roleName;
+        return leaderPath;
+    }
+
     @Override
     public synchronized void register(final String roleName, final 
LeaderElectionStateChangeListener listener, final String participantId) {
         logger.debug("{} Registering new Leader Selector for role {}", this, 
roleName);
 
-        if (leaderRoles.containsKey(roleName)) {
+        // If we already have a Leader Role registered and either the Leader 
Role is participating in election,
+        // or the given participant id == null (don't want to participant in 
election) then we're done.
+        final LeaderRole currentRole = leaderRoles.get(roleName);
+        if (currentRole != null && (currentRole.isParticipant() || 
participantId == null)) {
             logger.info("{} Attempted to register Leader Election for role 
'{}' but this role is already registered", this, roleName);
             return;
         }
 
-        final String rootPath = zkConfig.getRootPath();
-        final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : 
"/") + "leaders/" + roleName;
+        final String leaderPath = getElectionPath(roleName);
 
         try {
-            PathUtils.validatePath(rootPath);
+            PathUtils.validatePath(leaderPath);
         } catch (final IllegalArgumentException e) {
             throw new IllegalStateException("Cannot register leader election 
for role '" + roleName + "' because this is not a valid role name");
         }
 
         registeredRoles.put(roleName, new RegisteredRole(participantId, 
listener));
 
+        final boolean isParticipant = participantId != null && 
!participantId.trim().isEmpty();
+
         if (!isStopped()) {
             final ElectionListener electionListener = new 
ElectionListener(roleName, listener);
             final LeaderSelector leaderSelector = new 
LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, 
electionListener);
-            leaderSelector.autoRequeue();
-            if (participantId != null) {
+            if (isParticipant) {
+                leaderSelector.autoRequeue();
                 leaderSelector.setId(participantId);
+                leaderSelector.start();
             }
 
-            leaderSelector.start();
-
-            final LeaderRole leaderRole = new LeaderRole(leaderSelector, 
electionListener);
+            final LeaderRole leaderRole = new LeaderRole(leaderSelector, 
electionListener, isParticipant);
 
             leaderRoles.put(roleName, leaderRole);
         }
 
-        logger.info("{} Registered new Leader Selector for role {}", this, 
roleName);
+        if (isParticipant) {
+            logger.info("{} Registered new Leader Selector for role {}; this 
node is an active participant in the election.", this, roleName);
+        } else {
+            logger.info("{} Registered new Leader Selector for role {}; this 
node is a silent observer in the election.", this, roleName);
+        }
     }
 
     @Override
@@ -151,9 +151,15 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
     public synchronized void stop() {
         stopped = true;
 
-        for (final LeaderRole role : leaderRoles.values()) {
+        for (final Map.Entry<String, LeaderRole> entry : 
leaderRoles.entrySet()) {
+            final LeaderRole role = entry.getValue();
             final LeaderSelector selector = role.getLeaderSelector();
-            selector.close();
+
+            try {
+                selector.close();
+            } catch (final Exception e) {
+                logger.warn("Failed to close Leader Selector for {}", 
entry.getKey(), e);
+            }
         }
 
         leaderRoles.clear();
@@ -192,9 +198,13 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
 
     @Override
     public String getLeader(final String roleName) {
+        if (isStopped()) {
+            return determineLeaderExternal(roleName);
+        }
+
         final LeaderRole role = getLeaderRole(roleName);
         if (role == null) {
-            return null;
+            return determineLeaderExternal(roleName);
         }
 
         Participant participant;
@@ -217,14 +227,92 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         return participantId;
     }
 
+
+    /**
+     * Determines whether or not leader election has already begun for the 
role with the given name
+     *
+     * @param roleName the role of interest
+     * @return <code>true</code> if leader election has already begun, 
<code>false</code> if it has not or if unable to determine this.
+     */
+    @Override
+    public boolean isLeaderElected(final String roleName) {
+        final String leaderAddress = determineLeaderExternal(roleName);
+        return !StringUtils.isEmpty(leaderAddress);
+    }
+
+
+    /**
+     * Use a new Curator client to determine which node is the elected leader 
for the given role.
+     *
+     * @param roleName the name of the role
+     * @return the id of the elected leader, or <code>null</code> if no leader 
has been selected or if unable to determine
+     *         the leader from ZooKeeper
+     */
+    private String determineLeaderExternal(final String roleName) {
+        final CuratorFramework client = createClient();
+        try {
+            final LeaderSelectorListener electionListener = new 
LeaderSelectorListener() {
+                @Override
+                public void stateChanged(CuratorFramework client, 
ConnectionState newState) {
+                }
+
+                @Override
+                public void takeLeadership(CuratorFramework client) throws 
Exception {
+                }
+            };
+
+            final String electionPath = getElectionPath(roleName);
+
+            // Note that we intentionally do not auto-requeue here, and we do 
not start the selector. We do not
+            // want to join the leader election. We simply want to observe.
+            final LeaderSelector selector = new LeaderSelector(client, 
electionPath, electionListener);
+
+            try {
+                final Participant leader = selector.getLeader();
+                return leader == null ? null : leader.getId();
+            } catch (final KeeperException.NoNodeException nne) {
+                // If there is no ZNode, then there is no elected leader.
+                return null;
+            } catch (final Exception e) {
+                logger.warn("Unable to determine the Elected Leader for role 
'{}' due to {}; assuming no leader has been elected", roleName, e.toString());
+                if (logger.isDebugEnabled()) {
+                    logger.warn("", e);
+                }
+
+                return null;
+            }
+        } finally {
+            client.close();
+        }
+    }
+
+    private CuratorFramework createClient() {
+        // Create a new client because we don't want to try indefinitely for 
this to occur.
+        final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
+
+        final CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(zkConfig.getConnectString())
+            .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
+            .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
+            .retryPolicy(retryPolicy)
+            .defaultData(new byte[0])
+            .build();
+
+        client.start();
+        return client;
+    }
+
+
     private static class LeaderRole {
 
         private final LeaderSelector leaderSelector;
         private final ElectionListener electionListener;
+        private final boolean participant;
 
-        public LeaderRole(final LeaderSelector leaderSelector, final 
ElectionListener electionListener) {
+        public LeaderRole(final LeaderSelector leaderSelector, final 
ElectionListener electionListener, final boolean participant) {
             this.leaderSelector = leaderSelector;
             this.electionListener = electionListener;
+            this.participant = participant;
         }
 
         public LeaderSelector getLeaderSelector() {
@@ -234,6 +322,10 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         public boolean isLeader() {
             return electionListener.isLeader();
         }
+
+        public boolean isParticipant() {
+            return participant;
+        }
     }
 
     private static class RegisteredRole {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
index ef36528..d9d4e71 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
@@ -24,14 +24,9 @@ public interface LeaderElectionManager {
     void start();
 
     /**
-     * Adds a new role for which a leader is required
-     *
-     * @param roleName the name of the role
-     */
-    void register(String roleName);
-
-    /**
-     * Adds a new role for which a leader is required, without providing a 
Participant ID
+     * Adds a new role for which a leader is required, without participating 
in the leader election. I.e., this node
+     * will not be elected leader but will passively observe changes to the 
leadership. This allows calls to {@link #isLeader(String)}
+     * and {@link #getLeader(String)} to know which node is currently elected 
the leader.
      *
      * @param roleName the name of the role
      * @param listener a listener that will be called when the node gains or 
relinquishes
@@ -40,7 +35,8 @@ public interface LeaderElectionManager {
     void register(String roleName, LeaderElectionStateChangeListener listener);
 
     /**
-     * Adds a new role for which a leader is required, providing the given 
value for this node as the Participant ID
+     * Adds a new role for which a leader is required, providing the given 
value for this node as the Participant ID. If the Participant ID
+     * is <code>null</code>, this node will never be elected leader but will 
passively observe changes to the leadership.
      *
      * @param roleName the name of the role
      * @param listener a listener that will be called when the node gains or 
relinquishes
@@ -90,4 +86,12 @@ public interface LeaderElectionManager {
      * again, all previously registered roles will still be registered.
      */
     void stop();
+
+    /**
+     * Returns <code>true</code> if a leader has been elected for the given 
role, <code>false</code> otherwise.
+     *
+     * @param roleName the name of the role
+     * @return <code>true</code> if a leader has been elected, 
<code>false</code> otherwise.
+     */
+    boolean isLeaderElected(String roleName);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
index a2ed86e..182e83a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
@@ -29,10 +29,6 @@ public class StandaloneLeaderElectionManager implements 
LeaderElectionManager {
     }
 
     @Override
-    public void register(final String roleName) {
-    }
-
-    @Override
     public void register(final String roleName, final 
LeaderElectionStateChangeListener listener) {
     }
 
@@ -62,4 +58,9 @@ public class StandaloneLeaderElectionManager implements 
LeaderElectionManager {
     @Override
     public void stop() {
     }
+
+    @Override
+    public boolean isLeaderElected(String roleName) {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
index cbb96b1..c65501c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
@@ -45,7 +45,7 @@ public class NarCloseable implements Closeable {
             frameworkClassLoader = 
NarClassLoaders.getInstance().getFrameworkClassLoader();
         } catch (final Exception e) {
             // This should never happen in a running instance, but it will 
occur in unit tests
-            logger.error("Unable to access Framework ClassLoader due to " + e 
+ ". Will continue without change ClassLoaders.");
+            logger.error("Unable to access Framework ClassLoader due to " + e 
+ ". Will continue without changing ClassLoaders.");
             if (logger.isDebugEnabled()) {
                 logger.error("", e);
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index 871265e..1a52cc6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -96,6 +96,7 @@
     <logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" 
level="ERROR" />
     <logger name="org.apache.zookeeper.server.quorum" level="ERROR" />
     <logger name="org.apache.zookeeper.ZooKeeper" level="ERROR" />
+    <logger name="org.apache.zookeeper.server.PrepRequestProcessor" 
level="ERROR" />
 
     <logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" 
level="OFF" />
     <logger name="org.apache.curator.ConnectionState" level="OFF" />

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
index e17b147..c39fbc3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java
@@ -23,9 +23,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AccessDeniedException;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
-import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.util.NiFiProperties;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index 8269d79..021f216 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -40,9 +40,9 @@ import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
-import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.reporting.ReportingTaskProvider;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index 880186d..b233a35 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -30,8 +30,8 @@ import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
+import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.manager.NodeResponse;
-import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.Snippet;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java
index 2a67cf8..dd332f0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java
@@ -19,7 +19,8 @@ package org.apache.nifi.web.api.config;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
 import javax.ws.rs.ext.Provider;
-import org.apache.nifi.cluster.manager.exception.ClusterException;
+
+import org.apache.nifi.cluster.exception.ClusterException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
index 4b15a70..c052c88 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java
@@ -20,7 +20,7 @@ package org.apache.nifi.web.api.config;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
 
-import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException;
+import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
 import org.apache.nifi.util.StringUtils;
 import org.slf4j.Logger;

Reply via email to