Repository: nifi
Updated Branches:
  refs/heads/master d094130a2 -> c1c052af7


NIFI-2406: Ensure that heartbeat monitor continues to run while instance is 
running. This way if a node sends heartbeat to this node as elected coordinator 
changes, we notify the node accordingly. Handle Exceptions more gracefully in 
leader election code.  Tweaked some handling of how nodes reconnect to the 
cluster to ensure more stability with cluster

Signed-off-by: Yolanda M. Davis <[email protected]>

This closes #729


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c1c052af
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c1c052af
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c1c052af

Branch: refs/heads/master
Commit: c1c052af71d4cfc8a40974e1482d5b0f3c78c902
Parents: d094130
Author: Mark Payne <[email protected]>
Authored: Wed Jul 27 11:55:02 2016 -0400
Committer: Yolanda M. Davis <[email protected]>
Committed: Mon Aug 8 09:15:10 2016 -0400

----------------------------------------------------------------------
 .../ClusterCoordinationProtocolSender.java      |   8 ++
 ...usterCoordinationProtocolSenderListener.java |   8 +-
 .../protocol/impl/SocketProtocolListener.java   |  31 +++++-
 ...andardClusterCoordinationProtocolSender.java |  47 +++++++++
 .../message/NodeConnectionStatusAdapter.java    |  16 +--
 .../protocol/jaxb/message/ObjectFactory.java    |  11 ++
 .../NodeConnectionStatusRequestMessage.java     |  30 ++++++
 .../NodeConnectionStatusResponseMessage.java    |  40 ++++++++
 .../protocol/message/ProtocolMessage.java       |   2 +
 .../jaxb/message/TestJaxbProtocolUtils.java     |  31 ++++++
 .../heartbeat/AbstractHeartbeatMonitor.java     |  25 +++--
 .../node/NodeClusterCoordinator.java            | 100 +++++++++++++------
 .../node/TestNodeClusterCoordinator.java        |  16 ---
 .../apache/nifi/controller/FlowController.java  |  14 ++-
 .../election/CuratorLeaderElectionManager.java  |  16 ++-
 15 files changed, 326 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
index b49f57c..986231e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
@@ -18,6 +18,7 @@ package org.apache.nifi.cluster.protocol;
 
 import java.util.Set;
 
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -61,4 +62,11 @@ public interface ClusterCoordinationProtocolSender {
      * @param msg the message that indicates which node's status changed and 
what it changed to
      */
     void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, 
NodeStatusChangeMessage msg);
+
+    /**
+     * Sends a request to the given hostname and port to request its 
connection status
+     *
+     * @return the connection status returned from the node at the given 
hostname & port
+     */
+    NodeConnectionStatus requestNodeConnectionStatus(String hostname, int 
port);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
index e97712a..ae3a0e5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
 
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
@@ -105,7 +106,12 @@ public class ClusterCoordinationProtocolSenderListener 
implements ClusterCoordin
     }
 
     @Override
-    public void notifyNodeStatusChange(Set<NodeIdentifier> nodesToNotify, 
NodeStatusChangeMessage msg) {
+    public void notifyNodeStatusChange(final Set<NodeIdentifier> 
nodesToNotify, final NodeStatusChangeMessage msg) {
         sender.notifyNodeStatusChange(nodesToNotify, msg);
     }
+
+    @Override
+    public NodeConnectionStatus requestNodeConnectionStatus(final String 
hostname, final int port) {
+        return sender.requestNodeConnectionStatus(hostname, port);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
index c6a4883..8958988 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
@@ -26,13 +26,19 @@ import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
 import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.io.socket.ServerSocketConfiguration;
 import org.apache.nifi.io.socket.SocketListener;
@@ -173,8 +179,10 @@ public class SocketProtocolListener extends SocketListener 
implements ProtocolLi
             }
 
             stopWatch.stop();
+            final NodeIdentifier nodeId = getNodeIdentifier(request);
+            final String from = nodeId == null ? hostname : nodeId.toString();
             logger.info("Finished processing request {} (type={}, length={} 
bytes) from {} in {} millis",
-                requestId, request.getType(), receivedMessage.length, 
hostname, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+                requestId, request.getType(), receivedMessage.length, from, 
stopWatch.getDuration(TimeUnit.MILLISECONDS));
         } catch (final IOException | ProtocolException e) {
             logger.warn("Failed processing protocol message from " + hostname 
+ " due to " + e, e);
 
@@ -185,6 +193,27 @@ public class SocketProtocolListener extends SocketListener 
implements ProtocolLi
         }
     }
 
+    private NodeIdentifier getNodeIdentifier(final ProtocolMessage message) {
+        if (message == null) {
+            return null;
+        }
+
+        switch (message.getType()) {
+            case CONNECTION_REQUEST:
+                return ((ConnectionRequestMessage) 
message).getConnectionRequest().getProposedNodeIdentifier();
+            case HEARTBEAT:
+                return ((HeartbeatMessage) 
message).getHeartbeat().getNodeIdentifier();
+            case DISCONNECTION_REQUEST:
+                return ((DisconnectMessage) message).getNodeId();
+            case FLOW_REQUEST:
+                return ((FlowRequestMessage) message).getNodeId();
+            case RECONNECTION_REQUEST:
+                return ((ReconnectionRequestMessage) message).getNodeId();
+            default:
+                return null;
+        }
+    }
+
     private String getRequestorDN(Socket socket) {
         try {
             return CertificateUtils.extractPeerDNFromSSLSocket(socket);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
index 4b58886..e809961 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -28,6 +29,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
@@ -35,6 +37,8 @@ import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
 import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
+import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@@ -45,6 +49,8 @@ import org.apache.nifi.io.socket.SocketUtils;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A protocol sender for sending protocol messages from the cluster manager to
@@ -57,6 +63,7 @@ import org.apache.nifi.util.NiFiProperties;
  *
  */
 public class StandardClusterCoordinationProtocolSender implements 
ClusterCoordinationProtocolSender {
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardClusterCoordinationProtocolSender.class);
 
     private final ProtocolContext<ProtocolMessage> protocolContext;
     private final SocketConfiguration socketConfiguration;
@@ -183,6 +190,44 @@ public class StandardClusterCoordinationProtocolSender 
implements ClusterCoordin
     }
 
     @Override
+    public NodeConnectionStatus requestNodeConnectionStatus(final String 
hostname, final int port) {
+        Objects.requireNonNull(hostname);
+
+        final NodeConnectionStatusRequestMessage msg = new 
NodeConnectionStatusRequestMessage();
+
+        final byte[] msgBytes;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = 
protocolContext.createMarshaller();
+            marshaller.marshal(msg, baos);
+            msgBytes = baos.toByteArray();
+        } catch (final IOException e) {
+            throw new ProtocolException("Failed to marshal 
NodeIdentifierRequestMessage", e);
+        }
+
+        try (final Socket socket = createSocket(hostname, port, true)) {
+            // marshal message to output stream
+            socket.getOutputStream().write(msgBytes);
+
+            final ProtocolMessage response;
+            try {
+                // unmarshall response and return
+                final ProtocolMessageUnmarshaller<ProtocolMessage> 
unmarshaller = protocolContext.createUnmarshaller();
+                response = unmarshaller.unmarshal(socket.getInputStream());
+            } catch (final IOException ioe) {
+                throw new ProtocolException("Failed unmarshalling '" + 
MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
+            }
+
+            if (MessageType.NODE_CONNECTION_STATUS_RESPONSE == 
response.getType()) {
+                return ((NodeConnectionStatusResponseMessage) 
response).getNodeConnectionStatus();
+            } else {
+                throw new ProtocolException("Expected message type '" + 
MessageType.NODE_CONNECTION_STATUS_RESPONSE + "' but found '" + 
response.getType() + "'");
+            }
+        } catch (final IOException ioe) {
+            throw new ProtocolException("Failed to request Node Identifer from 
" + hostname + ":" + port, ioe);
+        }
+    }
+
+    @Override
     public void notifyNodeStatusChange(final Set<NodeIdentifier> 
nodesToNotify, final NodeStatusChangeMessage msg) {
         if (nodesToNotify.isEmpty()) {
             return;
@@ -222,6 +267,8 @@ public class StandardClusterCoordinationProtocolSender 
implements ClusterCoordin
                     } catch (final IOException ioe) {
                         throw new ProtocolException("Failed to send Node 
Status Change message to " + nodeId, ioe);
                     }
+
+                    logger.debug("Notified {} of status change {}", nodeId, 
msg);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
index 0093c3e..21d0bda 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
@@ -37,13 +37,15 @@ public class NodeConnectionStatusAdapter extends 
XmlAdapter<AdaptedNodeConnectio
     @Override
     public AdaptedNodeConnectionStatus marshal(final NodeConnectionStatus 
toAdapt) throws Exception {
         final AdaptedNodeConnectionStatus adapted = new 
AdaptedNodeConnectionStatus();
-        adapted.setUpdateId(toAdapt.getUpdateIdentifier());
-        adapted.setNodeId(toAdapt.getNodeIdentifier());
-        adapted.setConnectionRequestTime(toAdapt.getConnectionRequestTime());
-        adapted.setDisconnectCode(toAdapt.getDisconnectCode());
-        adapted.setDisconnectReason(toAdapt.getDisconnectReason());
-        adapted.setState(toAdapt.getState());
-        adapted.setRoles(toAdapt.getRoles());
+        if (toAdapt != null) {
+            adapted.setUpdateId(toAdapt.getUpdateIdentifier());
+            adapted.setNodeId(toAdapt.getNodeIdentifier());
+            
adapted.setConnectionRequestTime(toAdapt.getConnectionRequestTime());
+            adapted.setDisconnectCode(toAdapt.getDisconnectCode());
+            adapted.setDisconnectReason(toAdapt.getDisconnectReason());
+            adapted.setState(toAdapt.getState());
+            adapted.setRoles(toAdapt.getRoles());
+        }
         return adapted;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index 82df546..da13b02 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -25,6 +25,8 @@ import 
org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
+import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
+import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.PingMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -86,4 +88,13 @@ public class ObjectFactory {
     public NodeStatusChangeMessage createNodeStatusChangeMessage() {
         return new NodeStatusChangeMessage();
     }
+
+    public NodeConnectionStatusRequestMessage 
createNodeConnectionStatusRequestMessage() {
+        return new NodeConnectionStatusRequestMessage();
+    }
+
+    public NodeConnectionStatusResponseMessage 
createNodeConnectionStatusResponsetMessage() {
+        return new NodeConnectionStatusResponseMessage();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusRequestMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusRequestMessage.java
new file mode 100644
index 0000000..ce32ab2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusRequestMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "nodeConnectionStatusRequestMessage")
+public class NodeConnectionStatusRequestMessage extends ProtocolMessage {
+
+    @Override
+    public MessageType getType() {
+        return MessageType.NODE_CONNECTION_STATUS_REQUEST;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusResponseMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusResponseMessage.java
new file mode 100644
index 0000000..1d43728
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/NodeConnectionStatusResponseMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+
+@XmlRootElement(name = "nodeIdentifierResponseMessage")
+public class NodeConnectionStatusResponseMessage extends ProtocolMessage {
+    private NodeConnectionStatus nodeConnectionStatus;
+
+    @Override
+    public MessageType getType() {
+        return MessageType.NODE_CONNECTION_STATUS_RESPONSE;
+    }
+
+    public NodeConnectionStatus getNodeConnectionStatus() {
+        return nodeConnectionStatus;
+    }
+
+    public void setNodeConnectionStatus(final NodeConnectionStatus 
connectionStatus) {
+        this.nodeConnectionStatus = connectionStatus;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 28cef5d..2e74689 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -32,6 +32,8 @@ public abstract class ProtocolMessage {
         RECONNECTION_RESPONSE,
         SERVICE_BROADCAST,
         HEARTBEAT,
+        NODE_CONNECTION_STATUS_REQUEST,
+        NODE_CONNECTION_STATUS_RESPONSE,
         NODE_STATUS_CHANGE;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
index 955df17..21636a4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/jaxb/message/TestJaxbProtocolUtils.java
@@ -35,6 +35,8 @@ import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
+import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.web.Revision;
 import org.junit.Test;
 
@@ -65,4 +67,33 @@ public class TestJaxbProtocolUtils {
         assertEquals(revisions, 
unmarshalledMsg.getConnectionResponse().getComponentRevisions());
     }
 
+    @Test
+    public void testRoundTripConnectionStatusRequest() throws JAXBException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final NodeConnectionStatusRequestMessage msg = new 
NodeConnectionStatusRequestMessage();
+
+        JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
+        final Object unmarshalled = 
JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new 
ByteArrayInputStream(baos.toByteArray()));
+        assertTrue(unmarshalled instanceof NodeConnectionStatusRequestMessage);
+    }
+
+
+    @Test
+    public void testRoundTripConnectionStatusResponse() throws JAXBException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final NodeConnectionStatusResponseMessage msg = new 
NodeConnectionStatusResponseMessage();
+        final NodeIdentifier nodeId = new NodeIdentifier("id", "localhost", 
8000, "localhost", 8001, "localhost", 8002, 8003, true);
+        final NodeConnectionStatus nodeStatus = new 
NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
+        msg.setNodeConnectionStatus(nodeStatus);
+
+        JaxbProtocolUtils.JAXB_CONTEXT.createMarshaller().marshal(msg, baos);
+        final Object unmarshalled = 
JaxbProtocolUtils.JAXB_CONTEXT.createUnmarshaller().unmarshal(new 
ByteArrayInputStream(baos.toByteArray()));
+        assertTrue(unmarshalled instanceof 
NodeConnectionStatusResponseMessage);
+        final NodeConnectionStatusResponseMessage unmarshalledMsg = 
(NodeConnectionStatusResponseMessage) unmarshalled;
+
+        final NodeConnectionStatus unmarshalledStatus = 
unmarshalledMsg.getNodeConnectionStatus();
+        assertEquals(nodeStatus, unmarshalledStatus);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 22c9ab5..116ef3e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -57,7 +57,8 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
     @Override
     public synchronized final void start() {
         if (!stopped) {
-            throw new IllegalStateException("Heartbeat Monitor cannot be 
started because it is already started");
+            logger.debug("Attempted to start Heartbeat Monitor but it is 
already started");
+            return;
         }
 
         stopped = false;
@@ -150,8 +151,20 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
         final long threshold = System.currentTimeMillis() - maxMillis;
         for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
             if (heartbeat.getTimestamp() < threshold) {
-                
clusterCoordinator.requestNodeDisconnect(heartbeat.getNodeIdentifier(), 
DisconnectionCode.LACK_OF_HEARTBEAT,
-                    "Latest heartbeat from Node has expired");
+                final long secondsSinceLastHeartbeat = 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
heartbeat.getTimestamp());
+
+                if (!clusterCoordinator.isActiveClusterCoordinator()) {
+                    // Occasionally Curator appears to not notify us that we 
have lost the elected leader role, or does so
+                    // on a very large delay. So before we kick the node out 
of the cluster, we want to first check what the
+                    // ZNode in ZooKeeper says, and ensure that this is the 
node that is being advertised as the appropriate
+                    // destination for heartbeats.
+                    logger.debug("Have not received a heartbeat from node in " 
+ secondsSinceLastHeartbeat +
+                        " seconds but it appears that this node is no longer 
the actively elected cluster coordinator. Will not request that node 
disconnect.");
+                    return;
+                }
+
+                
clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), 
DisconnectionCode.LACK_OF_HEARTBEAT,
+                    "Have not received a heartbeat from node in " + 
secondsSinceLastHeartbeat + " seconds");
 
                 try {
                     removeHeartbeat(heartbeat.getNodeIdentifier());
@@ -206,20 +219,20 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
             switch (disconnectionCode) {
                 case LACK_OF_HEARTBEAT:
                 case UNABLE_TO_COMMUNICATE:
-                case NODE_SHUTDOWN:
                 case NOT_YET_CONNECTED:
                 case STARTUP_FAILURE: {
                     clusterCoordinator.reportEvent(nodeId, Severity.INFO, 
"Received heartbeat from node previously "
                         + "disconnected due to " + disconnectionCode + ". 
Issuing reconnection request.");
 
                     clusterCoordinator.requestNodeConnect(nodeId, null);
+                    break;
                 }
                 default: {
                     // disconnected nodes should not heartbeat, so we need to 
issue a disconnection request.
                     logger.info("Ignoring received heartbeat from disconnected 
node " + nodeId + ".  Issuing disconnection request.");
-                    clusterCoordinator.requestNodeDisconnect(nodeId, 
DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE,
-                        
DisconnectionCode.HEARTBEAT_RECEIVED_FROM_DISCONNECTED_NODE.toString());
+                    clusterCoordinator.requestNodeDisconnect(nodeId, 
disconnectionCode, connectionStatus.getDisconnectReason());
                     removeHeartbeat(nodeId);
+                    break;
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 514d928..0c94a66 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -61,6 +61,7 @@ import 
org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderLi
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
+import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
@@ -185,7 +186,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             }).forPath(coordinatorPath);
             final String address = coordinatorAddress = new 
String(coordinatorAddressBytes, StandardCharsets.UTF_8);
 
-            logger.info("Determined that Cluster Coordinator is located at {}; 
will use this address for sending heartbeat messages", address);
+            logger.info("Determined that Cluster Coordinator is located at 
{}", address);
             return address;
         } catch (Exception e) {
             throw new IOException("Unable to determine Cluster Coordinator 
from ZooKeeper", e);
@@ -206,12 +207,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             boolean updated = false;
             while (!updated) {
                 final NodeConnectionStatus currentStatus = 
nodeStatuses.get(nodeId);
-
-                if (currentStatus == null || 
proposedStatus.getUpdateIdentifier() > currentStatus.getUpdateIdentifier()) {
-                    updated = replaceNodeStatus(nodeId, currentStatus, 
proposedStatus);
-                } else {
-                    updated = true;
-                }
+                updated = replaceNodeStatus(nodeId, currentStatus, 
proposedStatus);
             }
         }
     }
@@ -325,6 +321,9 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             case UNKNOWN:
                 severity = Severity.ERROR;
                 break;
+            case LACK_OF_HEARTBEAT:
+                severity = Severity.WARNING;
+                break;
             default:
                 severity = Severity.INFO;
                 break;
@@ -569,7 +568,31 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             .orElse(null);
 
         if (electedNodeId == null && warnOnError) {
-            logger.warn("Failed to determine which node is elected active 
Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node 
with this address", electedNodeAddress);
+            logger.debug("Failed to determine which node is elected active 
Cluster Coordinator: ZooKeeper reports the address as {},"
+                + "but there is no node with this address. Will attempt to 
communicate with node to determine its information", electedNodeAddress);
+
+            try {
+                final NodeConnectionStatus connectionStatus = 
senderListener.requestNodeConnectionStatus(electedNodeHostname, 
electedNodePort);
+                logger.debug("Received NodeConnectionStatus {}", 
connectionStatus);
+
+                if (connectionStatus == null) {
+                    return null;
+                }
+
+                final NodeConnectionStatus existingStatus = 
this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), 
connectionStatus);
+                if (existingStatus == null) {
+                    return connectionStatus.getNodeIdentifier();
+                } else {
+                    return existingStatus.getNodeIdentifier();
+                }
+            } catch (final Exception e) {
+                logger.warn("Failed to determine which node is elected active 
Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node 
with this address. "
+                    + "Attempted to determine the node's information but 
failed to retrieve its information due to {}", electedNodeAddress, 
e.toString());
+
+                if (logger.isDebugEnabled()) {
+                    logger.warn("", e);
+                }
+            }
         }
 
         return electedNodeId;
@@ -639,7 +662,15 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             // to broadcast to the cluster that this node is no longer the 
coordinator. Otherwise, all nodes but this one will still
             // believe that this node is connected to the cluster.
             final boolean notifyAllNodes = isActiveClusterCoordinator() || 
(currentStatus != null && 
currentStatus.getRoles().contains(ClusterRoles.CLUSTER_COORDINATOR));
+            if (notifyAllNodes) {
+                logger.debug("Notifying all nodes that status changed from {} 
to {}", currentStatus, status);
+            } else {
+                logger.debug("Notifying cluster coordinator that node status 
changed from {} to {}", currentStatus, status);
+            }
+
             notifyOthersOfNodeStatusChange(status, notifyAllNodes);
+        } else {
+            logger.debug("Not notifying other nodes that status changed 
because previous state of {} is same as new state of {}", currentState, 
status.getState());
         }
     }
 
@@ -766,11 +797,20 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             case NODE_STATUS_CHANGE:
                 handleNodeStatusChange((NodeStatusChangeMessage) 
protocolMessage);
                 return null;
+            case NODE_CONNECTION_STATUS_REQUEST:
+                return handleNodeConnectionStatusRequest();
             default:
                 throw new ProtocolException("Cannot handle Protocol Message " 
+ protocolMessage + " because it is not of the correct type");
         }
     }
 
+    private NodeConnectionStatusResponseMessage 
handleNodeConnectionStatusRequest() {
+        final NodeConnectionStatus connectionStatus = 
nodeStatuses.get(getLocalNodeIdentifier());
+        final NodeConnectionStatusResponseMessage msg = new 
NodeConnectionStatusResponseMessage();
+        msg.setNodeConnectionStatus(connectionStatus);
+        return msg;
+    }
+
     private String summarizeStatusChange(final NodeConnectionStatus oldStatus, 
final NodeConnectionStatus status) {
         final StringBuilder sb = new StringBuilder();
 
@@ -828,33 +868,28 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         boolean updated = false;
         while (!updated) {
             final NodeConnectionStatus oldStatus = 
nodeStatuses.get(statusChangeMessage.getNodeId());
-            if (oldStatus == null || updatedStatus.getUpdateIdentifier() >= 
oldStatus.getUpdateIdentifier()) {
-                // Either remove the value from the map or update the map 
depending on the connection state
-                if (statusChangeMessage.getNodeConnectionStatus().getState() 
== NodeConnectionState.REMOVED) {
-                    updated = nodeStatuses.remove(nodeId, oldStatus);
-                } else {
-                    updated = replaceNodeStatus(nodeId, oldStatus, 
updatedStatus);
-                }
 
-                if (updated) {
-                    logger.info("Status of {} changed from {} to {}", 
statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
+            // Either remove the value from the map or update the map 
depending on the connection state
+            if (statusChangeMessage.getNodeConnectionStatus().getState() == 
NodeConnectionState.REMOVED) {
+                updated = nodeStatuses.remove(nodeId, oldStatus);
+            } else {
+                updated = replaceNodeStatus(nodeId, oldStatus, updatedStatus);
+            }
 
-                    final NodeConnectionStatus status = 
statusChangeMessage.getNodeConnectionStatus();
-                    final String summary = summarizeStatusChange(oldStatus, 
status);
-                    if (!StringUtils.isEmpty(summary)) {
-                        addNodeEvent(nodeId, summary);
-                    }
+            if (updated) {
+                logger.info("Status of {} changed from {} to {}", 
statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
 
-                    // Update our counter so that we are in-sync with the 
cluster on the
-                    // most up-to-date version of the NodeConnectionStatus' 
Update Identifier.
-                    // We do this so that we can accurately compare status 
updates that are generated
-                    // locally against those generated from other nodes in the 
cluster.
-                    
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
+                final NodeConnectionStatus status = 
statusChangeMessage.getNodeConnectionStatus();
+                final String summary = summarizeStatusChange(oldStatus, 
status);
+                if (!StringUtils.isEmpty(summary)) {
+                    addNodeEvent(nodeId, summary);
                 }
-            } else {
-                updated = true;
-                logger.info("Received Node Status update that indicates that 
{} should change to {} but disregarding because the current state of {} is 
newer",
-                    nodeId, updatedStatus, oldStatus);
+
+                // Update our counter so that we are in-sync with the cluster 
on the
+                // most up-to-date version of the NodeConnectionStatus' Update 
Identifier.
+                // We do this so that we can accurately compare status updates 
that are generated
+                // locally against those generated from other nodes in the 
cluster.
+                
NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());
             }
         }
 
@@ -952,7 +987,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     @Override
     public boolean canHandle(final ProtocolMessage msg) {
-        return MessageType.CONNECTION_REQUEST == msg.getType() || 
MessageType.NODE_STATUS_CHANGE == msg.getType();
+        return MessageType.CONNECTION_REQUEST == msg.getType() || 
MessageType.NODE_STATUS_CHANGE == msg.getType()
+            || MessageType.NODE_CONNECTION_STATUS_REQUEST == msg.getType();
     }
 
     private boolean isMutableRequest(final String method) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index 6f0ad0f..319bd84 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -396,21 +395,6 @@ public class TestNodeClusterCoordinator {
         // Ensure that no status change message was send
         Thread.sleep(1000);
         assertTrue(nodeStatuses.isEmpty());
-
-        // Status should not have changed because our status id is too small.
-        NodeConnectionStatus curStatus = 
coordinator.getConnectionStatus(nodeId1);
-        assertEquals(NodeConnectionState.CONNECTED, curStatus.getState());
-
-        // Verify that resetMap updates only the newer statuses
-        final NodeConnectionStatus node2Disconnecting = new 
NodeConnectionStatus(nodeId2, NodeConnectionState.DISCONNECTING, 
Collections.emptySet());
-        final Map<NodeIdentifier, NodeConnectionStatus> resetMap = new 
HashMap<>();
-        resetMap.put(nodeId1, oldStatus);
-        resetMap.put(nodeId2, node2Disconnecting);
-        coordinator.resetNodeStatuses(resetMap);
-
-        curStatus = coordinator.getConnectionStatus(nodeId1);
-        assertEquals(NodeConnectionState.CONNECTED, curStatus.getState());
-        assertEquals(NodeConnectionState.DISCONNECTING, 
coordinator.getConnectionStatus(nodeId2).getState());
     }
 
     @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 6b952f3..4762029 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -605,6 +605,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             leaderElectionManager = null;
             heartbeater = null;
         }
+
+        if (heartbeatMonitor != null) {
+            heartbeatMonitor.start();
+        }
     }
 
     @Override
@@ -617,10 +621,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return ResourceFactory.getControllerResource();
     }
 
-    public HeartbeatMonitor getHeartbeatMonitor() {
-        return heartbeatMonitor;
-    }
-
     private static FlowFileRepository createFlowFileRepository(final 
NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
         final String implementationClassName = 
properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, 
DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
@@ -1297,6 +1297,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 leaderElectionManager.stop();
             }
 
+            if (heartbeatMonitor != null) {
+                heartbeatMonitor.stop();
+            }
+
             if (kill) {
                 this.timerDrivenEngineRef.get().shutdownNow();
                 this.eventDrivenEngineRef.get().shutdownNow();
@@ -3311,6 +3315,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new 
LeaderElectionStateChangeListener() {
             @Override
             public synchronized void onLeaderRelinquish() {
+                LOG.info("This node is no longer the elected Active Cluster 
Coordinator");
                 heartbeatMonitor.stop();
 
                 if (clusterCoordinator != null) {
@@ -3320,6 +3325,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
             @Override
             public synchronized void onLeaderElection() {
+                LOG.info("This node elected Active Cluster Coordinator");
                 heartbeatMonitor.start();
 
                 if (clusterCoordinator != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1c052af/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index 94c83df..9d076c0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -221,7 +221,14 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
             logger.info("{} This node has been elected Leader for Role '{}'", 
this, roleName);
 
             if (listener != null) {
-                listener.onLeaderElection();
+                try {
+                    listener.onLeaderElection();
+                } catch (final Exception e) {
+                    logger.error("This node was elected Leader for Role '{}' 
but failed to take leadership. Will relinquish leadership role. Failure was due 
to: {}", roleName, e);
+                    logger.error("", e);
+                    leader = false;
+                    return;
+                }
             }
 
             // Curator API states that we lose the leadership election when we 
return from this method,
@@ -241,7 +248,12 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
                 logger.info("{} This node is no longer leader for role '{}'", 
this, roleName);
 
                 if (listener != null) {
-                    listener.onLeaderRelinquish();
+                    try {
+                        listener.onLeaderRelinquish();
+                    } catch (final Exception e) {
+                        logger.error("This node is no longer leader for role 
'{}' but failed to shutdown leadership responsibilities properly due to: {}", 
roleName, e);
+                        logger.error("", e);
+                    }
                 }
             }
         }

Reply via email to