Repository: nifi Updated Branches: refs/heads/master a1b07b1e9 -> ded396f0e
NIFI-3933: - When monitoring heartbeats use the connected nodes as the basis for the check. This addresses the case when a node is terminated and no corresponding heartbeats exist. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d33c4c72 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d33c4c72 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d33c4c72 Branch: refs/heads/master Commit: d33c4c72d49979ab04db489e429dd202d51585b1 Parents: a1b07b1 Author: Matt Gilman <[email protected]> Authored: Mon May 22 15:28:30 2017 -0400 Committer: Mark Payne <[email protected]> Committed: Mon May 22 16:50:30 2017 -0400 ---------------------------------------------------------------------- .../heartbeat/HeartbeatMonitor.java | 7 ++ .../heartbeat/AbstractHeartbeatMonitor.java | 41 ++++++++--- .../ClusterProtocolHeartbeatMonitor.java | 37 ++++++---- .../heartbeat/TestAbstractHeartbeatMonitor.java | 75 +++++++++++++++----- 4 files changed, 119 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java index 6a0937d..3cc5fd0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java @@ -59,6 +59,13 @@ public interface HeartbeatMonitor { void purgeHeartbeats(); /** + * Returns when the heartbeats were purged last. + * + * @return when the heartbeats were purged last + */ + long getPurgeTimestamp(); + + /** * @return the address that heartbeats should be sent to when this node is elected coordinator. */ String getHeartbeatAddress(); http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java index 5119dac..c5d9e4b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java @@ -154,19 +154,38 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor { // Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval) final long maxMillis = heartbeatIntervalMillis * 8; - final long threshold = System.currentTimeMillis() - maxMillis; - for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) { - if (heartbeat.getTimestamp() < threshold) { - final long 
secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp()); + final long currentTimestamp = System.currentTimeMillis(); + final long threshold = currentTimestamp - maxMillis; - clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT, - "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds"); + // consider all connected nodes + for (final NodeIdentifier nodeIdentifier : clusterCoordinator.getNodeIdentifiers(NodeConnectionState.CONNECTED)) { + final NodeHeartbeat heartbeat = latestHeartbeats.get(nodeIdentifier); - try { - removeHeartbeat(heartbeat.getNodeIdentifier()); - } catch (final Exception e) { - logger.warn("Failed to remove heartbeat for {} due to {}", heartbeat.getNodeIdentifier(), e.toString()); - logger.warn("", e); + // consider the most recent heartbeat for this node + if (heartbeat == null) { + final long purgeTimestamp = getPurgeTimestamp(); + + // if there is no heartbeat for this node, see if we purged the heartbeats beyond the allowed heartbeat threshold + if (purgeTimestamp < threshold) { + final long secondsSinceLastPurge = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - purgeTimestamp); + + clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, DisconnectionCode.LACK_OF_HEARTBEAT, + "Have not received a heartbeat from node in " + secondsSinceLastPurge + " seconds"); + } + } else { + // see if the heartbeat occurred before the allowed heartbeat threshold + if (heartbeat.getTimestamp() < threshold) { + final long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(currentTimestamp - heartbeat.getTimestamp()); + + clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, DisconnectionCode.LACK_OF_HEARTBEAT, + "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds"); + + try { + removeHeartbeat(nodeIdentifier); + } catch (final Exception e) { + 
logger.warn("Failed to remove heartbeat for {} due to {}", nodeIdentifier, e.toString()); + logger.warn("", e); + } } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java index 3e98368..78ec8df 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java @@ -16,18 +16,6 @@ */ package org.apache.nifi.cluster.coordination.heartbeat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Function; -import java.util.stream.Collectors; -import javax.xml.bind.JAXBContext; -import javax.xml.bind.Unmarshaller; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; @@ -38,16 +26,29 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.ProtocolException; 
import org.apache.nifi.cluster.protocol.ProtocolHandler; import org.apache.nifi.cluster.protocol.ProtocolListener; -import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage; import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.stream.Collectors; + /** * Uses Apache ZooKeeper to advertise the address to send heartbeats to, and * then relies on the NiFi Cluster Protocol to receive heartbeat messages from @@ -60,6 +61,8 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im private final String heartbeatAddress; private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>(); + private volatile long purgeTimestamp = System.currentTimeMillis(); + protected static final Unmarshaller nodeIdentifierUnmarshaller; static { @@ -136,6 +139,12 @@ public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor im public synchronized void purgeHeartbeats() { logger.debug("Purging old heartbeats"); heartbeatMessages.clear(); + purgeTimestamp = 
System.currentTimeMillis(); + } + + @Override + public synchronized long getPurgeTimestamp() { + return purgeTimestamp; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/d33c4c72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java index 6610231..50bdd0d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java @@ -17,20 +17,6 @@ package org.apache.nifi.cluster.coordination.heartbeat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - import org.apache.nifi.cluster.ReportedEvent; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; @@ -46,6 +32,22 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import 
java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TestAbstractHeartbeatMonitor { private NodeIdentifier nodeId; private TestFriendlyHeartbeatMonitor monitor; @@ -131,6 +133,38 @@ public class TestAbstractHeartbeatMonitor { assertTrue(requestedToConnect.isEmpty()); } + @Test + public void testDisconnectionOfTerminatedNodeDueToLackOfHeartbeat() throws Exception { + final NodeIdentifier nodeId1 = nodeId; + final NodeIdentifier nodeId2 = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 7777, "localhost", 6666, "localhost", null, null, false); + + final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter(); + final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter); + + // set state to connecting + adapter.requestNodeConnect(nodeId1); + adapter.requestNodeConnect(nodeId2); + + // ensure each node is connecting + assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTING).containsAll(Arrays.asList(nodeId1, nodeId2))); + + // let each node heartbeat in + monitor.addHeartbeat(createHeartbeat(nodeId1, NodeConnectionState.CONNECTED)); + monitor.addHeartbeat(createHeartbeat(nodeId2, NodeConnectionState.CONNECTED)); + monitor.waitForProcessed(); + + // ensure each node is now connected + assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTED).containsAll(Arrays.asList(nodeId1, nodeId2))); + + // purge the heartbeats, simulate nodeId2 termination by only having a nodeId1 heartbeat be present + monitor.purgeHeartbeats(); + monitor.addHeartbeat(createHeartbeat(nodeId1, NodeConnectionState.CONNECTED)); + monitor.waitForProcessed(); + + // the node that did not heartbeat in should be disconnected + 
assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.CONNECTED).contains(nodeId1)); + assertTrue(adapter.getNodeIdentifiers(NodeConnectionState.DISCONNECTED).contains(nodeId2)); + } @Test public void testConnectingNodeMarkedConnectedWhenHeartbeatReceived() throws InterruptedException { @@ -339,8 +373,9 @@ public class TestAbstractHeartbeatMonitor { private static class TestFriendlyHeartbeatMonitor extends AbstractHeartbeatMonitor { - private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new HashMap<>(); + private Map<NodeIdentifier, NodeHeartbeat> heartbeats = new ConcurrentHashMap<>(); private final Object mutex = new Object(); + private long purgeTimestamp = System.currentTimeMillis(); public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, NiFiProperties nifiProperties) { super(clusterCoordinator, nifiProperties); @@ -348,7 +383,7 @@ public class TestAbstractHeartbeatMonitor { @Override protected synchronized Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() { - return heartbeats; + return Collections.unmodifiableMap(heartbeats); } @Override @@ -372,6 +407,14 @@ public class TestAbstractHeartbeatMonitor { @Override public synchronized void purgeHeartbeats() { heartbeats.clear(); + purgeTimestamp = System.currentTimeMillis(); + } + + @Override + public long getPurgeTimestamp() { + // deduct 90 because it is greater than 10 ms (interval defined in this test) * 8 (defined by the threshold in the heartbeat monitor)... + // it will ensure that the amount of time since the last heartbeat is outside the allowed threshold leading to the disconnection of the node + return purgeTimestamp - 90; } void waitForProcessed() throws InterruptedException {
