Repository: nifi
Updated Branches:
  refs/heads/master 18f415001 -> 42df02f01


NIFI-2406 This closes #820. Addressed a regression introduced in NIFI-2406 where 
the cluster did not recognize a new Cluster Coordinator when the current coordinator 
was shut down.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/42df02f0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/42df02f0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/42df02f0

Branch: refs/heads/master
Commit: 42df02f014df6ac332f41120d1d01eac0118d946
Parents: 18f4150
Author: Mark Payne <[email protected]>
Authored: Tue Aug 9 11:11:08 2016 -0400
Committer: joewitt <[email protected]>
Committed: Tue Aug 9 15:19:49 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/util/NiFiProperties.java    |  1 -
 .../heartbeat/AbstractHeartbeatMonitor.java     | 24 +++++++++-----------
 .../ClusterProtocolHeartbeatMonitor.java        |  9 +++++---
 .../heartbeat/TestAbstractHeartbeatMonitor.java | 10 ++++++++
 .../apache/nifi/controller/FlowController.java  | 21 +++++++++++------
 .../src/main/resources/conf/nifi.properties     |  3 ---
 6 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/42df02f0/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index a873c44..90115d5 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -164,7 +164,6 @@ public class NiFiProperties extends Properties {
     public static final String CLUSTER_NODE_ADDRESS = 
"nifi.cluster.node.address";
     public static final String CLUSTER_NODE_PROTOCOL_PORT = 
"nifi.cluster.node.protocol.port";
     public static final String CLUSTER_NODE_PROTOCOL_THREADS = 
"nifi.cluster.node.protocol.threads";
-    public static final String REQUEST_REPLICATION_CLAIM_TIMEOUT = 
"nifi.cluster.request.replication.claim.timeout";
     public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = 
"nifi.cluster.node.connection.timeout";
     public static final String CLUSTER_NODE_READ_TIMEOUT = 
"nifi.cluster.node.read.timeout";
     public static final String CLUSTER_FIREWALL_FILE = 
"nifi.cluster.firewall.file";

http://git-wip-us.apache.org/repos/asf/nifi/blob/42df02f0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 116ef3e..c216ed3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -42,7 +42,6 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
     protected final ClusterCoordinator clusterCoordinator;
     protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat 
Monitor", true);
 
-    protected volatile long latestHeartbeatTime;
     private volatile ScheduledFuture<?> future;
     private volatile boolean stopped = true;
 
@@ -57,8 +56,8 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
     @Override
     public synchronized final void start() {
         if (!stopped) {
-            logger.debug("Attempted to start Heartbeat Monitor but it is 
already started");
-            return;
+            logger.info("Attempted to start Heartbeat Monitor but it is 
already started. Stopping heartbeat monitor and re-starting it.");
+            stop();
         }
 
         stopped = false;
@@ -125,6 +124,15 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
      * Visible for testing.
      */
     protected synchronized void monitorHeartbeats() {
+        if (!clusterCoordinator.isActiveClusterCoordinator()) {
+            // Occasionally Curator appears to not notify us that we have lost 
the elected leader role, or does so
+            // on a very large delay. So before we kick the node out of the 
cluster, we want to first check what the
+            // ZNode in ZooKeeper says, and ensure that this is the node that 
is being advertised as the appropriate
+            // destination for heartbeats.
+            logger.debug("It appears that this node is no longer the actively 
elected cluster coordinator. Will not request that node disconnect.");
+            return;
+        }
+
         final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = 
getLatestHeartbeats();
         if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
             logger.debug("Received no new heartbeats. Will not disconnect any 
nodes due to lack of heartbeat");
@@ -153,16 +161,6 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
             if (heartbeat.getTimestamp() < threshold) {
                 final long secondsSinceLastHeartbeat = 
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - 
heartbeat.getTimestamp());
 
-                if (!clusterCoordinator.isActiveClusterCoordinator()) {
-                    // Occasionally Curator appears to not notify us that we 
have lost the elected leader role, or does so
-                    // on a very large delay. So before we kick the node out 
of the cluster, we want to first check what the
-                    // ZNode in ZooKeeper says, and ensure that this is the 
node that is being advertised as the appropriate
-                    // destination for heartbeats.
-                    logger.debug("Have not received a heartbeat from node in " 
+ secondsSinceLastHeartbeat +
-                        " seconds but it appears that this node is no longer 
the actively elected cluster coordinator. Will not request that node 
disconnect.");
-                    return;
-                }
-
                 
clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), 
DisconnectionCode.LACK_OF_HEARTBEAT,
                     "Have not received a heartbeat from node in " + 
secondsSinceLastHeartbeat + " seconds");
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/42df02f0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index f206a07..09dccad 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -65,6 +65,7 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
     private final String clusterNodesPath;
 
     private volatile Map<String, NodeIdentifier> clusterNodeIds = new 
HashMap<>();
+    private volatile CuratorFramework curatorClient;
 
     private final String heartbeatAddress;
     private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> 
heartbeatMessages = new ConcurrentHashMap<>();
@@ -112,7 +113,7 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
     @Override
     public void onStart() {
         final RetryPolicy retryPolicy = new RetryForever(5000);
-        final CuratorFramework curatorClient = 
CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
+        curatorClient = 
CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
             zkClientConfig.getSessionTimeoutMillis(), 
zkClientConfig.getConnectionTimeoutMillis(), retryPolicy);
         curatorClient.start();
 
@@ -136,14 +137,13 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
                     try {
                         try {
                             curatorClient.setData().forPath(path, 
heartbeatAddress.getBytes(StandardCharsets.UTF_8));
-                            curatorClient.close();
                             logger.info("Successfully published Cluster 
Heartbeat Monitor Address of {} to ZooKeeper", heartbeatAddress);
                             return;
                         } catch (final NoNodeException nne) {
                             // ensure that parents are created, using a 
wide-open ACL because the parents contain no data
                             // and the path is not in any way sensitive.
                             try {
-                                
curatorClient.create().creatingParentContainersIfNeeded().forPath(path);
+                                
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
                             } catch (final NodeExistsException nee) {
                                 // This is okay. Node already exists.
                             }
@@ -174,6 +174,9 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
 
     @Override
     public void onStop() {
+        if (curatorClient != null) {
+            curatorClient.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/42df02f0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 9ef0a14..0f1ce20 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -75,6 +75,11 @@ public class TestAbstractHeartbeatMonitor {
             public synchronized void requestNodeConnect(final NodeIdentifier 
nodeId, String userDn) {
                 requestedToConnect.add(nodeId);
             }
+
+            @Override
+            public boolean isActiveClusterCoordinator() {
+                return true;
+            }
         };
 
         final TestFriendlyHeartbeatMonitor monitor = 
createMonitor(coordinator);
@@ -141,6 +146,11 @@ public class TestAbstractHeartbeatMonitor {
                 super.finishNodeConnection(nodeId);
                 connected.add(nodeId);
             }
+
+            @Override
+            public boolean isActiveClusterCoordinator() {
+                return true;
+            }
         };
 
         final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);

http://git-wip-us.apache.org/repos/asf/nifi/blob/42df02f0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4762029..fdb6f58 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -605,10 +605,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             leaderElectionManager = null;
             heartbeater = null;
         }
-
-        if (heartbeatMonitor != null) {
-            heartbeatMonitor.start();
-        }
     }
 
     @Override
@@ -3316,7 +3312,15 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public synchronized void onLeaderRelinquish() {
                 LOG.info("This node is no longer the elected Active Cluster 
Coordinator");
-                heartbeatMonitor.stop();
+
+                // We do not want to stop the heartbeat monitor. This is 
because even though ZooKeeper offers guarantees
+                // that watchers will see changes on a ZNode in the order they 
happened, there does not seem to be any
+                // guarantee that Curator will notify us that our leadership 
was gained or lost in the order in which it happened.
+                // As a result, if nodes connect/disconnect from cluster 
quickly, we could invoke stop() then start() or
+                // start() then stop() in the wrong order, which can cause the 
cluster to behave improperly. As a result, we simply
+                // call start() when we become the leader, and this will 
ensure that initialization is handled. The heartbeat monitor
+                // then will check the zookeeper znode to check if it is the 
cluster coordinator before kicking any nodes out of the
+                // cluster.
 
                 if (clusterCoordinator != null) {
                     
clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR);
@@ -3326,7 +3330,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             @Override
             public synchronized void onLeaderElection() {
                 LOG.info("This node elected Active Cluster Coordinator");
-                heartbeatMonitor.start();
+                heartbeatMonitor.start();   // ensure heartbeat monitor is 
started
 
                 if (clusterCoordinator != null) {
                     
clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR);
@@ -3885,7 +3889,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                     heartbeatLogger.debug(usae.getMessage());
                 }
             } catch (final Throwable ex) {
-                heartbeatLogger.warn("Failed to send heartbeat due to: " + ex, 
ex);
+                heartbeatLogger.warn("Failed to send heartbeat due to: " + ex);
+                if (heartbeatLogger.isDebugEnabled()) {
+                    heartbeatLogger.warn("", ex);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/42df02f0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index e2d3385..a65b265 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -171,9 +171,6 @@ 
nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
 nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}
 nifi.cluster.firewall.file=${nifi.cluster.firewall.file}
 
-# How long a request should be allowed to hold a 'lock' on a component. #
-nifi.cluster.request.replication.claim.timeout=${nifi.cluster.request.replication.claim.timeout}
-
 # zookeeper properties, used for cluster management #
 nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}
 nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}

Reply via email to