[ 
https://issues.apache.org/jira/browse/NIFI-2566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421657#comment-15421657
 ] 

ASF GitHub Bot commented on NIFI-2566:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/866#discussion_r74833401
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
 ---
    @@ -18,102 +18,80 @@
     package org.apache.nifi.controller.cluster;
     
     import java.io.IOException;
    -import java.nio.charset.StandardCharsets;
    -import java.util.Properties;
    -
    -import org.apache.curator.RetryPolicy;
    -import org.apache.curator.framework.CuratorFramework;
    -import org.apache.curator.framework.CuratorFrameworkFactory;
    -import org.apache.curator.retry.RetryNTimes;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.nifi.cluster.coordination.ClusterCoordinator;
    +import org.apache.nifi.cluster.coordination.node.ClusterRoles;
    +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
    +import org.apache.nifi.cluster.protocol.HeartbeatPayload;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
     import org.apache.nifi.cluster.protocol.NodeProtocolSender;
     import org.apache.nifi.cluster.protocol.ProtocolException;
     import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
    -import org.apache.zookeeper.WatchedEvent;
    -import org.apache.zookeeper.Watcher;
    +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
    +import org.apache.nifi.controller.leader.election.LeaderElectionManager;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     /**
    - * Uses ZooKeeper in order to determine which node is the elected Cluster 
Coordinator and to indicate
    - * that this node is part of the cluster. However, once the Cluster 
Coordinator is known, heartbeats are
    + * Uses Leader Election Manager in order to determine which node is the 
elected Cluster Coordinator and to indicate
    + * that this node is part of the cluster. Once the Cluster Coordinator is 
known, heartbeats are
      * sent directly to the Cluster Coordinator.
      */
     public class ClusterProtocolHeartbeater implements Heartbeater {
         private static final Logger logger = 
LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);
     
         private final NodeProtocolSender protocolSender;
    -    private final CuratorFramework curatorClient;
    -    private final String nodesPathPrefix;
    -
    -    private final String coordinatorPath;
    -    private volatile String coordinatorAddress;
    +    private final LeaderElectionManager electionManager;
    +    private final ClusterCoordinator clusterCoordinator;
     
    -
    -    public ClusterProtocolHeartbeater(final NodeProtocolSender 
protocolSender, final Properties properties) {
    +    public ClusterProtocolHeartbeater(final NodeProtocolSender 
protocolSender, final ClusterCoordinator clusterCoordinator, final 
LeaderElectionManager electionManager) {
             this.protocolSender = protocolSender;
    -
    -        final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
    -        final ZooKeeperClientConfig zkConfig = 
ZooKeeperClientConfig.createConfig(properties);
    -
    -        curatorClient = 
CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
    -            zkConfig.getSessionTimeoutMillis(), 
zkConfig.getConnectionTimeoutMillis(), retryPolicy);
    -
    -        curatorClient.start();
    -        nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
    -        coordinatorPath = nodesPathPrefix + "/coordinator";
    +        this.clusterCoordinator = clusterCoordinator;
    +        this.electionManager = electionManager;
         }
     
         @Override
         public String getHeartbeatAddress() throws IOException {
    -        final String curAddress = coordinatorAddress;
    -        if (curAddress != null) {
    -            return curAddress;
    +        final String heartbeatAddress = 
electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
    +        if (heartbeatAddress == null) {
    +            throw new ProtocolException("Cannot send heartbeat because 
there is no Cluster Coordinator currently elected");
             }
     
    -        try {
    -            // Get coordinator address and add watcher to change who we 
are heartbeating to if the value changes.
    -            final byte[] coordinatorAddressBytes = 
curatorClient.getData().usingWatcher(new Watcher() {
    -                @Override
    -                public void process(final WatchedEvent event) {
    -                    coordinatorAddress = null;
    -                }
    -            }).forPath(coordinatorPath);
    -            final String address = coordinatorAddress = new 
String(coordinatorAddressBytes, StandardCharsets.UTF_8);
    -
    -            logger.info("Determined that Cluster Coordinator is located at 
{}; will use this address for sending heartbeat messages", address);
    -            return address;
    -        } catch (Exception e) {
    -            throw new IOException("Unable to determine Cluster Coordinator 
from ZooKeeper", e);
    -        }
    +        return heartbeatAddress;
         }
     
    -
         @Override
         public synchronized void send(final HeartbeatMessage heartbeatMessage) 
throws IOException {
             final String heartbeatAddress = getHeartbeatAddress();
    -
    -        try {
    -            protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
    -        } catch (final ProtocolException pe) {
    -            // a ProtocolException is likely the result of not being able 
to communicate
    -            // with the coordinator. If we do get an IOException 
communicating with the coordinator,
    -            // it will be the cause of the Protocol Exception. In this 
case, set coordinatorAddress
    -            // to null so that we double-check next time that the 
coordinator has not changed.
    -            if (pe.getCause() instanceof IOException) {
    -                coordinatorAddress = null;
    +        final HeartbeatResponseMessage responseMessage = 
protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
    +
    +        final byte[] payloadBytes = 
heartbeatMessage.getHeartbeat().getPayload();
    +        final HeartbeatPayload payload = 
HeartbeatPayload.unmarshal(payloadBytes);
    +        final List<NodeConnectionStatus> nodeStatusList = 
payload.getClusterStatus();
    +        final Map<NodeIdentifier, Long> updateIdMap = 
nodeStatusList.stream().collect(
    +            Collectors.toMap(status -> status.getNodeIdentifier(), status 
-> status.getUpdateIdentifier()));
    +
    +        final List<NodeConnectionStatus> updatedStatuses = 
responseMessage.getUpdatedNodeStatuses();
    +        if (updatedStatuses != null) {
    +            for (final NodeConnectionStatus updatedStatus : 
updatedStatuses) {
    +                final NodeIdentifier nodeId = 
updatedStatus.getNodeIdentifier();
    +                final Long updateId = updateIdMap.get(nodeId);
    +
    +                final boolean updated = 
clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? -1L : 
updateId);
    +                if (updated) {
    +                    // TODO - REMOVE THE ASTERISKS
    --- End diff --
    
    Lingering TODO; can it be removed?


> Clustered Nodes can become out of sync regarding which node is coordinator
> --------------------------------------------------------------------------
>
>                 Key: NIFI-2566
>                 URL: https://issues.apache.org/jira/browse/NIFI-2566
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.0.0
>
>
> Occasionally, I will see the UI telling me that no Cluster Coordinator has 
> been elected. However, I can see in the logs that the node is sending 
> heartbeats to the coordinator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to