Repository: nifi
Updated Branches:
  refs/heads/master b3f36489a -> e42ea9ad4


http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
deleted file mode 100644
index 1146a39..0000000
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.nifi.cluster.protocol.ProtocolException;
-
-/**
- * The payload of the heartbeat. The payload contains status to inform the 
cluster manager the current workload of this node.
- *
- */
-@XmlRootElement
-public class HeartbeatPayload {
-
-    private static final JAXBContext JAXB_CONTEXT;
-
-    static {
-        try {
-            JAXB_CONTEXT = JAXBContext.newInstance(HeartbeatPayload.class);
-        } catch (JAXBException e) {
-            throw new RuntimeException("Unable to create JAXBContext.");
-        }
-    }
-
-    private int activeThreadCount;
-    private long totalFlowFileCount;
-    private long totalFlowFileBytes;
-    private long systemStartTime;
-
-    public int getActiveThreadCount() {
-        return activeThreadCount;
-    }
-
-    public void setActiveThreadCount(final int activeThreadCount) {
-        this.activeThreadCount = activeThreadCount;
-    }
-
-    public long getTotalFlowFileCount() {
-        return totalFlowFileCount;
-    }
-
-    public void setTotalFlowFileCount(final long totalFlowFileCount) {
-        this.totalFlowFileCount = totalFlowFileCount;
-    }
-
-    public long getTotalFlowFileBytes() {
-        return totalFlowFileBytes;
-    }
-
-    public void setTotalFlowFileBytes(final long totalFlowFileBytes) {
-        this.totalFlowFileBytes = totalFlowFileBytes;
-    }
-
-    public long getSystemStartTime() {
-        return systemStartTime;
-    }
-
-    public void setSystemStartTime(final long systemStartTime) {
-        this.systemStartTime = systemStartTime;
-    }
-
-    public byte[] marshal() throws ProtocolException {
-        final ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();
-        marshal(this, payloadBytes);
-        return payloadBytes.toByteArray();
-    }
-
-    public static void marshal(final HeartbeatPayload payload, final 
OutputStream os) throws ProtocolException {
-        try {
-            final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
-            marshaller.marshal(payload, os);
-        } catch (final JAXBException je) {
-            throw new ProtocolException(je);
-        }
-    }
-
-    public static HeartbeatPayload unmarshal(final InputStream is) throws 
ProtocolException {
-        try {
-            final Unmarshaller unmarshaller = 
JAXB_CONTEXT.createUnmarshaller();
-            return (HeartbeatPayload) unmarshaller.unmarshal(is);
-        } catch (final JAXBException je) {
-            throw new ProtocolException(je);
-        }
-    }
-
-    public static HeartbeatPayload unmarshal(final byte[] bytes) throws 
ProtocolException {
-        try {
-            final Unmarshaller unmarshaller = 
JAXB_CONTEXT.createUnmarshaller();
-            return (HeartbeatPayload) unmarshaller.unmarshal(new 
ByteArrayInputStream(bytes));
-        } catch (final JAXBException je) {
-            throw new ProtocolException(je);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 77c3dd7..efdf152 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -65,7 +65,6 @@ import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.DataAuthorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
@@ -74,6 +73,7 @@ import 
org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
@@ -96,7 +96,6 @@ import 
org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.label.StandardLabel;
-import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import 
org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -388,6 +387,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 bulletinRepo,
                 /* cluster coordinator */ null,
                 /* heartbeat monitor */ null,
+            /* leader election manager */ null,
                 /* variable registry */ variableRegistry);
     }
 
@@ -401,7 +401,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             final BulletinRepository bulletinRepo,
             final ClusterCoordinator clusterCoordinator,
             final HeartbeatMonitor heartbeatMonitor,
-            VariableRegistry variableRegistry) {
+        final LeaderElectionManager leaderElectionManager,
+        final VariableRegistry variableRegistry) {
 
         final FlowController flowController = new FlowController(
                 flowFileEventRepo,
@@ -413,7 +414,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 protocolSender,
                 bulletinRepo,
                 clusterCoordinator,
-                heartbeatMonitor, variableRegistry);
+            heartbeatMonitor,
+            leaderElectionManager,
+            variableRegistry);
 
         return flowController;
     }
@@ -429,6 +432,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             final BulletinRepository bulletinRepo,
             final ClusterCoordinator clusterCoordinator,
             final HeartbeatMonitor heartbeatMonitor,
+        final LeaderElectionManager leaderElectionManager,
             final VariableRegistry variableRegistry) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
@@ -578,10 +582,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         this.connectionStatus = new NodeConnectionStatus(nodeId, 
DisconnectionCode.NOT_YET_CONNECTED);
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
+        this.leaderElectionManager = leaderElectionManager;
 
         if (configuredForClustering) {
-            leaderElectionManager = new CuratorLeaderElectionManager(4, 
properties);
-            heartbeater = new ClusterProtocolHeartbeater(protocolSender, 
properties);
+            heartbeater = new ClusterProtocolHeartbeater(protocolSender, 
clusterCoordinator, leaderElectionManager);
 
             // Check if there is already a cluster coordinator elected. If 
not, go ahead
             // and register for coordinator role. If there is already one 
elected, do not register until
@@ -601,7 +605,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
             leaderElectionManager.start();
         } else {
-            leaderElectionManager = null;
             heartbeater = null;
         }
     }
@@ -3307,6 +3310,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
 
     private void registerForClusterCoordinator() {
+        final String participantId = heartbeatMonitor.getHeartbeatAddress();
+
         leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new 
LeaderElectionStateChangeListener() {
             @Override
             public synchronized void onLeaderRelinquish() {
@@ -3320,25 +3325,19 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 // call start() when we become the leader, and this will 
ensure that initialization is handled. The heartbeat monitor
                 // then will check the zookeeper znode to check if it is the 
cluster coordinator before kicking any nodes out of the
                 // cluster.
-
-                if (clusterCoordinator != null) {
-                    
clusterCoordinator.removeRole(ClusterRoles.CLUSTER_COORDINATOR);
-                }
             }
 
             @Override
             public synchronized void onLeaderElection() {
                 LOG.info("This node elected Active Cluster Coordinator");
                 heartbeatMonitor.start();   // ensure heartbeat monitor is 
started
-
-                if (clusterCoordinator != null) {
-                    
clusterCoordinator.addRole(ClusterRoles.CLUSTER_COORDINATOR);
-                }
             }
-        });
+        }, participantId);
     }
 
     private void registerForPrimaryNode() {
+        final String participantId = heartbeatMonitor.getHeartbeatAddress();
+
         leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new 
LeaderElectionStateChangeListener() {
             @Override
             public void onLeaderElection() {
@@ -3349,7 +3348,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             public void onLeaderRelinquish() {
                 setPrimary(false);
             }
-        });
+        }, participantId);
     }
 
     /**
@@ -3854,7 +3853,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         @Override
         public void run() {
-            try {
+            try (final NarCloseable narCloseable = 
NarCloseable.withFrameworkNar()) {
                 if (heartbeatsSuspended.get()) {
                     return;
                 }
@@ -3916,6 +3915,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             final QueueSize queueSize = 
getTotalFlowFileCount(bean.getRootGroup());
             hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
             hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
+            
hbPayload.setClusterStatus(clusterCoordinator.getConnectionStatuses());
 
             // create heartbeat message
             final NodeIdentifier nodeId = getNodeId();
@@ -3924,15 +3924,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 return null;
             }
 
-            final Set<String> roles = new HashSet<>();
-            if (bean.isPrimary()) {
-                roles.add(ClusterRoles.PRIMARY_NODE);
-            }
-            if (clusterCoordinator.isActiveClusterCoordinator()) {
-                roles.add(ClusterRoles.CLUSTER_COORDINATOR);
-            }
-
-            final Heartbeat heartbeat = new Heartbeat(nodeId, roles, 
connectionStatus, hbPayload.marshal());
+            final Heartbeat heartbeat = new Heartbeat(nodeId, 
connectionStatus, hbPayload.marshal());
             final HeartbeatMessage message = new HeartbeatMessage();
             message.setHeartbeat(heartbeat);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 091e59c..42f239c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -35,7 +35,6 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -853,9 +852,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             // mark the node as clustered
             controller.setClustered(true, response.getInstanceId());
 
-            final NodeConnectionStatus status = 
clusterCoordinator.getConnectionStatus(nodeId);
-            final Set<String> roles = status == null ? Collections.emptySet() 
: status.getRoles();
-            controller.setConnectionStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED, roles));
+            controller.setConnectionStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTED));
 
             // start the processors as indicated by the dataflow
             controller.onFlowInitialized(autoResumeState);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index d1822ef..cb663e1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -16,6 +16,34 @@
  */
 package org.apache.nifi.controller;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.Authorizer;
@@ -41,7 +69,6 @@ import 
org.apache.nifi.controller.serialization.FlowSynchronizer;
 import org.apache.nifi.controller.serialization.StandardFlowSerializer;
 import org.apache.nifi.controller.service.ControllerServiceLoader;
 import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.fingerprint.FingerprintException;
@@ -66,7 +93,6 @@ import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.web.api.dto.ConnectableDTO;
 import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.apache.nifi.web.api.dto.FunnelDTO;
 import org.apache.nifi.web.api.dto.LabelDTO;
@@ -86,33 +112,6 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;
 
-import javax.xml.XMLConstants;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-
 /**
  */
 public class StandardFlowSynchronizer implements FlowSynchronizer {
@@ -360,10 +359,6 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
                             // enable all the original controller services
                             
ControllerServiceLoader.enableControllerServices(controllerServices, 
controller, encryptor, autoResumeState);
-                        } else {
-                            for (final Element serviceElement : 
serviceElements) {
-                                updateControllerService(controller, 
serviceElement, encryptor);
-                            }
                         }
                     }
 
@@ -505,22 +500,6 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         return baos.toByteArray();
     }
 
-    private void updateControllerService(final FlowController controller, 
final Element controllerServiceElement, final StringEncryptor encryptor) {
-        final ControllerServiceDTO dto = 
FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
-
-        final ControllerServiceState dtoState = 
ControllerServiceState.valueOf(dto.getState());
-        final boolean dtoEnabled = (dtoState == ControllerServiceState.ENABLED 
|| dtoState == ControllerServiceState.ENABLING);
-
-        final ControllerServiceNode serviceNode = 
controller.getControllerServiceNode(dto.getId());
-        final ControllerServiceState serviceState = serviceNode.getState();
-        final boolean serviceEnabled = (serviceState == 
ControllerServiceState.ENABLED || serviceState == 
ControllerServiceState.ENABLING);
-
-        if (dtoEnabled && !serviceEnabled) {
-            
controller.enableControllerService(controller.getControllerServiceNode(dto.getId()));
-        } else if (!dtoEnabled && serviceEnabled) {
-            
controller.disableControllerService(controller.getControllerServiceNode(dto.getId()));
-        }
-    }
 
     private ReportingTaskNode getOrCreateReportingTask(final FlowController 
controller, final ReportingTaskDTO dto, final boolean controllerInitialized, 
final boolean existingFlowEmpty)
             throws ReportingTaskInstantiationException {
@@ -665,12 +644,6 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         // get the real process group and ID
         final ProcessGroup processGroup = 
controller.getGroup(processGroupDto.getId());
 
-        // Update Controller Services
-        final List<Element> serviceNodeList = 
getChildrenByTagName(processGroupElement, "controllerService");
-        for (final Element serviceNodeElement : serviceNodeList) {
-            updateControllerService(controller, serviceNodeElement, encryptor);
-        }
-
         // processors & ports cannot be updated - they must be the same. 
Except for the scheduled state.
         final List<Element> processorNodeList = 
getChildrenByTagName(processGroupElement, "processor");
         for (final Element processorElement : processorNodeList) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
index 0240318..d675d0c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
@@ -18,102 +18,79 @@
 package org.apache.nifi.controller.cluster;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.HeartbeatPayload;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Uses ZooKeeper in order to determine which node is the elected Cluster 
Coordinator and to indicate
- * that this node is part of the cluster. However, once the Cluster 
Coordinator is known, heartbeats are
+ * Uses Leader Election Manager in order to determine which node is the 
elected Cluster Coordinator and to indicate
+ * that this node is part of the cluster. Once the Cluster Coordinator is 
known, heartbeats are
  * sent directly to the Cluster Coordinator.
  */
 public class ClusterProtocolHeartbeater implements Heartbeater {
     private static final Logger logger = 
LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);
 
     private final NodeProtocolSender protocolSender;
-    private final CuratorFramework curatorClient;
-    private final String nodesPathPrefix;
-
-    private final String coordinatorPath;
-    private volatile String coordinatorAddress;
+    private final LeaderElectionManager electionManager;
+    private final ClusterCoordinator clusterCoordinator;
 
-
-    public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, 
final Properties properties) {
+    public ClusterProtocolHeartbeater(final NodeProtocolSender protocolSender, 
final ClusterCoordinator clusterCoordinator, final LeaderElectionManager 
electionManager) {
         this.protocolSender = protocolSender;
-
-        final RetryPolicy retryPolicy = new RetryNTimes(10, 500);
-        final ZooKeeperClientConfig zkConfig = 
ZooKeeperClientConfig.createConfig(properties);
-
-        curatorClient = 
CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
-            zkConfig.getSessionTimeoutMillis(), 
zkConfig.getConnectionTimeoutMillis(), retryPolicy);
-
-        curatorClient.start();
-        nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
-        coordinatorPath = nodesPathPrefix + "/coordinator";
+        this.clusterCoordinator = clusterCoordinator;
+        this.electionManager = electionManager;
     }
 
     @Override
     public String getHeartbeatAddress() throws IOException {
-        final String curAddress = coordinatorAddress;
-        if (curAddress != null) {
-            return curAddress;
+        final String heartbeatAddress = 
electionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
+        if (heartbeatAddress == null) {
+            throw new ProtocolException("Cannot send heartbeat because there 
is no Cluster Coordinator currently elected");
         }
 
-        try {
-            // Get coordinator address and add watcher to change who we are 
heartbeating to if the value changes.
-            final byte[] coordinatorAddressBytes = 
curatorClient.getData().usingWatcher(new Watcher() {
-                @Override
-                public void process(final WatchedEvent event) {
-                    coordinatorAddress = null;
-                }
-            }).forPath(coordinatorPath);
-            final String address = coordinatorAddress = new 
String(coordinatorAddressBytes, StandardCharsets.UTF_8);
-
-            logger.info("Determined that Cluster Coordinator is located at {}; 
will use this address for sending heartbeat messages", address);
-            return address;
-        } catch (Exception e) {
-            throw new IOException("Unable to determine Cluster Coordinator 
from ZooKeeper", e);
-        }
+        return heartbeatAddress;
     }
 
-
     @Override
     public synchronized void send(final HeartbeatMessage heartbeatMessage) 
throws IOException {
         final String heartbeatAddress = getHeartbeatAddress();
-
-        try {
-            protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
-        } catch (final ProtocolException pe) {
-            // a ProtocolException is likely the result of not being able to 
communicate
-            // with the coordinator. If we do get an IOException communicating 
with the coordinator,
-            // it will be the cause of the Protocol Exception. In this case, 
set coordinatorAddress
-            // to null so that we double-check next time that the coordinator 
has not changed.
-            if (pe.getCause() instanceof IOException) {
-                coordinatorAddress = null;
+        final HeartbeatResponseMessage responseMessage = 
protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
+
+        final byte[] payloadBytes = 
heartbeatMessage.getHeartbeat().getPayload();
+        final HeartbeatPayload payload = 
HeartbeatPayload.unmarshal(payloadBytes);
+        final List<NodeConnectionStatus> nodeStatusList = 
payload.getClusterStatus();
+        final Map<NodeIdentifier, Long> updateIdMap = 
nodeStatusList.stream().collect(
+            Collectors.toMap(status -> status.getNodeIdentifier(), status -> 
status.getUpdateIdentifier()));
+
+        final List<NodeConnectionStatus> updatedStatuses = 
responseMessage.getUpdatedNodeStatuses();
+        if (updatedStatuses != null) {
+            for (final NodeConnectionStatus updatedStatus : updatedStatuses) {
+                final NodeIdentifier nodeId = 
updatedStatus.getNodeIdentifier();
+                final Long updateId = updateIdMap.get(nodeId);
+
+                final boolean updated = 
clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? -1L : 
updateId);
+                if (updated) {
+                    logger.info("After receiving heartbeat response, updated 
status of {} to {}", updatedStatus.getNodeIdentifier(), updatedStatus);
+                } else {
+                    logger.debug("After receiving heartbeat response, did not 
update status of {} to {} because the update is out-of-date", 
updatedStatus.getNodeIdentifier(), updatedStatus);
+                }
             }
-
-            throw pe;
         }
     }
 
-
     @Override
     public void close() throws IOException {
-        if (curatorClient != null) {
-            curatorClient.close();
-        }
-
-        logger.info("ZooKeeper heartbeater closed. Will no longer send 
Heartbeat messages to ZooKeeper");
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index 7bf7494..1435182 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -21,12 +21,14 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import 
org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryForever;
 import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
@@ -47,7 +49,7 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
     private volatile boolean stopped = true;
 
     private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
-    private final Map<String, LeaderElectionStateChangeListener> 
registeredRoles = new HashMap<>();
+    private final Map<String, RegisteredRole> registeredRoles = new 
HashMap<>();
 
 
     public CuratorLeaderElectionManager(final int threadPoolSize) {
@@ -82,8 +84,9 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         // Call #register for each already-registered role. This will
         // cause us to start listening for leader elections for that
         // role again
-        for (final Map.Entry<String, LeaderElectionStateChangeListener> entry 
: registeredRoles.entrySet()) {
-            register(entry.getKey(), entry.getValue());
+        for (final Map.Entry<String, RegisteredRole> entry : 
registeredRoles.entrySet()) {
+            final RegisteredRole role = entry.getValue();
+            register(entry.getKey(), role.getListener(), 
role.getParticipantId());
         }
 
         logger.info("{} started", this);
@@ -97,7 +100,12 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
 
 
     @Override
-    public synchronized void register(final String roleName, final 
LeaderElectionStateChangeListener listener) {
+    public void register(String roleName, LeaderElectionStateChangeListener 
listener) {
+        register(roleName, listener, null);
+    }
+
+    @Override
+    public synchronized void register(final String roleName, final 
LeaderElectionStateChangeListener listener, final String participantId) {
         logger.debug("{} Registering new Leader Selector for role {}", this, 
roleName);
 
         if (leaderRoles.containsKey(roleName)) {
@@ -114,18 +122,23 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
             throw new IllegalStateException("Cannot register leader election 
for role '" + roleName + "' because this is not a valid role name");
         }
 
-        registeredRoles.put(roleName, listener);
+        registeredRoles.put(roleName, new RegisteredRole(participantId, 
listener));
 
         if (!isStopped()) {
             final ElectionListener electionListener = new 
ElectionListener(roleName, listener);
             final LeaderSelector leaderSelector = new 
LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, 
electionListener);
             leaderSelector.autoRequeue();
+            if (participantId != null) {
+                leaderSelector.setId(participantId);
+            }
+
             leaderSelector.start();
 
             final LeaderRole leaderRole = new LeaderRole(leaderSelector, 
electionListener);
 
             leaderRoles.put(roleName, leaderRole);
         }
+
         logger.info("{} Registered new Leader Selector for role {}", this, 
roleName);
     }
 
@@ -185,6 +198,32 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         return role.isLeader();
     }
 
+    @Override
+    public synchronized String getLeader(final String roleName) {
+        final LeaderRole role = leaderRoles.get(roleName);
+        if (role == null) {
+            return null;
+        }
+
+        Participant participant;
+        try {
+            participant = role.getLeaderSelector().getLeader();
+        } catch (Exception e) {
+            logger.debug("Unable to determine leader for role '{}'; returning 
null", roleName);
+            return null;
+        }
+
+        if (participant == null) {
+            return null;
+        }
+
+        final String participantId = participant.getId();
+        if (StringUtils.isEmpty(participantId)) {
+            return null;
+        }
+
+        return participantId;
+    }
 
     private static class LeaderRole {
         private final LeaderSelector leaderSelector;
@@ -204,6 +243,23 @@ public class CuratorLeaderElectionManager implements 
LeaderElectionManager {
         }
     }
 
+    private static class RegisteredRole {
+        private final LeaderElectionStateChangeListener listener;
+        private final String participantId;
+
+        public RegisteredRole(final String participantId, final 
LeaderElectionStateChangeListener listener) {
+            this.participantId = participantId;
+            this.listener = listener;
+        }
+
+        public LeaderElectionStateChangeListener getListener() {
+            return listener;
+        }
+
+        public String getParticipantId() {
+            return participantId;
+        }
+    }
 
     private class ElectionListener extends LeaderSelectorListenerAdapter 
implements LeaderSelectorListener {
         private final String roleName;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
index d16dbdb..ef36528 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
@@ -31,7 +31,7 @@ public interface LeaderElectionManager {
     void register(String roleName);
 
     /**
-     * Adds a new role for which a leader is required
+     * Adds a new role for which a leader is required, without providing a 
Participant ID
      *
      * @param roleName the name of the role
      * @param listener a listener that will be called when the node gains or 
relinquishes
@@ -40,6 +40,28 @@ public interface LeaderElectionManager {
     void register(String roleName, LeaderElectionStateChangeListener listener);
 
     /**
+     * Adds a new role for which a leader is required, providing the given 
value for this node as the Participant ID
+     *
+     * @param roleName the name of the role
+     * @param listener a listener that will be called when the node gains or 
relinquishes
+     *            the role of leader
+     * @param participantId the ID to register as this node's Participant ID. 
All nodes will see this as the identifier when
+     *            asking to see who the leader is via the {@link 
#getLeader(String)} method
+     */
+    void register(String roleName, LeaderElectionStateChangeListener listener, 
String participantId);
+
+    /**
+     * Returns the Participant ID of the node that is elected the leader, if 
one was provided when the node registered
+     * for the role via {@link #register(String, 
LeaderElectionStateChangeListener, String)}. If there is currently no leader
+     * known or if the role was registered without providing a Participant ID, 
this will return <code>null</code>.
+     *
+     * @param roleName the name of the role
+     * @return the Participant ID of the node that is elected leader, or 
<code>null</code> if either no leader is known or the leader
+     *         did not register with a Participant ID.
+     */
+    String getLeader(String roleName);
+
+    /**
      * Removes the role with the given name from this manager. If this
      * node is the elected leader for the given role, this node will relinquish
      * the leadership role

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
new file mode 100644
index 0000000..a2ed86e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/StandaloneLeaderElectionManager.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.leader.election;
+
+/**
+ * <p>
+ * A LeaderElectionManager to use when running a standalone (un-clustered) 
NiFi instance
+ * </p>
+ */
+public class StandaloneLeaderElectionManager implements LeaderElectionManager {
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void register(final String roleName) {
+    }
+
+    @Override
+    public void register(final String roleName, final 
LeaderElectionStateChangeListener listener) {
+    }
+
+    @Override
+    public void register(final String roleName, final 
LeaderElectionStateChangeListener listener, final String participantId) {
+    }
+
+    @Override
+    public String getLeader(final String roleName) {
+        return null;
+    }
+
+    @Override
+    public void unregister(final String roleName) {
+    }
+
+    @Override
+    public boolean isLeader(final String roleName) {
+        return false;
+    }
+
+    @Override
+    public boolean isStopped() {
+        return false;
+    }
+
+    @Override
+    public void stop() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 88fbcdd..d50d31b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -908,6 +908,7 @@ public final class FingerprintFactory {
         builder.append(dto.getName());
         builder.append(dto.getComments());
         builder.append(dto.getAnnotationData());
+        builder.append(dto.getState());
 
         final Map<String, String> properties = dto.getProperties();
         if (properties == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
index 2760ca9..7ed9187 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
@@ -22,13 +22,12 @@ import 
org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.NiFiProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.FactoryBean;
 import org.springframework.context.ApplicationContext;
@@ -40,8 +39,6 @@ import org.springframework.context.ApplicationContextAware;
 @SuppressWarnings("rawtypes")
 public class FlowControllerFactoryBean implements FactoryBean, 
ApplicationContextAware {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(FlowControllerFactoryBean.class);
-
     private ApplicationContext applicationContext;
     private FlowController flowController;
     private NiFiProperties properties;
@@ -51,6 +48,7 @@ public class FlowControllerFactoryBean implements 
FactoryBean, ApplicationContex
     private BulletinRepository bulletinRepository;
     private ClusterCoordinator clusterCoordinator;
     private VariableRegistry variableRegistry;
+    private LeaderElectionManager leaderElectionManager;
 
     @Override
     public Object getObject() throws Exception {
@@ -69,7 +67,9 @@ public class FlowControllerFactoryBean implements 
FactoryBean, ApplicationContex
                     nodeProtocolSender,
                     bulletinRepository,
                     clusterCoordinator,
-                    heartbeatMonitor, variableRegistry);
+                    heartbeatMonitor,
+                    leaderElectionManager,
+                    variableRegistry);
             } else {
                 flowController = FlowController.createStandaloneInstance(
                     flowFileEventRepository,
@@ -129,4 +129,8 @@ public class FlowControllerFactoryBean implements 
FactoryBean, ApplicationContex
     public void setClusterCoordinator(final ClusterCoordinator 
clusterCoordinator) {
         this.clusterCoordinator = clusterCoordinator;
     }
+
+    public void setLeaderElectionManager(final LeaderElectionManager 
leaderElectionManager) {
+        this.leaderElectionManager = leaderElectionManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/LeaderElectionManagerFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/LeaderElectionManagerFactoryBean.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/LeaderElectionManagerFactoryBean.java
new file mode 100644
index 0000000..f17cf1b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/LeaderElectionManagerFactoryBean.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.spring;
+
+import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import 
org.apache.nifi.controller.leader.election.StandaloneLeaderElectionManager;
+import org.apache.nifi.util.NiFiProperties;
+import org.springframework.beans.factory.FactoryBean;
+
+public class LeaderElectionManagerFactoryBean implements 
FactoryBean<LeaderElectionManager> {
+    private int numThreads;
+    private NiFiProperties properties;
+
+    @Override
+    public LeaderElectionManager getObject() throws Exception {
+        final boolean isNode = properties.isNode();
+        if (isNode) {
+            return new CuratorLeaderElectionManager(numThreads, properties);
+        } else {
+            return new StandaloneLeaderElectionManager();
+        }
+    }
+
+    @Override
+    public Class<?> getObjectType() {
+        return LeaderElectionManager.class;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void setNumThreads(final int numThreads) {
+        this.numThreads = numThreads;
+    }
+
+    public void setProperties(final NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
index f03e26c..1503208 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
@@ -47,6 +47,7 @@
         <property name="bulletinRepository" ref="bulletinRepository" />
         <property name="clusterCoordinator" ref="clusterCoordinator" />
         <property name="variableRegistry" ref="variableRegistry"/>
+        <property name="leaderElectionManager" ref="leaderElectionManager" />
     </bean>
 
     <!-- flow service -->

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
index af73eef..429a791 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 
+import org.apache.nifi.cluster.protocol.HeartbeatPayload;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.Before;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
index b25c90b..cbb96b1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
@@ -18,10 +18,14 @@ package org.apache.nifi.nar;
 
 import java.io.Closeable;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  *
  */
 public class NarCloseable implements Closeable {
+    private static final Logger logger = 
LoggerFactory.getLogger(NarCloseable.class);
 
     public static NarCloseable withNarLoader() {
         final ClassLoader current = 
Thread.currentThread().getContextClassLoader();
@@ -29,6 +33,31 @@ public class NarCloseable implements Closeable {
         return new NarCloseable(current);
     }
 
+    /**
+     * Creates a Closeable object that can be used to to switch to current 
class loader to the framework class loader
+     * and will automatically set the ClassLoader back to the previous class 
loader when closed
+     *
+     * @return a NarCloseable
+     */
+    public static NarCloseable withFrameworkNar() {
+        final ClassLoader frameworkClassLoader;
+        try {
+            frameworkClassLoader = 
NarClassLoaders.getInstance().getFrameworkClassLoader();
+        } catch (final Exception e) {
+            // This should never happen in a running instance, but it will 
occur in unit tests
+            logger.error("Unable to access Framework ClassLoader due to " + e 
+ ". Will continue without change ClassLoaders.");
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+
+            return new NarCloseable(null);
+        }
+
+        final ClassLoader current = 
Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(frameworkClassLoader);
+        return new NarCloseable(current);
+    }
+
     private final ClassLoader toSet;
 
     private NarCloseable(final ClassLoader toSet) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index d336b51..2b269a7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -42,6 +42,7 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
+import org.apache.nifi.cluster.coordination.node.ClusterRoles;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -65,6 +66,7 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceReference;
@@ -261,6 +263,7 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     private AccessPolicyDAO accessPolicyDAO;
     private ClusterCoordinator clusterCoordinator;
     private HeartbeatMonitor heartbeatMonitor;
+    private LeaderElectionManager leaderElectionManager;
 
     // administrative services
     private AuditService auditService;
@@ -3116,19 +3119,10 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         clusterDto.setGenerated(new Date());
 
         // create node dtos
-        final Collection<NodeDTO> nodeDtos = new ArrayList<>();
+        final List<NodeDTO> nodeDtos = 
clusterCoordinator.getNodeIdentifiers().stream()
+            .map(nodeId -> getNode(nodeId))
+            .collect(Collectors.toList());
         clusterDto.setNodes(nodeDtos);
-        for (final NodeIdentifier nodeId : 
clusterCoordinator.getNodeIdentifiers()) {
-            final NodeConnectionStatus status = 
clusterCoordinator.getConnectionStatus(nodeId);
-            if (status == null) {
-                continue;
-            }
-
-            final List<NodeEvent> events = 
clusterCoordinator.getNodeEvents(nodeId);
-            final Set<String> nodeRoles = 
clusterCoordinator.getConnectionStatus(nodeId).getRoles();
-            final NodeHeartbeat heartbeat = 
heartbeatMonitor.getLatestHeartbeat(nodeId);
-            nodeDtos.add(dtoFactory.createNodeDTO(nodeId, status, heartbeat, 
events, nodeRoles));
-        }
 
         return clusterDto;
     }
@@ -3142,11 +3136,29 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     private NodeDTO getNode(final NodeIdentifier nodeId) {
         final NodeConnectionStatus nodeStatus = 
clusterCoordinator.getConnectionStatus(nodeId);
         final List<NodeEvent> events = 
clusterCoordinator.getNodeEvents(nodeId);
-        final Set<String> roles = 
clusterCoordinator.getConnectionStatus(nodeId).getRoles();
+        final Set<String> roles = getRoles(nodeId);
         final NodeHeartbeat heartbeat = 
heartbeatMonitor.getLatestHeartbeat(nodeId);
         return dtoFactory.createNodeDTO(nodeId, nodeStatus, heartbeat, events, 
roles);
     }
 
+    private Set<String> getRoles(final NodeIdentifier nodeId) {
+        final Set<String> roles = new HashSet<>();
+        final String nodeAddress = nodeId.getSocketAddress() + ":" + 
nodeId.getSocketPort();
+
+        for (final String roleName : ClusterRoles.getAllRoles()) {
+            final String leader = leaderElectionManager.getLeader(roleName);
+            if (leader == null) {
+                continue;
+            }
+
+            if (leader.equals(nodeAddress)) {
+                roles.add(roleName);
+            }
+        }
+
+        return roles;
+    }
+
     @Override
     public void deleteNode(final String nodeId) {
         final NiFiUser user = NiFiUserUtils.getNiFiUser();
@@ -3290,4 +3302,8 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
     public void setBulletinRepository(final BulletinRepository 
bulletinRepository) {
         this.bulletinRepository = bulletinRepository;
     }
+
+    public void setLeaderElectionManager(final LeaderElectionManager 
leaderElectionManager) {
+        this.leaderElectionManager = leaderElectionManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e42ea9ad/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
index 28dbb62..614043a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
@@ -163,6 +163,7 @@
         <property name="clusterCoordinator" ref="clusterCoordinator"/>
         <property name="heartbeatMonitor" ref="heartbeatMonitor" />
         <property name="bulletinRepository" ref="bulletinRepository"/>
+        <property name="leaderElectionManager" ref="leaderElectionManager" />
     </bean>
     
     <!-- component ui extension configuration context -->

Reply via email to