Repository: nifi
Updated Branches:
  refs/heads/master a1b07b1e9 -> ded396f0e


NIFI-3933:
- When monitoring heartbeats, use the connected nodes as the basis for the 
check. This addresses the case where a node is terminated and no corresponding 
heartbeats exist.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d33c4c72
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d33c4c72
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d33c4c72

Branch: refs/heads/master
Commit: d33c4c72d49979ab04db489e429dd202d51585b1
Parents: a1b07b1
Author: Matt Gilman <[email protected]>
Authored: Mon May 22 15:28:30 2017 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon May 22 16:50:30 2017 -0400

----------------------------------------------------------------------
 .../heartbeat/HeartbeatMonitor.java             |  7 ++
 .../heartbeat/AbstractHeartbeatMonitor.java     | 41 ++++++++---
 .../ClusterProtocolHeartbeatMonitor.java        | 37 ++++++----
 .../heartbeat/TestAbstractHeartbeatMonitor.java | 75 +++++++++++++++-----
 4 files changed, 119 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
index 6a0937d..3cc5fd0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
@@ -59,6 +59,13 @@ public interface HeartbeatMonitor {
     void purgeHeartbeats();
 
     /**
+     * Returns when the heartbeats were purged last.
+     *
+     * @return when the heartbeats were purged last
+     */
+    long getPurgeTimestamp();
+
+    /**
      * @return the address that heartbeats should be sent to when this node is 
elected coordinator.
      */
     String getHeartbeatAddress();

http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 5119dac..c5d9e4b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -154,19 +154,38 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
 
         // Disconnect any node that hasn't sent a heartbeat in a long time (8 
times the heartbeat interval)
         final long maxMillis = heartbeatIntervalMillis * 8;
-        final long threshold = System.currentTimeMillis() - maxMillis;
-        for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
-            if (heartbeat.getTimestamp() < threshold) {
-                final long secondsSinceLastHeartbeat = 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
heartbeat.getTimestamp());
+        final long currentTimestamp = System.currentTimeMillis();
+        final long threshold = currentTimestamp - maxMillis;
 
-                
clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), 
DisconnectionCode.LACK_OF_HEARTBEAT,
-                        "Have not received a heartbeat from node in " + 
secondsSinceLastHeartbeat + " seconds");
+        // consider all connected nodes
+        for (final NodeIdentifier nodeIdentifier : 
clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED)) {
+            final NodeHeartbeat heartbeat = 
latestHeartbeats.get(nodeIdentifier);
 
-                try {
-                    removeHeartbeat(heartbeat.getNodeIdentifier());
-                } catch (final Exception e) {
-                    logger.warn("Failed to remove heartbeat for {} due to {}", 
heartbeat.getNodeIdentifier(), e.toString());
-                    logger.warn("", e);
+            // consider the most recent heartbeat for this node
+            if (heartbeat == null) {
+                final long purgeTimestamp = getPurgeTimestamp();
+
+                // if there is no heartbeat for this node, see if we purged 
the heartbeats beyond the allowed heartbeat threshold
+                if (purgeTimestamp < threshold) {
+                    final long secondsSinceLastPurge = 
TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - purgeTimestamp);
+
+                    
clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, 
DisconnectionCode.LACK_OF_HEARTBEAT,
+                            "Have not received a heartbeat from node in " + 
secondsSinceLastPurge + " seconds");
+                }
+            } else {
+                // see if the heartbeat occurred before the allowed heartbeat 
threshold
+                if (heartbeat.getTimestamp() < threshold) {
+                    final long secondsSinceLastHeartbeat = 
TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - heartbeat.getTimestamp());
+
+                    
clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, 
DisconnectionCode.LACK_OF_HEARTBEAT,
+                            "Have not received a heartbeat from node in " + 
secondsSinceLastHeartbeat + " seconds");
+
+                    try {
+                        removeHeartbeat(nodeIdentifier);
+                    } catch (final Exception e) {
+                        logger.warn("Failed to remove heartbeat for {} due to 
{}", nodeIdentifier, e.toString());
+                        logger.warn("", e);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 3e98368..78ec8df 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -16,18 +16,6 @@
  */
 package org.apache.nifi.cluster.coordination.heartbeat;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.Unmarshaller;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -38,16 +26,29 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 /**
  * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and
  * then relies on the NiFi Cluster Protocol to receive heartbeat messages from
@@ -60,6 +61,8 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
     private final String heartbeatAddress;
     private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> 
heartbeatMessages = new ConcurrentHashMap<>();
 
+    private volatile long purgeTimestamp = System.currentTimeMillis();
+
     protected static final Unmarshaller nodeIdentifierUnmarshaller;
 
     static {
@@ -136,6 +139,12 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
     public synchronized void purgeHeartbeats() {
         logger.debug("Purging old heartbeats");
         heartbeatMessages.clear();
+        purgeTimestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    public synchronized long getPurgeTimestamp() {
+        return purgeTimestamp;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 6610231..50bdd0d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -17,20 +17,6 @@
 
 package org.apache.nifi.cluster.coordination.heartbeat;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.cluster.ReportedEvent;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
@@ -46,6 +32,22 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestAbstractHeartbeatMonitor {
     private NodeIdentifier nodeId;
     private TestFriendlyHeartbeatMonitor monitor;
@@ -131,6 +133,38 @@ public class TestAbstractHeartbeatMonitor {
         assertTrue(requestedToConnect.isEmpty());
     }
 
+    @Test
+    public void testDisconnectionOfTerminatedNodeDueToLackOfHeartbeat() throws 
Exception {
+        final NodeIdentifier nodeId1 = nodeId;
+        final NodeIdentifier nodeId2 = new 
NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 
6666, "localhost", null, null, false);
+
+        final ClusterCoordinatorAdapter adapter = new 
ClusterCoordinatorAdapter();
+        final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+        // set state to connecting
+        adapter.requestNodeConnect(nodeId1);
+        adapter.requestNodeConnect(nodeId2);
+
+        // ensure each node is connected
+        
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTING).containsAll(Arrays.asList(nodeId1,
 nodeId2)));
+
+        // let each node heartbeat in
+        monitor.addHeartbeat(createHeartbeat(nodeId1, 
NodeConnectionState.CONNECTED));
+        monitor.addHeartbeat(createHeartbeat(nodeId2, 
NodeConnectionState.CONNECTED));
+        monitor.waitForProcessed();
+
+        // ensure each node is now connected
+        
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTED).containsAll(Arrays.asList(nodeId1,
 nodeId2)));
+
+        // purge the heartbeats, simulate nodeId2 termination by only having a 
nodeId1 heartbeat be present
+        monitor.purgeHeartbeats();
+        monitor.addHeartbeat(createHeartbeat(nodeId1, 
NodeConnectionState.CONNECTED));
+        monitor.waitForProcessed();
+
+        // the node that did not heartbeat in should be disconnected
+        
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTED).contains(nodeId1));
+        
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.DISCONNECTED).contains(nodeId2));
+    }
 
     @Test
     public void testConnectingNodeMarkedConnectedWhenHeartbeatReceived() 
throws InterruptedException {
@@ -339,8 +373,9 @@ public class TestAbstractHeartbeatMonitor {
 
 
     private static class TestFriendlyHeartbeatMonitor extends 
AbstractHeartbeatMonitor {
-        private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new 
HashMap<>();
+        private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new 
ConcurrentHashMap<>();
         private final Object mutex = new Object();
+        private long purgeTimestamp = System.currentTimeMillis();
 
         public TestFriendlyHeartbeatMonitor(ClusterCoordinator 
clusterCoordinator, NiFiProperties nifiProperties) {
             super(clusterCoordinator, nifiProperties);
@@ -348,7 +383,7 @@ public class TestAbstractHeartbeatMonitor {
 
         @Override
         protected synchronized Map<NodeIdentifier, NodeHeartbeat> 
getLatestHeartbeats() {
-            return heartbeats;
+            return Collections.unmodifiableMap(heartbeats);
         }
 
         @Override
@@ -372,6 +407,14 @@ public class TestAbstractHeartbeatMonitor {
         @Override
         public synchronized void purgeHeartbeats() {
             heartbeats.clear();
+            purgeTimestamp = System.currentTimeMillis();
+        }
+
+        @Override
+        public long getPurgeTimestamp() {
+            // deduct 90 because it is greater than 10 ms (interval defined in 
this test) * 8 (defined by the threshold in the heartbeat monitor)...
+            // it will ensure that the amount of time since the last heartbeat 
is outside the allowed threshold leading to the disconnection of the node
+            return purgeTimestamp - 90;
         }
 
         void waitForProcessed() throws InterruptedException {

Reply via email to