Repository: nifi Updated Branches: refs/heads/master a2ed0f061 -> c2ae7a6d7
NIFI-2605: - Fixing a regression bug where nodes would potentially be elected leader for Cluster Coordinator role when they do not have the correct flow - Ensure that we log which node is the cluster coordinator on startup instead of just indicating that there is one. If we later determine that there is none, ensure that we register for the role This closes #900 Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c2ae7a6d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c2ae7a6d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c2ae7a6d Branch: refs/heads/master Commit: c2ae7a6d7cf94b817e288c28038d18db51fcb457 Parents: a2ed0f0 Author: Mark Payne <[email protected]> Authored: Fri Aug 19 13:41:58 2016 -0400 Committer: jpercivall <[email protected]> Committed: Wed Aug 24 12:30:48 2016 -0400 ---------------------------------------------------------------------- .../node/CuratorNodeProtocolSender.java | 2 +- .../node/LeaderElectionNodeProtocolSender.java | 2 +- .../node/NodeClusterCoordinator.java | 2 +- .../exception/BlockedByFirewallException.java | 1 + .../manager/exception/ClusterException.java | 39 ----- .../exception/IllegalClusterStateException.java | 2 + .../NoClusterCoordinatorException.java | 31 ---- .../exception/NoConnectedNodesException.java | 2 + .../exception/NoResponseFromNodesException.java | 2 + .../exception/NodeDisconnectionException.java | 2 + .../exception/NodeReconnectionException.java | 2 + .../manager/exception/UnknownNodeException.java | 2 + .../nifi/cluster/integration/Cluster.java | 6 +- .../integration/ClusterConnectionIT.java | 15 +- .../apache/nifi/cluster/integration/Node.java | 8 +- .../cluster/exception/ClusterException.java | 39 +++++ .../NoClusterCoordinatorException.java | 33 ++++ .../apache/nifi/controller/FlowController.java | 29 ++-- .../nifi/controller/StandardFlowService.java | 13 ++ .../election/CuratorLeaderElectionManager.java | 150 +++++++++++++++---- .../leader/election/LeaderElectionManager.java | 22 +-- .../StandaloneLeaderElectionManager.java | 9 +- .../java/org/apache/nifi/nar/NarCloseable.java | 2 +- .../src/main/resources/conf/logback.xml | 1 + .../nifi/web/StandardNiFiContentAccess.java | 2 +- .../StandardNiFiWebConfigurationContext.java | 2 +- .../nifi/web/api/ApplicationResource.java | 2 +- .../web/api/config/ClusterExceptionMapper.java | 3 +- .../NoClusterCoordinatorExceptionMapper.java | 2 +- 29 files changed, 288 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java index 0806959..ee6e930 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/CuratorNodeProtocolSender.java @@ -23,7 +23,7 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java index 03de329..c5ee671 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/LeaderElectionNodeProtocolSender.java @@ -21,7 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.AbstractNodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index b1a088e..e50d8fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -39,10 +39,10 @@ import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback; import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.event.NodeEvent; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.ComponentRevision; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java index c13e9d9..bf0138e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/BlockedByFirewallException.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; import org.apache.nifi.cluster.protocol.NodeIdentifier; /** http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java deleted file mode 100644 index e93acca..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/ClusterException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.manager.exception; - -/** - * The base exception class for cluster related exceptions. - * - */ -public class ClusterException extends RuntimeException { - - public ClusterException() { - } - - public ClusterException(String msg) { - super(msg); - } - - public ClusterException(Throwable cause) { - super(cause); - } - - public ClusterException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java index e10e9c3..1ba9c8f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Signals that an operation to be performed on a cluster has been invoked at an illegal or inappropriate time. * http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java deleted file mode 100644 index 89b6722..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoClusterCoordinatorException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.cluster.manager.exception; - -public class NoClusterCoordinatorException extends ClusterException { - private static final long serialVersionUID = -1782098541351698293L; - - public NoClusterCoordinatorException() { - super("Action cannot be performed because there is currently no Cluster Coordinator elected. " - + "The request should be tried again after a moment, after a Cluster Coordinator has been automatically elected."); - } - - public NoClusterCoordinatorException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java index 7a828e3..8694878 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when the cluster is unable to service a request because no nodes are connected. * http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java index 9616ad3..9198c5e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when the cluster is unable to service a request because no nodes returned a response. When the given request is not mutable the nodes are left in their previous * state. http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java index 2f59eed..c788f16 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when a disconnection request to a node failed. * http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java index be3edf4..e7c2baa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when a reconnection request to a node failed. * http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java index 2d43e8a..21843cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.cluster.manager.exception; +import org.apache.nifi.cluster.exception.ClusterException; + /** * Represents the exceptional case when a request is made for a node that does not exist. * http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java index a0c8648..ce2017e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java @@ -17,6 +17,7 @@ package org.apache.nifi.cluster.integration; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -40,7 +41,7 @@ public class Cluster { private final Set<Node> nodes = new HashSet<>(); private final TestingServer zookeeperServer; - public Cluster() { + public Cluster() throws IOException { try { zookeeperServer = new TestingServer(); } catch (final Exception e) { @@ -114,7 +115,8 @@ public class Cluster { addProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, getZooKeeperConnectString()); addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true"); - final Node node = new Node(NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps)); + final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", addProps); + final Node node = new Node(nifiProperties); node.start(); nodes.add(node); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java index 3439263..45a2e42 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/ClusterConnectionIT.java @@ -46,7 +46,7 @@ public class ClusterConnectionIT { } @Before - public void createCluster() { + public void createCluster() throws IOException { cluster = new Cluster(); cluster.start(); } @@ -140,9 +140,8 @@ public class ClusterConnectionIT { cluster.waitForPrimaryNode(10, TimeUnit.SECONDS); } - @Test(timeout = 60000) - public void testRestartAllNodes() throws IOException { + public void testRestartAllNodes() throws IOException, InterruptedException { final Node firstNode = cluster.createNode(); final Node secondNode = cluster.createNode(); final Node thirdNode = cluster.createNode(); @@ -164,7 +163,13 @@ public class ClusterConnectionIT { firstNode.start(); secondNode.start(); - cluster.waitUntilAllNodesConnected(10, TimeUnit.SECONDS); + + firstNode.waitUntilConnected(20, TimeUnit.SECONDS); + System.out.println("\n\n\n**** Node 1 Re-Connected ****\n\n\n"); + secondNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 2 Re-Connected ****"); + thirdNode.waitUntilConnected(10, TimeUnit.SECONDS); + System.out.println("**** Node 3 Re-Connected ****"); // wait for all 3 nodes to agree that node 2 is connected Stream.of(firstNode, secondNode, thirdNode).forEach(node -> { @@ -205,7 +210,7 @@ public class ClusterConnectionIT { otherNode.assertNodeConnects(nodeToSuspend.getIdentifier(), 10, TimeUnit.SECONDS); } - @Test + @Test(timeout = 60000) public void testNodeInheritsClusterTopologyOnHeartbeat() throws InterruptedException { final Node node1 = cluster.createNode(); final Node node2 = cluster.createNode(); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java index b9372a6..7ba718b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java @@ -87,8 +87,9 @@ public class Node { private ScheduledExecutorService executor = new FlowEngine(8, "Node tasks", true); + public Node(final NiFiProperties properties) { - this(new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null), properties); + this(createNodeId(), properties); } public Node(final NodeIdentifier nodeId, final NiFiProperties properties) { @@ -121,6 +122,10 @@ public class Node { } + private static NodeIdentifier createNodeId() { + return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", createPort(), "localhost", createPort(), "localhost", null, null, false, null); + } + public synchronized void start() { running = true; @@ -148,6 +153,7 @@ public class Node { StringEncryptor.createEncryptor(nodeProperties), revisionManager, Mockito.mock(Authorizer.class)); flowService.start(); + flowService.load(null); } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java new file mode 100644 index 0000000..22de7c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/ClusterException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.exception; + +/** + * The base exception class for cluster related exceptions. + * + */ +public class ClusterException extends RuntimeException { + + public ClusterException() { + } + + public ClusterException(String msg) { + super(msg); + } + + public ClusterException(Throwable cause) { + super(cause); + } + + public ClusterException(String msg, Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java new file mode 100644 index 0000000..10e8457 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/exception/NoClusterCoordinatorException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.exception; + +import org.apache.nifi.cluster.exception.ClusterException; + +public class NoClusterCoordinatorException extends ClusterException { + private static final long serialVersionUID = -1782098541351698293L; + + public NoClusterCoordinatorException() { + super("Action cannot be performed because there is currently no Cluster Coordinator elected. " + + "The request should be tried again after a moment, after a Cluster Coordinator has been automatically elected."); + } + + public NoClusterCoordinatorException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 4e67558..3acba94 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -591,16 +591,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // kicking everyone out. This way, we instead inherit the cluster flow before we attempt to be // the coordinator. LOG.info("Checking if there is already a Cluster Coordinator Elected..."); - final NodeIdentifier electedCoordinatorNodeId = clusterCoordinator.getElectedActiveCoordinatorNode(); - if (electedCoordinatorNodeId == null) { + final String clusterCoordinatorAddress = leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR); + if (StringUtils.isEmpty(clusterCoordinatorAddress)) { LOG.info("It appears that no Cluster Coordinator has been Elected yet. Registering for Cluster Coordinator Role."); - registerForClusterCoordinator(); + registerForClusterCoordinator(true); } else { - LOG.info("The Elected Cluster Coordinator is {}. Will not register to be elected for this role until after connecting " - + "to the cluster and inheriting the cluster's flow.", electedCoordinatorNodeId); + // At this point, we have determined that there is a Cluster Coordinator elected. It is important to note, though, + // that if we are running an embedded ZooKeeper, and we have just restarted the cluster (at least the nodes that run the + // embedded ZooKeeper), that we could possibly determine that the Cluster Coordinator is at an address that is not really + // valid. This is because the latest stable ZooKeeper does not support "Container ZNodes" and as a result the ZNodes that + // are created are persistent, not ephemeral. Upon restart, we can get this persisted value, even though the node that belongs + // to that address has not started. ZooKeeper/Curator will recognize this after a while and delete the ZNode. As a result, + // we may later determine that there is in fact no Cluster Coordinator. If this happens, we will automatically register for + // Cluster Coordinator through the StandardFlowService. + LOG.info("The Election for Cluster Coordinator has already begun (Leader is {}). Will not register to be elected for this role until after connecting " + + "to the cluster and inheriting the cluster's flow.", clusterCoordinatorAddress); + registerForClusterCoordinator(false); } leaderElectionManager.start(); + heartbeatMonitor.start(); } else { heartbeater = null; } @@ -3321,8 +3331,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return configuredForClustering; } - private void registerForClusterCoordinator() { - final String participantId = heartbeatMonitor.getHeartbeatAddress(); + void registerForClusterCoordinator(final boolean participate) { + final String participantId = participate ? heartbeatMonitor.getHeartbeatAddress() : null; leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() { @Override @@ -3342,12 +3352,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R @Override public synchronized void onLeaderElection() { LOG.info("This node elected Active Cluster Coordinator"); - heartbeatMonitor.start(); // ensure heartbeat monitor is started } }, participantId); } - private void registerForPrimaryNode() { + void registerForPrimaryNode() { final String participantId = heartbeatMonitor.getHeartbeatAddress(); leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new LeaderElectionStateChangeListener() { @@ -3401,7 +3410,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Participate in Leader Election for Heartbeat Monitor. Start the heartbeat monitor // if/when we become leader and stop it when we lose leader role - registerForClusterCoordinator(); + registerForClusterCoordinator(true); leaderElectionManager.start(); stateManagerProvider.enableClusterProvider(); http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index c5f9684..af96cfd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -54,6 +54,7 @@ import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.node.DisconnectionCode; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; import org.apache.nifi.cluster.protocol.DataFlow; @@ -787,6 +788,17 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // we received a successful connection response from manager break; } + } catch (final NoClusterCoordinatorException ncce) { + logger.warn("There is currently no Cluster Coordinator. This often happens upon restart of NiFi when running an embedded ZooKeeper. Will register this node " + + "to become the active Cluster Coordinator and will attempt to connect to cluster again"); + controller.registerForClusterCoordinator(true); + + try { + Thread.sleep(1000L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } } catch (final Exception pe) { // could not create a socket and communicate with manager logger.warn("Failed to connect to cluster due to: " + pe); @@ -798,6 +810,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { try { Thread.sleep(response == null ? 5000 : response.getTryLaterSeconds()); } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); break; } } else { http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java index dfe456b..efc7366 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java @@ -31,6 +31,7 @@ import org.apache.curator.retry.RetryNTimes; import org.apache.nifi.controller.cluster.ZooKeeperClientConfig; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.util.NiFiProperties; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.common.PathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,16 +63,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { stopped = false; - final RetryPolicy retryPolicy = new RetryNTimes(1, 100); - curatorClient = CuratorFrameworkFactory.builder() - .connectString(zkConfig.getConnectString()) - .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis()) - .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis()) - .retryPolicy(retryPolicy) - .defaultData(new byte[0]) - .build(); - - curatorClient.start(); + curatorClient = createClient(); // Call #register for each already-registered role. This will // cause us to start listening for leader elections for that @@ -85,51 +77,59 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { } @Override - public synchronized void register(final String roleName) { - register(roleName, null); - } - - @Override public void register(String roleName, LeaderElectionStateChangeListener listener) { register(roleName, listener, null); } + private String getElectionPath(final String roleName) { + final String rootPath = zkConfig.getRootPath(); + final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName; + return leaderPath; + } + @Override public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) { logger.debug("{} Registering new Leader Selector for role {}", this, roleName); - if (leaderRoles.containsKey(roleName)) { + // If we already have a Leader Role registered and either the Leader Role is participating in election, + // or the given participant id == null (don't want to participant in election) then we're done. + final LeaderRole currentRole = leaderRoles.get(roleName); + if (currentRole != null && (currentRole.isParticipant() || participantId == null)) { logger.info("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName); return; } - final String rootPath = zkConfig.getRootPath(); - final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName; + final String leaderPath = getElectionPath(roleName); try { - PathUtils.validatePath(rootPath); + PathUtils.validatePath(leaderPath); } catch (final IllegalArgumentException e) { throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name"); } registeredRoles.put(roleName, new RegisteredRole(participantId, listener)); + final boolean isParticipant = participantId != null && !participantId.trim().isEmpty(); + if (!isStopped()) { final ElectionListener electionListener = new ElectionListener(roleName, listener); final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener); - leaderSelector.autoRequeue(); - if (participantId != null) { + if (isParticipant) { + leaderSelector.autoRequeue(); leaderSelector.setId(participantId); + leaderSelector.start(); } - leaderSelector.start(); - - final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener); + final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener, isParticipant); leaderRoles.put(roleName, leaderRole); } - logger.info("{} Registered new Leader Selector for role {}", this, roleName); + if (isParticipant) { + logger.info("{} Registered new Leader Selector for role {}; this node is an active participant in the election.", this, roleName); + } else { + logger.info("{} Registered new Leader Selector for role {}; this node is a silent observer in the election.", this, roleName); + } } @Override @@ -151,9 +151,15 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { public synchronized void stop() { stopped = true; - for (final LeaderRole role : leaderRoles.values()) { + for (final Map.Entry<String, LeaderRole> entry : leaderRoles.entrySet()) { + final LeaderRole role = entry.getValue(); final LeaderSelector selector = role.getLeaderSelector(); - selector.close(); + + try { + selector.close(); + } catch (final Exception e) { + logger.warn("Failed to close Leader Selector for {}", entry.getKey(), e); + } } leaderRoles.clear(); @@ -192,9 +198,13 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { @Override public String getLeader(final String roleName) { + if (isStopped()) { + return determineLeaderExternal(roleName); + } + final LeaderRole role = getLeaderRole(roleName); if (role == null) { - return null; + return determineLeaderExternal(roleName); } Participant participant; @@ -217,14 +227,92 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { return participantId; } + + /** + * Determines whether or not leader election has already begun for the role with the given name + * + * @param roleName the role of interest + * @return <code>true</code> if leader election has already begun, <code>false</code> if it has not or if unable to determine this. + */ + @Override + public boolean isLeaderElected(final String roleName) { + final String leaderAddress = determineLeaderExternal(roleName); + return !StringUtils.isEmpty(leaderAddress); + } + + + /** + * Use a new Curator client to determine which node is the elected leader for the given role. + * + * @param roleName the name of the role + * @return the id of the elected leader, or <code>null</code> if no leader has been selected or if unable to determine + * the leader from ZooKeeper + */ + private String determineLeaderExternal(final String roleName) { + final CuratorFramework client = createClient(); + try { + final LeaderSelectorListener electionListener = new LeaderSelectorListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + } + + @Override + public void takeLeadership(CuratorFramework client) throws Exception { + } + }; + + final String electionPath = getElectionPath(roleName); + + // Note that we intentionally do not auto-requeue here, and we do not start the selector. We do not + // want to join the leader election. We simply want to observe. + final LeaderSelector selector = new LeaderSelector(client, electionPath, electionListener); + + try { + final Participant leader = selector.getLeader(); + return leader == null ? null : leader.getId(); + } catch (final KeeperException.NoNodeException nne) { + // If there is no ZNode, then there is no elected leader. + return null; + } catch (final Exception e) { + logger.warn("Unable to determine the Elected Leader for role '{}' due to {}; assuming no leader has been elected", roleName, e.toString()); + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + + return null; + } + } finally { + client.close(); + } + } + + private CuratorFramework createClient() { + // Create a new client because we don't want to try indefinitely for this to occur. + final RetryPolicy retryPolicy = new RetryNTimes(1, 100); + + final CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(zkConfig.getConnectString()) + .sessionTimeoutMs(zkConfig.getSessionTimeoutMillis()) + .connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis()) + .retryPolicy(retryPolicy) + .defaultData(new byte[0]) + .build(); + + client.start(); + return client; + } + + private static class LeaderRole { private final LeaderSelector leaderSelector; private final ElectionListener electionListener; + private final boolean participant; - public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener) { + public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener, final boolean participant) { this.leaderSelector = leaderSelector; this.electionListener = electionListener; + this.participant = participant; } public LeaderSelector getLeaderSelector() { @@ -234,6 +322,10 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager { public boolean isLeader() { return electionListener.isLeader(); } + + public boolean isParticipant() { + return participant; + } } private static class RegisteredRole { http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java index ef36528..d9d4e71 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java @@ -24,14 +24,9 @@ public interface LeaderElectionManager { void start(); /** - * Adds a new role for which a leader is required - * - * @param roleName the name of the role - */ - void register(String roleName); - - /** - * Adds a new role for which a leader is required, without providing a Participant ID + * Adds a new role for which a leader is required, without participating in the leader election. I.e., this node + * will not be elected leader but will passively observe changes to the leadership. This allows calls to {@link #isLeader(String)} + * and {@link #getLeader(String)} to know which node is currently elected the leader. * * @param roleName the name of the role * @param listener a listener that will be called when the node gains or relinquishes @@ -40,7 +35,8 @@ public interface LeaderElectionManager { void register(String roleName, LeaderElectionStateChangeListener listener); /** - * Adds a new role for which a leader is required, providing the given value for this node as the Participant ID + * Adds a new role for which a leader is required, providing the given value for this node as the Participant ID. If the Participant ID + * is <code>null</code>, this node will never be elected leader but will passively observe changes to the leadership. * * @param roleName the name of the role * @param listener a listener that will be called when the node gains or relinquishes @@ -90,4 +86,12 @@ public interface LeaderElectionManager { * again, all previously registered roles will still be registered. */ void stop(); + + /** + * Returns <code>true</code> if a leader has been elected for the given role, <code>false</code> otherwise. + * + * @param roleName the name of the role + * @return <code>true</code> if a leader has been elected, <code>false</code> otherwise. + */ + boolean isLeaderElected(String roleName); } http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java index a2ed86e..182e83a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java @@ -29,10 +29,6 @@ public class StandaloneLeaderElectionManager implements LeaderElectionManager { } @Override - public void register(final String roleName) { - } - - @Override public void register(final String roleName, final LeaderElectionStateChangeListener listener) { } @@ -62,4 +58,9 @@ public class StandaloneLeaderElectionManager implements LeaderElectionManager { @Override public void stop() { } + + @Override + public boolean isLeaderElected(String roleName) { + return false; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java index cbb96b1..c65501c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java @@ -45,7 +45,7 @@ public class NarCloseable implements Closeable { frameworkClassLoader = NarClassLoaders.getInstance().getFrameworkClassLoader(); } catch (final Exception e) { // This should never happen in a running instance, but it will occur in unit tests - logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without change ClassLoaders."); + logger.error("Unable to access Framework ClassLoader due to " + e + ". Will continue without changing ClassLoaders."); if (logger.isDebugEnabled()) { logger.error("", e); } http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml index 871265e..1a52cc6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml @@ -96,6 +96,7 @@ <logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" level="ERROR" /> <logger name="org.apache.zookeeper.server.quorum" level="ERROR" /> <logger name="org.apache.zookeeper.ZooKeeper" level="ERROR" /> + <logger name="org.apache.zookeeper.server.PrepRequestProcessor" level="ERROR" /> <logger name="org.apache.curator.framework.recipes.leader.LeaderSelector" level="OFF" /> <logger name="org.apache.curator.ConnectionState" level="OFF" /> http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java index e17b147..c39fbc3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiContentAccess.java @@ -23,9 +23,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.util.NiFiProperties; http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java index 8269d79..021f216 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java @@ -40,9 +40,9 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.reporting.ReportingTaskProvider; http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java index 880186d..b233a35 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java @@ -30,8 +30,8 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.Snippet; http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java index 2a67cf8..dd332f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/ClusterExceptionMapper.java @@ -19,7 +19,8 @@ package org.apache.nifi.web.api.config; import javax.ws.rs.core.Response; import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; -import org.apache.nifi.cluster.manager.exception.ClusterException; + +import org.apache.nifi.cluster.exception.ClusterException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/nifi/blob/c2ae7a6d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java index 4b15a70..c052c88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/NoClusterCoordinatorExceptionMapper.java @@ -20,7 +20,7 @@ package org.apache.nifi.web.api.config; import javax.ws.rs.core.Response; import javax.ws.rs.ext.ExceptionMapper; -import org.apache.nifi.cluster.manager.exception.NoClusterCoordinatorException; +import org.apache.nifi.cluster.exception.NoClusterCoordinatorException; import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException; import org.apache.nifi.util.StringUtils; import org.slf4j.Logger;
