GitHub user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/866#discussion_r74833401
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
 ---
    @@ -18,102 +18,80 @@
     package org.apache.nifi.controller.cluster;
     
     import java.io.IOException;
    -import java.nio.charset.StandardCharsets;
    -import java.util.Properties;
    -
    -import org.apache.curator.RetryPolicy;
    -import org.apache.curator.framework.CuratorFramework;
    -import org.apache.curator.framework.CuratorFrameworkFactory;
    -import org.apache.curator.retry.RetryNTimes;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import org.apache.nifi.cluster.coordination.ClusterCoordinator;
    +import org.apache.nifi.cluster.coordination.node.ClusterRoles;
    +import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
    +import org.apache.nifi.cluster.protocol.HeartbeatPayload;
    +import org.apache.nifi.cluster.protocol.NodeIdentifier;
     import org.apache.nifi.cluster.protocol.NodeProtocolSender;
     import org.apache.nifi.cluster.protocol.ProtocolException;
     import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
    -import org.apache.zookeeper.WatchedEvent;
    -import org.apache.zookeeper.Watcher;
    +import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
    +import org.apache.nifi.controller.leader.election.LeaderElectionManager;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     /**
    - * Uses ZooKeeper in order to determine which node is the elected Cluster 
Coordinator and to indicate
    - * that this node is part of the cluster. However, once the Cluster 
Coordinator is known, heartbeats are
    + * Uses Leader Election Manager in order to determine which node is the 
elected Cluster Coordinator and to indicate
    + * that this node is part of the cluster. Once the Cluster Coordinator is 
known, heartbeats are
      * sent directly to the Cluster Coordinator.
      */
     public class ClusterProtocolHeartbeater implements Heartbeater {
         private static final Logger logger = 
LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);
     
         private final NodeProtocolSender protocolSender;
    -    private final CuratorFramework curatorClient;
    -    private final String nodesPathPrefix;
    -
    -    private final String coordinatorPath;
    -    private volatile String coordinatorAddress;
    +    private final LeaderElectionManager electionManager;
    +    private final ClusterCoordinator clusterCoordinator;
     
    -
    -    public ClusterProtocolHeartbeater(final NodeProtocolSender 
protocolSender, final Properties properties) {
    +    public ClusterProtocolHeartbeater(final NodeProtocolSender 
protocolSender, final ClusterCoordinator clusterCoordinator, final 
LeaderElectionManager electionManager) {
             this.protocolSender = protocolSender;
    -
    -        final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
    -        final ZooKeeperClientConfig zkConfig = 
ZooKeeperClientConfig.createConfig(properties);
    -
    -        curatorClient = 
CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
    -            zkConfig.getSessionTimeoutMillis(), 
zkConfig.getConnectionTimeoutMillis(), retryPolicy);
    -
    -        curatorClient.start();
    -        nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
    -        coordinatorPath = nodesPathPrefix + "/coordinator";
    +        this.clusterCoordinator = clusterCoordinator;
    +        this.electionManager = electionManager;
         }
     
         @Override
         public String getHeartbeatAddress() throws IOException {
    -        final String curAddress = coordinatorAddress;
    -        if (curAddress != null) {
    -            return curAddress;
    +        final String heartbeatAddress = 
electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
    +        if (heartbeatAddress == null) {
    +            throw new ProtocolException("Cannot send heartbeat because 
there is no Cluster Coordinator currently elected");
             }
     
    -        try {
    -            // Get coordinator address and add watcher to change who we 
are heartbeating to if the value changes.
    -            final byte[] coordinatorAddressBytes = 
curatorClient.getData().usingWatcher(new Watcher() {
    -                @Override
    -                public void process(final WatchedEvent event) {
    -                    coordinatorAddress = null;
    -                }
    -            }).forPath(coordinatorPath);
    -            final String address = coordinatorAddress = new 
String(coordinatorAddressBytes, StandardCharsets.UTF_8);
    -
    -            logger.info("Determined that Cluster Coordinator is located at 
{}; will use this address for sending heartbeat messages", address);
    -            return address;
    -        } catch (Exception e) {
    -            throw new IOException("Unable to determine Cluster Coordinator 
from ZooKeeper", e);
    -        }
    +        return heartbeatAddress;
         }
     
    -
         @Override
         public synchronized void send(final HeartbeatMessage heartbeatMessage) 
throws IOException {
             final String heartbeatAddress = getHeartbeatAddress();
    -
    -        try {
    -            protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
    -        } catch (final ProtocolException pe) {
    -            // a ProtocolException is likely the result of not being able 
to communicate
    -            // with the coordinator. If we do get an IOException 
communicating with the coordinator,
    -            // it will be the cause of the Protocol Exception. In this 
case, set coordinatorAddress
    -            // to null so that we double-check next time that the 
coordinator has not changed.
    -            if (pe.getCause() instanceof IOException) {
    -                coordinatorAddress = null;
    +        final HeartbeatResponseMessage responseMessage = 
protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
    +
    +        final byte[] payloadBytes = 
heartbeatMessage.getHeartbeat().getPayload();
    +        final HeartbeatPayload payload = 
HeartbeatPayload.unmarshal(payloadBytes);
    +        final List<NodeConnectionStatus> nodeStatusList = 
payload.getClusterStatus();
    +        final Map<NodeIdentifier, Long> updateIdMap = 
nodeStatusList.stream().collect(
    +            Collectors.toMap(status -> status.getNodeIdentifier(), status 
-> status.getUpdateIdentifier()));
    +
    +        final List<NodeConnectionStatus> updatedStatuses = 
responseMessage.getUpdatedNodeStatuses();
    +        if (updatedStatuses != null) {
    +            for (final NodeConnectionStatus updatedStatus : 
updatedStatuses) {
    +                final NodeIdentifier nodeId = 
updatedStatus.getNodeIdentifier();
    +                final Long updateId = updateIdMap.get(nodeId);
    +
    +                final boolean updated = 
clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? -1L : 
updateId);
    +                if (updated) {
    +                    // TODO - REMOVE THE ASTERISKS
    --- End diff --
    
    There is a lingering TODO here ("REMOVE THE ASTERISKS"); can it be removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to