This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new fbd6200  NIFI-6589: This closes #3670. Cache results from zookeeper 
when determining the leader NIFI-6589: Updated CuratorLeaderElectionManager to 
cache results for no more than 5 seconds per review feedback
fbd6200 is described below

commit fbd6200ab3e4410fd9cf05f31348ec56a89d5af7
Author: Mark Payne <[email protected]>
AuthorDate: Mon Aug 26 11:16:30 2019 -0400

    NIFI-6589: This closes #3670. Cache results from zookeeper when determining 
the leader
    NIFI-6589: Updated CuratorLeaderElectionManager to cache results for no 
more than 5 seconds per review feedback
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../heartbeat/AbstractHeartbeatMonitor.java        |  6 +---
 .../coordination/node/NodeClusterCoordinator.java  |  3 +-
 .../heartbeat/TestAbstractHeartbeatMonitor.java    |  2 +-
 .../election/CuratorLeaderElectionManager.java     | 39 ++++++++++++++++++----
 .../nifi/integration/parameters/ParametersIT.java  |  2 ++
 .../web/StandardNiFiWebConfigurationContext.java   | 10 +++---
 6 files changed, 42 insertions(+), 20 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 5fbe3f8..86b4cc1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -111,9 +111,6 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
         return clusterCoordinator;
     }
 
-    protected long getHeartbeatInterval(final TimeUnit timeUnit) {
-        return timeUnit.convert(heartbeatIntervalMillis, 
TimeUnit.MILLISECONDS);
-    }
 
     /**
      * Fetches all of the latest heartbeats and updates the Cluster Coordinator
@@ -122,8 +119,7 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
      * Visible for testing.
      */
     protected synchronized void monitorHeartbeats() {
-        final NodeIdentifier activeCoordinator = 
clusterCoordinator.getElectedActiveCoordinatorNode();
-        if (activeCoordinator != null && 
!activeCoordinator.equals(clusterCoordinator.getLocalNodeIdentifier())) {
+        if (!clusterCoordinator.isActiveClusterCoordinator()) {
             // Occasionally Curator appears to not notify us that we have lost 
the elected leader role, or does so
             // on a very large delay. So before we kick the node out of the 
cluster, we want to first check what the
             // ZNode in ZooKeeper says, and ensure that this is the node that 
is being advertised as the appropriate
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 2ca8ef8..aec3a7a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -784,8 +784,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     @Override
     public boolean isActiveClusterCoordinator() {
-        final NodeIdentifier self = getLocalNodeIdentifier();
-        return self != null && self.equals(getElectedActiveCoordinatorNode());
+        return leaderElectionManager != null && 
leaderElectionManager.isLeader(ClusterRoles.CLUSTER_COORDINATOR);
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 4aeff7b..2245c6e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -349,7 +349,7 @@ public class TestAbstractHeartbeatMonitor {
 
         @Override
         public boolean isActiveClusterCoordinator() {
-            return false;
+            return true;
         }
 
         @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index d07c776..eac9447 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -446,6 +446,8 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         private final String participantId;
 
         private volatile boolean leader;
+        private long leaderUpdateTimestamp = 0L;
+        private final long MAX_CACHE_MILLIS = TimeUnit.SECONDS.toMillis(5L);
 
         public ElectionListener(final String roleName, final 
LeaderElectionStateChangeListener listener, final String participantId) {
             this.roleName = roleName;
@@ -453,12 +455,34 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
             this.participantId = participantId;
         }
 
-        public boolean isLeader() {
+        public synchronized boolean isLeader() {
+            if (leaderUpdateTimestamp < System.currentTimeMillis() - 
MAX_CACHE_MILLIS) {
+                try {
+                    final long start = System.nanoTime();
+                    final boolean zkLeader = verifyLeader();
+                    final long nanos = System.nanoTime() - start;
+
+                    setLeader(zkLeader);
+                    logger.debug("Took {} nanoseconds to reach out to 
ZooKeeper in order to check whether or not this node is currently the leader 
for Role '{}'. ZooKeeper reported {}",
+                        nanos, roleName, zkLeader);
+                } catch (final Exception e) {
+                    logger.warn("Attempted to reach out to ZooKeeper to 
determine whether or not this node is the elected leader for Role '{}' but 
failed to communicate with ZooKeeper. " +
+                        "Assuming that this node is not the leader.", 
roleName, e);
+
+                    return false;
+                }
+            }
+
             return leader;
         }
 
+        private synchronized void setLeader(final boolean leader) {
+            this.leader = leader;
+            this.leaderUpdateTimestamp = System.currentTimeMillis();
+        }
+
         @Override
-        public void stateChanged(final CuratorFramework client, final 
ConnectionState newState) {
+        public synchronized void stateChanged(final CuratorFramework client, 
final ConnectionState newState) {
             logger.info("{} Connection State changed to {}", this, 
newState.name());
 
             if (newState == ConnectionState.SUSPENDED || newState == 
ConnectionState.LOST) {
@@ -466,7 +490,7 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
                     logger.info("Because Connection State was changed to {}, 
will relinquish leadership for role '{}'", newState, roleName);
                 }
 
-                leader = false;
+                setLeader(false);
             }
 
             super.stateChanged(client, newState);
@@ -482,18 +506,19 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
             final String leader = getLeader(roleName);
             if (leader == null) {
                 logger.debug("Reached out to ZooKeeper to determine which node 
is the elected leader for Role '{}' but found that there is no leader.", 
roleName);
-                return false;
+                setLeader(false);
             }
 
             final boolean match = leader.equals(participantId);
             logger.debug("Reached out to ZooKeeper to determine which node is 
the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', 
This Node Elected = {}",
                 roleName, leader, participantId, match);
+            setLeader(match);
             return match;
         }
 
         @Override
         public void takeLeadership(final CuratorFramework client) throws 
Exception {
-            leader = true;
+            setLeader(true);
             logger.info("{} This node has been elected Leader for Role '{}'", 
this, roleName);
 
             if (listener != null) {
@@ -502,7 +527,7 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
                 } catch (final Exception e) {
                     logger.error("This node was elected Leader for Role '{}' 
but failed to take leadership. Will relinquish leadership role. Failure was due 
to: {}", roleName, e);
                     logger.error("", e);
-                    leader = false;
+                    setLeader(false);
                     Thread.sleep(1000L);
                     return;
                 }
@@ -547,7 +572,7 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
                     }
                 }
             } finally {
-                leader = false;
+                setLeader(false);
                 logger.info("{} This node is no longer leader for role '{}'", 
this, roleName);
 
                 if (listener != null) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
index 4a394ae..33d7639 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
@@ -366,4 +366,6 @@ public class ParametersIT extends FrameworkIntegrationTest {
         assertEquals(allParamNames, referencedParameters);
 
     }
+
+
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index 5f45085..e24c25c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -279,16 +279,16 @@ public class StandardNiFiWebConfigurationContext 
implements NiFiWebConfiguration
     }
 
     private NodeResponse replicate(final String method, final URI uri, final 
Object entity, final Map<String, String> headers) throws InterruptedException {
-        final NodeIdentifier coordinatorNode = 
clusterCoordinator.getElectedActiveCoordinatorNode();
-        if (coordinatorNode == null) {
-            throw new NoClusterCoordinatorException();
-        }
-
         // Determine whether we should replicate only to the cluster 
coordinator, or if we should replicate directly
         // to the cluster nodes themselves.
         if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
             return requestReplicator.replicate(method, uri, entity, 
headers).awaitMergedResponse();
         } else {
+            final NodeIdentifier coordinatorNode = 
clusterCoordinator.getElectedActiveCoordinatorNode();
+            if (coordinatorNode == null) {
+                throw new NoClusterCoordinatorException();
+            }
+
             return requestReplicator.forwardToCoordinator(coordinatorNode, 
method, uri, entity, headers).awaitMergedResponse();
         }
     }

Reply via email to