[
https://issues.apache.org/jira/browse/NIFI-2566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421657#comment-15421657
]
ASF GitHub Bot commented on NIFI-2566:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/866#discussion_r74833401
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
---
@@ -18,102 +18,80 @@
package org.apache.nifi.controller.cluster;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.HeartbeatPayload;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Uses ZooKeeper in order to determine which node is the elected Cluster
Coordinator and to indicate
- * that this node is part of the cluster. However, once the Cluster
Coordinator is known, heartbeats are
+ * Uses Leader Election Manager in order to determine which node is the
elected Cluster Coordinator and to indicate
+ * that this node is part of the cluster. Once the Cluster Coordinator is
known, heartbeats are
* sent directly to the Cluster Coordinator.
*/
public class ClusterProtocolHeartbeater implements Heartbeater {
private static final Logger logger =
LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);
private final NodeProtocolSender protocolSender;
- private final CuratorFramework curatorClient;
- private final String nodesPathPrefix;
-
- private final String coordinatorPath;
- private volatile String coordinatorAddress;
+ private final LeaderElectionManager electionManager;
+ private final ClusterCoordinator clusterCoordinator;
-
- public ClusterProtocolHeartbeater(final NodeProtocolSender
protocolSender, final Properties properties) {
+ public ClusterProtocolHeartbeater(final NodeProtocolSender
protocolSender, final ClusterCoordinator clusterCoordinator, final
LeaderElectionManager electionManager) {
this.protocolSender = protocolSender;
-
- final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
- final ZooKeeperClientConfig zkConfig =
ZooKeeperClientConfig.createConfig(properties);
-
- curatorClient =
CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
- zkConfig.getSessionTimeoutMillis(),
zkConfig.getConnectionTimeoutMillis(), retryPolicy);
-
- curatorClient.start();
- nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
- coordinatorPath = nodesPathPrefix + "/coordinator";
+ this.clusterCoordinator = clusterCoordinator;
+ this.electionManager = electionManager;
}
@Override
public String getHeartbeatAddress() throws IOException {
- final String curAddress = coordinatorAddress;
- if (curAddress != null) {
- return curAddress;
+ final String heartbeatAddress =
electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
+ if (heartbeatAddress == null) {
+ throw new ProtocolException("Cannot send heartbeat because
there is no Cluster Coordinator currently elected");
}
- try {
- // Get coordinator address and add watcher to change who we
are heartbeating to if the value changes.
- final byte[] coordinatorAddressBytes =
curatorClient.getData().usingWatcher(new Watcher() {
- @Override
- public void process(final WatchedEvent event) {
- coordinatorAddress = null;
- }
- }).forPath(coordinatorPath);
- final String address = coordinatorAddress = new
String(coordinatorAddressBytes, StandardCharsets.UTF_8);
-
- logger.info("Determined that Cluster Coordinator is located at
{}; will use this address for sending heartbeat messages", address);
- return address;
- } catch (Exception e) {
- throw new IOException("Unable to determine Cluster Coordinator
from ZooKeeper", e);
- }
+ return heartbeatAddress;
}
-
@Override
public synchronized void send(final HeartbeatMessage heartbeatMessage)
throws IOException {
final String heartbeatAddress = getHeartbeatAddress();
-
- try {
- protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
- } catch (final ProtocolException pe) {
- // a ProtocolException is likely the result of not being able
to communicate
- // with the coordinator. If we do get an IOException
communicating with the coordinator,
- // it will be the cause of the Protocol Exception. In this
case, set coordinatorAddress
- // to null so that we double-check next time that the
coordinator has not changed.
- if (pe.getCause() instanceof IOException) {
- coordinatorAddress = null;
+ final HeartbeatResponseMessage responseMessage =
protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
+
+ final byte[] payloadBytes =
heartbeatMessage.getHeartbeat().getPayload();
+ final HeartbeatPayload payload =
HeartbeatPayload.unmarshal(payloadBytes);
+ final List<NodeConnectionStatus> nodeStatusList =
payload.getClusterStatus();
+ final Map<NodeIdentifier, Long> updateIdMap =
nodeStatusList.stream().collect(
+ Collectors.toMap(status -> status.getNodeIdentifier(), status
-> status.getUpdateIdentifier()));
+
+ final List<NodeConnectionStatus> updatedStatuses =
responseMessage.getUpdatedNodeStatuses();
+ if (updatedStatuses != null) {
+ for (final NodeConnectionStatus updatedStatus :
updatedStatuses) {
+ final NodeIdentifier nodeId =
updatedStatus.getNodeIdentifier();
+ final Long updateId = updateIdMap.get(nodeId);
+
+ final boolean updated =
clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? -1L :
updateId);
+ if (updated) {
+ // TODO - REMOVE THE ASTERISKS
--- End diff --
Lingering TODO; can it be removed?
> Clustered Nodes can become out of sync regarding which node is coordinator
> --------------------------------------------------------------------------
>
> Key: NIFI-2566
> URL: https://issues.apache.org/jira/browse/NIFI-2566
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.0.0
>
>
> Occasionally, the UI tells me that no Cluster Coordinator has
> been elected, even though I can see in the logs that the node is sending
> heartbeats to the coordinator.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)