NIFI-1966: When cluster is started up, do not assume that Cluster Coordinator 
has the golden copy of the flow but instead wait for some period of time or 
until the required number of nodes have connected, and then choose which flow 
is correct. This closes #977


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a7e76cc0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a7e76cc0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a7e76cc0

Branch: refs/heads/master
Commit: a7e76cc00a13815d899267ce9e4ca1ad6cf51ce1
Parents: 7a45193
Author: Mark Payne <[email protected]>
Authored: Tue Aug 30 16:59:50 2016 -0400
Committer: Matt Gilman <[email protected]>
Committed: Tue Sep 6 16:31:37 2016 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/util/NiFiProperties.java    |  15 +-
 .../src/main/asciidoc/administration-guide.adoc |  23 +-
 .../coordination/ClusterCoordinator.java        |  10 +
 .../cluster/protocol/ConnectionRequest.java     |   8 +-
 .../cluster/protocol/ConnectionResponse.java    |  10 +-
 .../jaxb/message/AdaptedConnectionRequest.java  |  11 +
 .../jaxb/message/ConnectionRequestAdapter.java  |   3 +-
 .../jaxb/message/ConnectionResponseAdapter.java |   2 +-
 .../message/HeartbeatResponseMessage.java       |  10 +-
 .../cluster/coordination/flow/FlowElection.java |  75 ++++++
 .../flow/PopularVoteFlowElection.java           | 240 +++++++++++++++++++
 .../PopularVoteFlowElectionFactoryBean.java     |  66 +++++
 .../heartbeat/AbstractHeartbeatMonitor.java     |   3 +-
 .../ClusterProtocolHeartbeatMonitor.java        |  10 +-
 .../node/NodeClusterCoordinator.java            | 154 +++++++++---
 .../NodeClusterCoordinatorFactoryBean.java      |   4 +-
 .../resources/nifi-cluster-manager-context.xml  |   4 +
 .../flow/TestPopularVoteFlowElection.java       | 104 ++++++++
 .../heartbeat/TestAbstractHeartbeatMonitor.java |  10 +
 .../node/TestNodeClusterCoordinator.java        |  55 ++++-
 .../nifi/cluster/integration/Cluster.java       |  20 +-
 .../apache/nifi/cluster/integration/Node.java   |  14 +-
 .../src/test/resources/conf/empty-flow.xml      |  27 +++
 .../src/test/resources/conf/non-empty-flow.xml  |  25 ++
 .../apache/nifi/controller/FlowController.java  | 103 ++++----
 .../nifi/controller/StandardFlowService.java    |  42 ++--
 .../cluster/ClusterProtocolHeartbeater.java     |  24 +-
 .../election/CuratorLeaderElectionManager.java  |   7 +-
 .../nifi/fingerprint/FingerprintFactory.java    |  36 ++-
 .../nifi-framework/nifi-resources/pom.xml       |   2 +
 .../src/main/resources/conf/nifi.properties     |   2 +
 .../nifi/web/api/ApplicationResource.java       | 101 +++++---
 .../src/main/resources/nifi-web-api-context.xml |  23 ++
 .../src/main/webapp/js/nf/nf-common.js          |   2 +
 34 files changed, 1052 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 619c104..3fab706 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -169,6 +169,8 @@ public abstract class NiFiProperties {
     public static final String CLUSTER_NODE_CONNECTION_TIMEOUT = 
"nifi.cluster.node.connection.timeout";
     public static final String CLUSTER_NODE_READ_TIMEOUT = 
"nifi.cluster.node.read.timeout";
     public static final String CLUSTER_FIREWALL_FILE = 
"nifi.cluster.firewall.file";
+    public static final String FLOW_ELECTION_MAX_WAIT_TIME = 
"nifi.cluster.flow.election.max.wait.time";
+    public static final String FLOW_ELECTION_MAX_CANDIDATES = 
"nifi.cluster.flow.election.max.candidates";
 
     // zookeeper properties
     public static final String ZOOKEEPER_CONNECT_STRING = 
"nifi.zookeeper.connect.string";
@@ -239,6 +241,7 @@ public abstract class NiFiProperties {
     // cluster node defaults
     public static final int DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS = 2;
     public static final String DEFAULT_REQUEST_REPLICATION_CLAIM_TIMEOUT = "15 
secs";
+    public static final String DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME = "5 mins";
 
     // state management defaults
     public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = 
"conf/state-management.xml";
@@ -309,12 +312,12 @@ public abstract class NiFiProperties {
 
     public Integer getIntegerProperty(final String propertyName, final Integer 
defaultValue) {
         final String value = getProperty(propertyName);
-        if (value == null) {
+        if (value == null || value.trim().isEmpty()) {
             return defaultValue;
         }
 
         try {
-            return Integer.parseInt(getProperty(propertyName));
+            return Integer.parseInt(value.trim());
         } catch (final Exception e) {
             return defaultValue;
         }
@@ -935,6 +938,14 @@ public abstract class NiFiProperties {
         return getProperty(FLOW_CONFIGURATION_ARCHIVE_DIR);
     }
 
+    public String getFlowElectionMaxWaitTime() {
+        return getProperty(FLOW_ELECTION_MAX_WAIT_TIME, 
DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME);
+    }
+
+    public Integer getFlowElectionMaxCandidates() {
+        return getIntegerProperty(FLOW_ELECTION_MAX_CANDIDATES, null);
+    }
+
     public String getFlowConfigurationArchiveMaxTime() {
         return getProperty(FLOW_CONFIGURATION_ARCHIVE_MAX_TIME, 
DEFAULT_FLOW_CONFIGURATION_ARCHIVE_MAX_TIME);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index eb0599b..5f0301e 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -1171,6 +1171,20 @@ There are cases where a DFM may wish to continue making 
changes to the flow, eve
 In this case, they DFM may elect to remove the node from the cluster entirely 
through the Cluster Management dialog. Once removed,
 the node cannot be rejoined to the cluster until it has been restarted.
 
+*Flow Election* +
+When a cluster first starts up, NiFi must determine which of the nodes have the
+"correct" version of the flow. This is done by voting on the flows that each 
of the nodes has. When a node
+attempts to connect to a cluster, it provides a copy of its local flow to the 
Cluster Coordinator. If no flow
+has yet been elected the "correct" flow, the node's flow is compared to each 
of the other Nodes' flows. If another
+Node's flow matches this one, a vote is cast for this flow. If no other Node 
has reported the same flow yet, this
+flow will be added to the pool of possibly elected flows with one vote. After
+some amount of time has elapsed (configured by setting the 
`nifi.cluster.flow.election.max.wait.time` property) or
+some number of Nodes have cast votes (configured by setting the 
`nifi.cluster.flow.election.max.candidates` property),
+a flow is elected to be the "correct" copy of the flow. All nodes that have 
incompatible flows are then disconnected
+from the cluster while those with compatible flows inherit the cluster's flow. 
Election is performed according to
+the "popular vote" with the caveat that the winner will never be an "empty 
flow" unless all flows are empty. This
+allows an administrator to remove a node's `flow.xml.gz` file and restart the 
node, knowing that the node's flow will
+not be voted to be the "correct" flow unless no other flow is found.
 
 *Basic Cluster Setup* +
 
@@ -1204,8 +1218,13 @@ For each Node, the minimum properties to configure are 
as follows:
    that should be used for storing data. The default value is _/root_. This is 
important to set correctly, as which cluster
    the NiFi instance attempts to join is determined by which ZooKeeper 
instance it connects to and the ZooKeeper Root Node
    that is specified.
-** nifi.cluster.request.replication.claim.timeout - Specifies how long a 
component can be 'locked' during a request replication
-   before the lock expires and is automatically unlocked. See 
<<claim_management>> for more information.
+** nifi.cluster.flow.election.max.wait.time - Specifies the amount of time to 
wait before electing a Flow as the "correct" Flow.
+   If the number of Nodes that have voted is equal to the number specified by 
the `nifi.cluster.flow.election.max.candidates`
+   property, the cluster will not wait this long. The default is 5 minutes. 
Note that the time starts as soon as the first vote
+   is cast.
+** nifi.cluster.flow.election.max.candidates - Specifies the number of Nodes 
required in the cluster to cause early election
+   of Flows. This allows the Nodes in the cluster to avoid having to wait a 
long time before starting processing if we reach
+   at least this number of nodes in the cluster.
 
 Now, it is possible to start up the cluster. It does not matter which order 
the instances start up. Navigate to the URL for
 one of the nodes, and the User Interface should look similar to the following:

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 49c6142..723a374 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -226,4 +226,14 @@ public interface ClusterCoordinator {
      * @return <code>true</code> if connected, <code>false</code> otherwise
      */
     boolean isConnected();
+
+    /**
+     * @return <code>true</code> if Flow Election is complete, 
<code>false</code> otherwise
+     */
+    boolean isFlowElectionComplete();
+
+    /**
+     * @return the current status of Flow Election.
+     */
+    String getFlowElectionStatus();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
index 0e27155..2e87402 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
@@ -28,16 +28,22 @@ import 
org.apache.nifi.cluster.protocol.jaxb.message.ConnectionRequestAdapter;
 public class ConnectionRequest {
 
     private final NodeIdentifier proposedNodeIdentifier;
+    private final DataFlow dataFlow;
 
-    public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier) {
+    public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier, 
final DataFlow dataFlow) {
         if (proposedNodeIdentifier == null) {
             throw new IllegalArgumentException("Proposed node identifier may 
not be null.");
         }
+
         this.proposedNodeIdentifier = proposedNodeIdentifier;
+        this.dataFlow = dataFlow;
     }
 
     public NodeIdentifier getProposedNodeIdentifier() {
         return proposedNodeIdentifier;
     }
 
+    public DataFlow getDataFlow() {
+        return dataFlow;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
index 4e572af..c0717a9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
@@ -49,9 +49,8 @@ public class ConnectionResponse {
 
         if (nodeIdentifier == null) {
             throw new IllegalArgumentException("Node identifier may not be 
empty or null.");
-        } else if (dataFlow == null) {
-            throw new IllegalArgumentException("DataFlow may not be null.");
         }
+
         this.nodeIdentifier = nodeIdentifier;
         this.dataFlow = dataFlow;
         this.tryLaterSeconds = 0;
@@ -61,14 +60,14 @@ public class ConnectionResponse {
         this.componentRevisions = componentRevisions == null ? 
Collections.emptyList() : Collections.unmodifiableList(new 
ArrayList<>(componentRevisions));
     }
 
-    public ConnectionResponse(final int tryLaterSeconds) {
+    public ConnectionResponse(final int tryLaterSeconds, final String 
explanation) {
         if (tryLaterSeconds <= 0) {
-            throw new IllegalArgumentException("Try-Later seconds may not be 
nonnegative: " + tryLaterSeconds);
+            throw new IllegalArgumentException("Try-Later seconds must be 
nonnegative: " + tryLaterSeconds);
         }
         this.dataFlow = null;
         this.nodeIdentifier = null;
         this.tryLaterSeconds = tryLaterSeconds;
-        this.rejectionReason = null;
+        this.rejectionReason = explanation;
         this.instanceId = null;
         this.nodeStatuses = null;
         this.componentRevisions = null;
@@ -120,7 +119,6 @@ public class ConnectionResponse {
         return instanceId;
     }
 
-
     public List<NodeConnectionStatus> getNodeConnectionStatuses() {
         return nodeStatuses;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
index 85f4a3f..06ce14a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedConnectionRequest.java
@@ -17,6 +17,8 @@
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 /**
@@ -24,6 +26,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
 public class AdaptedConnectionRequest {
 
     private NodeIdentifier nodeIdentifier;
+    private DataFlow dataFlow;
 
     public AdaptedConnectionRequest() {
     }
@@ -37,4 +40,12 @@ public class AdaptedConnectionRequest {
         this.nodeIdentifier = nodeIdentifier;
     }
 
+    @XmlJavaTypeAdapter(DataFlowAdapter.class)
+    public DataFlow getDataFlow() {
+        return dataFlow;
+    }
+
+    public void setDataFlow(final DataFlow dataFlow) {
+        this.dataFlow = dataFlow;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
index 21f9770..0faea39 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionRequestAdapter.java
@@ -28,13 +28,14 @@ public class ConnectionRequestAdapter extends 
XmlAdapter<AdaptedConnectionReques
         final AdaptedConnectionRequest aCr = new AdaptedConnectionRequest();
         if (cr != null) {
             aCr.setNodeIdentifier(cr.getProposedNodeIdentifier());
+            aCr.setDataFlow(cr.getDataFlow());
         }
         return aCr;
     }
 
     @Override
     public ConnectionRequest unmarshal(final AdaptedConnectionRequest aCr) {
-        return new ConnectionRequest(aCr.getNodeIdentifier());
+        return new ConnectionRequest(aCr.getNodeIdentifier(), 
aCr.getDataFlow());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
index 470843e..4b4c0fc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ConnectionResponseAdapter.java
@@ -41,7 +41,7 @@ public class ConnectionResponseAdapter extends 
XmlAdapter<AdaptedConnectionRespo
     @Override
     public ConnectionResponse unmarshal(final AdaptedConnectionResponse aCr) {
         if (aCr.shouldTryLater()) {
-            return new ConnectionResponse(aCr.getTryLaterSeconds());
+            return new ConnectionResponse(aCr.getTryLaterSeconds(), 
aCr.getRejectionReason());
         } else if (aCr.getRejectionReason() != null) {
             return 
ConnectionResponse.createRejectionResponse(aCr.getRejectionReason());
         } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
index cbb8b48..e508185 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
@@ -28,7 +28,7 @@ import 
org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 public class HeartbeatResponseMessage extends ProtocolMessage {
 
     private List<NodeConnectionStatus> updatedNodeStatuses = new ArrayList<>();
-
+    private String flowElectionMessage = null;
 
     @Override
     public MessageType getType() {
@@ -42,4 +42,12 @@ public class HeartbeatResponseMessage extends 
ProtocolMessage {
     public void setUpdatedNodeStatuses(final List<NodeConnectionStatus> 
nodeStatuses) {
         this.updatedNodeStatuses = new ArrayList<>(nodeStatuses);
     }
+
+    public String getFlowElectionMessage() {
+        return flowElectionMessage;
+    }
+
+    public void setFlowElectionMessage(String flowElectionMessage) {
+        this.flowElectionMessage = flowElectionMessage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/FlowElection.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/FlowElection.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/FlowElection.java
new file mode 100644
index 0000000..70e72c5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/FlowElection.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.flow;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * <p>
+ * A FlowElection is responsible for examining multiple versions of a dataflow 
and determining which of
+ * the versions is the "correct" version of the flow.
+ * </p>
+ */
+public interface FlowElection {
+
+    /**
+     * Checks if the election has completed or not.
+     *
+     * @return <code>true</code> if the election has completed, 
<code>false</code> otherwise.
+     */
+    boolean isElectionComplete();
+
+    /**
+     * Returns <code>true</code> if a vote has already been counted for the 
given Node Identifier, <code>false</code> otherwise.
+     *
+     * @param nodeIdentifier the identifier of the node
+     * @return <code>true</code> if a vote has already been counted for the 
given Node Identifier, <code>false</code> otherwise.
+     */
+    boolean isVoteCounted(NodeIdentifier nodeIdentifier);
+
+    /**
+     * If the election has not yet completed, adds the given DataFlow to the 
list of candidates
+     * (if it is not already in the running) and increments the number of 
votes for this DataFlow by 1.
+     * If the election has completed, the given candidate is ignored, and the 
already-elected DataFlow
+     * will be returned. If the election has not yet completed, a vote will be 
cast for the given
+     * candidate and <code>null</code> will be returned, signifying that no 
candidate has yet been chosen.
+     *
+     * @param candidate the DataFlow to vote for and add to the pool of 
candidates if not already present
+     * @param nodeIdentifier the identifier of the node casting the vote
+     *
+     * @return the elected {@link DataFlow}, or <code>null</code> if no 
DataFlow has yet been elected
+     */
+    DataFlow castVote(DataFlow candidate, NodeIdentifier nodeIdentifier);
+
+    /**
+     * Returns the DataFlow that has been elected as the "correct" version of 
the flow, or <code>null</code>
+     * if the election has not yet completed.
+     *
+     * @return the DataFlow that has been elected as the "correct" version of 
the flow, or <code>null</code>
+     *         if the election has not yet completed.
+     */
+    DataFlow getElectedDataFlow();
+
+    /**
+     * Returns a human-readable description of the status of the election
+     *
+     * @return a human-readable description of the status of the election
+     */
+    String getStatusDescription();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java
new file mode 100644
index 0000000..bc730d8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElection.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.flow;
+
+import static java.util.Objects.requireNonNull;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.StandardFlowSynchronizer;
+import org.apache.nifi.fingerprint.FingerprintFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * An implementation of {@link FlowElection} that waits until either a maximum 
amount of time has elapsed
+ * or a maximum number of Data Flows have entered the running to be elected, 
and then elects the 'winner'
+ * based on the number of 'votes' that a particular DataFlow has received. 
This implementation considers
+ * two Flows with the same fingerprint to be the same Flow. If there is a tie 
in the number of votes for
+ * a particular DataFlow, one will be chosen in a non-deterministic manner. If 
multiple DataFlows are
+ * presented with the same fingerprint but different Flows (for instance, the 
position of a component has
+ * changed), one of the Flows with that fingerprint will be chosen in a 
non-deterministic manner.
+ * </p>
+ */
+public class PopularVoteFlowElection implements FlowElection {
+    private static final Logger logger = 
LoggerFactory.getLogger(PopularVoteFlowElection.class);
+
+    private final long maxWaitNanos;
+    private final Integer maxNodes;
+    private final FingerprintFactory fingerprintFactory;
+
+    private volatile Long startNanos = null;
+    private volatile DataFlow electedDataFlow = null;
+
+    private final Map<String, FlowCandidate> candidateByFingerprint = new 
HashMap<>();
+
+    public PopularVoteFlowElection(final long maxWait, final TimeUnit 
maxWaitPeriod, final Integer maxNodes, final FingerprintFactory 
fingerprintFactory) {
+        this.maxWaitNanos = maxWaitPeriod.toNanos(maxWait);
+        if (maxWaitNanos < 1) {
+            throw new IllegalArgumentException("Maximum wait time to elect 
Cluster Flow cannot be less than 1 nanosecond");
+        }
+
+        this.maxNodes = maxNodes;
+        if (maxNodes != null && maxNodes < 1) {
+            throw new IllegalArgumentException("Maximum number of nodes to 
wait on before electing Cluster Flow cannot be less than 1");
+        }
+
+        this.fingerprintFactory = requireNonNull(fingerprintFactory);
+    }
+
+    @Override
+    public synchronized boolean isElectionComplete() {
+        if (electedDataFlow != null) {
+            return true;
+        }
+
+        if (startNanos == null) {
+            return false;
+        }
+
+        final long nanosSinceStart = System.nanoTime() - startNanos;
+        if (nanosSinceStart > maxWaitNanos) {
+            final FlowCandidate elected = performElection();
+            logger.info("Election is complete because the maximum allowed time 
has elapsed. "
+                + "The elected dataflow is held by the following nodes: {}", 
elected.getNodes());
+
+            return true;
+        } else if (maxNodes != null) {
+            final int numVotes = getVoteCount();
+            if (numVotes >= maxNodes) {
+                final FlowCandidate elected = performElection();
+                logger.info("Election is complete because the required number 
of nodes ({}) have voted. "
+                    + "The elected dataflow is held by the following nodes: 
{}", maxNodes, elected.getNodes());
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean isVoteCounted(final NodeIdentifier nodeIdentifier) {
+        return candidateByFingerprint.values().stream()
+            .anyMatch(candidate -> 
candidate.getNodes().contains(nodeIdentifier));
+    }
+
+    private synchronized int getVoteCount() {
+        return candidateByFingerprint.values().stream().mapToInt(candidate -> 
candidate.getVotes()).sum();
+    }
+
+    @Override
+    public synchronized DataFlow castVote(final DataFlow candidate, final 
NodeIdentifier nodeId) {
+        if (candidate == null || isElectionComplete()) {
+            return getElectedDataFlow();
+        }
+
+        final String fingerprint = fingerprint(candidate);
+        final FlowCandidate flowCandidate = 
candidateByFingerprint.computeIfAbsent(fingerprint, key -> new 
FlowCandidate(candidate));
+        final boolean voteCast = flowCandidate.vote(nodeId);
+
+        if (startNanos == null) {
+            startNanos = System.nanoTime();
+        }
+
+        if (voteCast) {
+            logger.info("Vote cast by {}; this flow now has {} votes", nodeId, 
flowCandidate.getVotes());
+        }
+
+        if (isElectionComplete()) {
+            return getElectedDataFlow();
+        }
+
+        return null; // no elected candidate so return null
+    }
+
+    private String fingerprint(final DataFlow dataFlow) {
+        final String flowFingerprint = 
fingerprintFactory.createFingerprint(dataFlow.getFlow());
+        final String authFingerprint = dataFlow.getAuthorizerFingerprint() == 
null ? "" : new String(dataFlow.getAuthorizerFingerprint(), 
StandardCharsets.UTF_8);
+        final String candidateFingerprint = flowFingerprint + authFingerprint;
+
+        return candidateFingerprint;
+    }
+
+    @Override
+    public DataFlow getElectedDataFlow() {
+        return electedDataFlow;
+    }
+
+    private FlowCandidate performElection() {
+        if (candidateByFingerprint.isEmpty()) {
+            return null;
+        }
+
+        final FlowCandidate elected;
+        if (candidateByFingerprint.size() == 1) {
+            elected = candidateByFingerprint.values().iterator().next();
+        } else {
+            elected = candidateByFingerprint.values().stream()
+                .filter(candidate -> !candidate.isFlowEmpty())  // We have 
more than 1 fingerprint. Do not consider empty flows.
+                .max((candidate1, candidate2) -> 
Integer.compare(candidate1.getVotes(), candidate2.getVotes()))
+                .get();
+        }
+
+        this.electedDataFlow = elected.getDataFlow();
+        return elected;
+    }
+
+    @Override
+    public synchronized String getStatusDescription() {
+        if (startNanos == null) {
+            return "No votes have yet been cast.";
+        }
+
+        final StringBuilder descriptionBuilder = new StringBuilder("Election 
will complete in ");
+        final long nanosElapsed = System.nanoTime() - startNanos;
+        final long nanosLeft = maxWaitNanos - nanosElapsed;
+        final long secsLeft = TimeUnit.NANOSECONDS.toSeconds(nanosLeft);
+        if (secsLeft < 1) {
+            descriptionBuilder.append("less than 1 second");
+        } else {
+            descriptionBuilder.append(secsLeft).append(" seconds");
+        }
+
+        if (maxNodes != null) {
+            final int votesNeeded = maxNodes.intValue() - getVoteCount();
+            descriptionBuilder.append(" or after 
").append(votesNeeded).append(" more vote");
+            descriptionBuilder.append(votesNeeded == 1 ? " is " : "s are ");
+            descriptionBuilder.append("cast, whichever occurs first.");
+        }
+
+        return descriptionBuilder.toString();
+    }
+
+    private static class FlowCandidate {
+        private final DataFlow dataFlow;
+        private final AtomicInteger voteCount = new AtomicInteger(0);
+        private final Set<NodeIdentifier> nodeIds = 
Collections.synchronizedSet(new HashSet<>());
+
+        public FlowCandidate(final DataFlow dataFlow) {
+            this.dataFlow = dataFlow;
+        }
+
+        /**
+         * Casts a vote for this candidate for the given node identifier, if a 
vote has not already
+         * been cast for this node identifier
+         *
+         * @param nodeId the node id that is casting the vote
+         * @return <code>true</code> if the vote was case, <code>false</code> 
if this node id has already cast its vote
+         */
+        public boolean vote(final NodeIdentifier nodeId) {
+            if (nodeIds.add(nodeId)) {
+                voteCount.incrementAndGet();
+                return true;
+            }
+
+            return false;
+        }
+
+        public int getVotes() {
+            return voteCount.get();
+        }
+
+        public DataFlow getDataFlow() {
+            return dataFlow;
+        }
+
+        public boolean isFlowEmpty() {
+            return StandardFlowSynchronizer.isEmpty(dataFlow);
+        }
+
+        public Set<NodeIdentifier> getNodes() {
+            return nodeIds;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElectionFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElectionFactoryBean.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElectionFactoryBean.java
new file mode 100644
index 0000000..09d3845
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/flow/PopularVoteFlowElectionFactoryBean.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.flow;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.fingerprint.FingerprintFactory;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.FactoryBean;
+
+public class PopularVoteFlowElectionFactoryBean implements 
FactoryBean<PopularVoteFlowElection> {
+    private static final Logger logger = 
LoggerFactory.getLogger(PopularVoteFlowElectionFactoryBean.class);
+    private NiFiProperties properties;
+
+    @Override
+    public PopularVoteFlowElection getObject() throws Exception {
+        final String maxWaitTime = properties.getFlowElectionMaxWaitTime();
+        long maxWaitMillis;
+        try {
+            maxWaitMillis = FormatUtils.getTimeDuration(maxWaitTime, 
TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            logger.warn("Failed to parse value of property '{}' as a valid 
time period. Value was '{}'. Ignoring this value and using the default value of 
'{}'",
+                NiFiProperties.FLOW_ELECTION_MAX_WAIT_TIME, maxWaitTime, 
NiFiProperties.DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME);
+            maxWaitMillis = 
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_FLOW_ELECTION_MAX_WAIT_TIME, 
TimeUnit.MILLISECONDS);
+        }
+
+        final Integer maxNodes = properties.getFlowElectionMaxCandidates();
+
+        final StringEncryptor encryptor = 
StringEncryptor.createEncryptor(properties);
+        final FingerprintFactory fingerprintFactory = new 
FingerprintFactory(encryptor);
+        return new PopularVoteFlowElection(maxWaitMillis, 
TimeUnit.MILLISECONDS, maxNodes, fingerprintFactory);
+    }
+
+    @Override
+    public Class<?> getObjectType() {
+        return PopularVoteFlowElection.class;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void setProperties(final NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 8b0dc97..5119dac 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -121,7 +121,8 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
      * Visible for testing.
      */
     protected synchronized void monitorHeartbeats() {
-        if (!clusterCoordinator.isActiveClusterCoordinator()) {
+        final NodeIdentifier activeCoordinator = 
clusterCoordinator.getElectedActiveCoordinatorNode();
+        if (activeCoordinator != null && 
!activeCoordinator.equals(clusterCoordinator.getLocalNodeIdentifier())) {
             // Occasionally Curator appears to not notify us that we have lost 
the elected leader role, or does so
             // on a very large delay. So before we kick the node out of the 
cluster, we want to first check what the
             // ZNode in ZooKeeper says, and ensure that this is the node that 
is being advertised as the appropriate

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
index 716610c..9f620d9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -153,11 +153,19 @@ public class ClusterProtocolHeartbeatMonitor extends 
AbstractHeartbeatMonitor im
 
         // Formulate a List of differences between our view of the cluster 
topology and the node's view
         // and send that back to the node so that it is in-sync with us
-        final List<NodeConnectionStatus> nodeStatusList = 
payload.getClusterStatus();
+        List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
+        if (nodeStatusList == null) {
+            nodeStatusList = Collections.emptyList();
+        }
         final List<NodeConnectionStatus> updatedStatuses = 
getUpdatedStatuses(nodeStatusList);
 
         final HeartbeatResponseMessage responseMessage = new 
HeartbeatResponseMessage();
         responseMessage.setUpdatedNodeStatuses(updatedStatuses);
+
+        if (!getClusterCoordinator().isFlowElectionComplete()) {
+            
responseMessage.setFlowElectionMessage(getClusterCoordinator().getFlowElectionStatus());
+        }
+
         return responseMessage;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index e50d8fa..a6a6009 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
 import 
org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
@@ -86,16 +87,18 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     private final NiFiProperties nifiProperties;
     private final LeaderElectionManager leaderElectionManager;
     private final AtomicLong latestUpdateId = new AtomicLong(-1);
+    private final FlowElection flowElection;
 
     private volatile FlowService flowService;
     private volatile boolean connected;
     private volatile boolean closed = false;
+    private volatile boolean requireElection = true;
 
     private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> 
nodeStatuses = new ConcurrentHashMap<>();
     private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> 
nodeEvents = new ConcurrentHashMap<>();
 
     public NodeClusterCoordinator(final 
ClusterCoordinationProtocolSenderListener senderListener, final EventReporter 
eventReporter, final LeaderElectionManager leaderElectionManager,
-            final ClusterNodeFirewall firewall, final RevisionManager 
revisionManager, final NiFiProperties nifiProperties) {
+            final FlowElection flowElection, final ClusterNodeFirewall 
firewall, final RevisionManager revisionManager, final NiFiProperties 
nifiProperties) {
         this.senderListener = senderListener;
         this.flowService = null;
         this.eventReporter = eventReporter;
@@ -103,6 +106,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         this.revisionManager = revisionManager;
         this.nifiProperties = nifiProperties;
         this.leaderElectionManager = leaderElectionManager;
+        this.flowElection = flowElection;
 
         senderListener.addHandler(this);
     }
@@ -115,9 +119,12 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
         closed = true;
 
-        final NodeConnectionStatus shutdownStatus = new 
NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
-        updateNodeStatus(shutdownStatus, false);
-        logger.info("Successfully notified other nodes that I am shutting 
down");
+        final NodeIdentifier localId = getLocalNodeIdentifier();
+        if (localId != null) {
+            final NodeConnectionStatus shutdownStatus = new 
NodeConnectionStatus(localId, DisconnectionCode.NODE_SHUTDOWN);
+            updateNodeStatus(shutdownStatus, false);
+            logger.info("Successfully notified other nodes that I am shutting 
down");
+        }
     }
 
     @Override
@@ -230,6 +237,15 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     @Override
     public void requestNodeConnect(final NodeIdentifier nodeId, final String 
userDn) {
+        if (requireElection && !flowElection.isElectionComplete() && 
flowElection.isVoteCounted(nodeId)) {
+            // If we receive a heartbeat from a node that we already know, we 
don't want to request that it reconnect
+            // to the cluster because no flow has yet been elected. However, 
if the node has not yet voted, we want to send
+            // a reconnect request because we want this node to cast its vote 
for the flow, and this happens on connection
+            logger.debug("Received heartbeat for {} and node is not connected. 
Will not request node connect to cluster, "
+                + "though, because the Flow Election is still in progress", 
nodeId);
+            return;
+        }
+
         if (userDn == null) {
             reportEvent(nodeId, Severity.INFO, "Requesting that node connect 
to cluster");
         } else {
@@ -243,7 +259,11 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         request.setNodeId(nodeId);
         request.setInstanceId(instanceId);
 
-        requestReconnectionAsynchronously(request, 10, 5);
+        // If we still are requiring that an election take place, we do not 
want to include our local dataflow, because we don't
+        // yet know what the cluster's dataflow looks like. However, if we 
don't require election, then we've connected to the
+        // cluster, which means that our flow is correct.
+        final boolean includeDataFlow = !requireElection;
+        requestReconnectionAsynchronously(request, 10, 5, includeDataFlow);
     }
 
     @Override
@@ -652,7 +672,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         disconnectThread.start();
     }
 
-    private void requestReconnectionAsynchronously(final 
ReconnectionRequestMessage request, final int reconnectionAttempts, final int 
retrySeconds) {
+    private void requestReconnectionAsynchronously(final 
ReconnectionRequestMessage request, final int reconnectionAttempts, final int 
retrySeconds, final boolean includeDataFlow) {
         final Thread reconnectionThread = new Thread(new Runnable() {
             @Override
             public void run() {
@@ -675,7 +695,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
                             return;
                         }
 
-                        request.setDataFlow(new 
StandardDataFlow(flowService.createDataFlow()));
+                        if (includeDataFlow) {
+                            request.setDataFlow(new 
StandardDataFlow(flowService.createDataFlow()));
+                        }
+
                         
request.setNodeConnectionStatuses(getConnectionStatuses());
                         
request.setComponentRevisions(revisionManager.getAllRevisions().stream().map(rev
 -> ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
 
@@ -726,9 +749,13 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     }
 
     private NodeConnectionStatusResponseMessage 
handleNodeConnectionStatusRequest() {
-        final NodeConnectionStatus connectionStatus = 
nodeStatuses.get(getLocalNodeIdentifier());
         final NodeConnectionStatusResponseMessage msg = new 
NodeConnectionStatusResponseMessage();
-        msg.setNodeConnectionStatus(connectionStatus);
+        final NodeIdentifier self = getLocalNodeIdentifier();
+        if (self != null) {
+            final NodeConnectionStatus connectionStatus = 
nodeStatuses.get(self);
+            msg.setNodeConnectionStatus(connectionStatus);
+        }
+
         return msg;
     }
 
@@ -781,6 +808,20 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         }
     }
 
+    @Override
+    public String getFlowElectionStatus() {
+        if (!requireElection) {
+            return null;
+        }
+
+        return flowElection.getStatusDescription();
+    }
+
+    @Override
+    public boolean isFlowElectionComplete() {
+        return !requireElection || flowElection.isElectionComplete();
+    }
+
     private NodeIdentifier resolveNodeId(final NodeIdentifier 
proposedIdentifier) {
         final NodeConnectionStatus proposedConnectionStatus = new 
NodeConnectionStatus(proposedIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
         final NodeConnectionStatus existingStatus = 
nodeStatuses.putIfAbsent(proposedIdentifier, proposedConnectionStatus);
@@ -808,59 +849,86 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
     private ConnectionResponseMessage handleConnectionRequest(final 
ConnectionRequestMessage requestMessage) {
         final NodeIdentifier proposedIdentifier = 
requestMessage.getConnectionRequest().getProposedNodeIdentifier();
-        final ConnectionRequest requestWithDn = new 
ConnectionRequest(addRequestorDn(proposedIdentifier, 
requestMessage.getRequestorDN()));
+        final NodeIdentifier withRequestorDn = 
addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN());
+        final DataFlow dataFlow = 
requestMessage.getConnectionRequest().getDataFlow();
+        final ConnectionRequest requestWithDn = new 
ConnectionRequest(withRequestorDn, dataFlow);
 
         // Resolve Node identifier.
         final NodeIdentifier resolvedNodeId = 
resolveNodeId(proposedIdentifier);
-        final ConnectionResponse response = 
createConnectionResponse(requestWithDn, resolvedNodeId);
+
+        if (requireElection) {
+            final DataFlow electedDataFlow = flowElection.castVote(dataFlow, 
withRequestorDn);
+            if (electedDataFlow == null) {
+                logger.info("Received Connection Request from {}; responding 
with Flow Election In Progress message", withRequestorDn);
+                return createFlowElectionInProgressResponse();
+            } else {
+                logger.info("Received Connection Request from {}; responding 
with DataFlow that was elected", withRequestorDn);
+                return createConnectionResponse(requestWithDn, resolvedNodeId, 
electedDataFlow);
+            }
+        }
+
+        logger.info("Received Connection Request from {}; responding with my 
DataFlow", withRequestorDn);
+        return createConnectionResponse(requestWithDn, resolvedNodeId);
+    }
+
+    private ConnectionResponseMessage createFlowElectionInProgressResponse() {
         final ConnectionResponseMessage responseMessage = new 
ConnectionResponseMessage();
-        responseMessage.setConnectionResponse(response);
+        final String statusDescription = flowElection.getStatusDescription();
+        responseMessage.setConnectionResponse(new ConnectionResponse(5, 
"Cluster is still voting on which Flow is the correct flow for the cluster. " + 
statusDescription));
         return responseMessage;
     }
 
-    private ConnectionResponse createConnectionResponse(final 
ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier) {
+    private ConnectionResponseMessage createConnectionResponse(final 
ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier) {
+        DataFlow dataFlow = null;
+        if (flowService != null) {
+            try {
+                dataFlow = flowService.createDataFlow();
+            } catch (final IOException ioe) {
+                logger.error("Unable to obtain current dataflow from 
FlowService in order to provide the flow to "
+                    + resolvedNodeIdentifier + ". Will tell node to try again 
later", ioe);
+            }
+        }
+
+        return createConnectionResponse(request, resolvedNodeIdentifier, 
dataFlow);
+    }
+
+
+    private ConnectionResponseMessage createConnectionResponse(final 
ConnectionRequest request, final NodeIdentifier resolvedNodeIdentifier, final 
DataFlow clusterDataFlow) {
         if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
             // if the socket address is not listed in the firewall, then 
return a null response
             logger.info("Firewall blocked connection request from node " + 
resolvedNodeIdentifier);
-            return ConnectionResponse.createBlockedByFirewallResponse();
+            final ConnectionResponse response = 
ConnectionResponse.createBlockedByFirewallResponse();
+            final ConnectionResponseMessage responseMessage = new 
ConnectionResponseMessage();
+            responseMessage.setConnectionResponse(response);
+            return responseMessage;
+        }
+
+        if (clusterDataFlow == null) {
+            final ConnectionResponseMessage responseMessage = new 
ConnectionResponseMessage();
+            responseMessage.setConnectionResponse(new ConnectionResponse(5, 
"The cluster dataflow is not yet available"));
+            return responseMessage;
         }
 
         // Set node's status to 'CONNECTING'
         NodeConnectionStatus status = 
getConnectionStatus(resolvedNodeIdentifier);
         if (status == null) {
-            addNodeEvent(resolvedNodeIdentifier, "Connection requested from 
new node.  Setting status to connecting.");
+            addNodeEvent(resolvedNodeIdentifier, "Connection requested from 
new node. Setting status to connecting.");
         } else {
-            addNodeEvent(resolvedNodeIdentifier, "Connection requested from 
existing node.  Setting status to connecting");
+            addNodeEvent(resolvedNodeIdentifier, "Connection requested from 
existing node. Setting status to connecting.");
         }
 
         status = new NodeConnectionStatus(resolvedNodeIdentifier, 
NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis());
         updateNodeStatus(status);
 
-        DataFlow dataFlow = null;
-        if (flowService != null) {
-            try {
-                dataFlow = flowService.createDataFlow();
-            } catch (final IOException ioe) {
-                logger.error("Unable to obtain current dataflow from 
FlowService in order to provide the flow to "
-                        + resolvedNodeIdentifier + ". Will tell node to try 
again later", ioe);
-            }
-        }
-
-        if (dataFlow == null) {
-            // Create try-later response based on flow retrieval delay to give
-            // the flow management service a chance to retrieve a current flow
-            final int tryAgainSeconds = 5;
-            addNodeEvent(resolvedNodeIdentifier, Severity.WARNING, "Connection 
requested from node, but manager was unable to obtain current flow. "
-                    + "Instructing node to try again in " + tryAgainSeconds + 
" seconds.");
-
-            // return try later response
-            return new ConnectionResponse(tryAgainSeconds);
-        }
-
-        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, 
instanceId, getConnectionStatuses(),
+        final ConnectionResponse response = new 
ConnectionResponse(resolvedNodeIdentifier, clusterDataFlow, instanceId, 
getConnectionStatuses(),
                 revisionManager.getAllRevisions().stream().map(rev -> 
ComponentRevision.fromRevision(rev)).collect(Collectors.toList()));
+
+        final ConnectionResponseMessage responseMessage = new 
ConnectionResponseMessage();
+        responseMessage.setConnectionResponse(response);
+        return responseMessage;
     }
 
+
     private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final 
String dn) {
         return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), 
nodeId.getApiPort(),
                 nodeId.getSocketAddress(), nodeId.getSocketPort(),
@@ -959,6 +1027,16 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     @Override
     public void setConnected(final boolean connected) {
         this.connected = connected;
+
+        // Once we have connected to the cluster, election is no longer 
required.
+        // It is required only upon startup so that if multiple nodes are 
started up
+        // at the same time, and they have different flows, that we don't 
choose the
+        // wrong flow as the 'golden copy' by electing that node as the elected
+        // active Cluster Coordinator.
+        if (connected) {
+            logger.info("This node is now connected to the cluster. Will no 
longer require election of DataFlow.");
+            requireElection = false;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
index fc52ee1..2845a01 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/NodeClusterCoordinatorFactoryBean.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.cluster.spring;
 
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import 
org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
@@ -44,8 +45,9 @@ public class NodeClusterCoordinatorFactoryBean implements 
FactoryBean<NodeCluste
             final ClusterNodeFirewall clusterFirewall = 
applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
             final RevisionManager revisionManager = 
applicationContext.getBean("revisionManager", RevisionManager.class);
             final LeaderElectionManager electionManager = 
applicationContext.getBean("leaderElectionManager", 
LeaderElectionManager.class);
+            final FlowElection flowElection = 
applicationContext.getBean("flowElection", FlowElection.class);
 
-            nodeClusterCoordinator = new 
NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, 
clusterFirewall, revisionManager, properties);
+            nodeClusterCoordinator = new 
NodeClusterCoordinator(protocolSenderListener, eventReporter, electionManager, 
flowElection, clusterFirewall, revisionManager, properties);
         }
 
         return nodeClusterCoordinator;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index 84c9deb..d261590 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -41,6 +41,10 @@
         <property name="properties" ref="nifiProperties" />
     </bean>
 
+    <bean id="flowElection" 
class="org.apache.nifi.cluster.coordination.flow.PopularVoteFlowElectionFactoryBean">
+        <property name="properties" ref="nifiProperties" />
+    </bean>
+
     <!-- Cluster Coordinator -->
     <bean id="clusterCoordinator" 
class="org.apache.nifi.cluster.spring.NodeClusterCoordinatorFactoryBean">
         <property name="properties" ref="nifiProperties"/>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
new file mode 100644
index 0000000..c01371db
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.StandardDataFlow;
+import org.apache.nifi.fingerprint.FingerprintFactory;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestPopularVoteFlowElection {
+
+    @Test
+    public void testOnlyEmptyFlows() throws IOException {
+        final FingerprintFactory fingerprintFactory = 
Mockito.mock(FingerprintFactory.class);
+        
Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenReturn("fingerprint");
+
+        final PopularVoteFlowElection election = new 
PopularVoteFlowElection(1, TimeUnit.MINUTES, 3, fingerprintFactory);
+        final byte[] flow = 
Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml"));
+
+        assertFalse(election.isElectionComplete());
+        assertNull(election.getElectedDataFlow());
+        assertNull(election.castVote(createDataFlow(flow), createNodeId(1)));
+
+        assertFalse(election.isElectionComplete());
+        assertNull(election.getElectedDataFlow());
+        assertNull(election.castVote(createDataFlow(flow), createNodeId(2)));
+
+        assertFalse(election.isElectionComplete());
+        assertNull(election.getElectedDataFlow());
+
+        final DataFlow electedDataFlow = 
election.castVote(createDataFlow(flow), createNodeId(3));
+        assertNotNull(electedDataFlow);
+
+        assertEquals(new String(flow), new String(electedDataFlow.getFlow()));
+    }
+
+
+    @Test
+    public void testEmptyFlowIgnoredIfNonEmptyFlowExists() throws IOException {
+        final FingerprintFactory fingerprintFactory = 
Mockito.mock(FingerprintFactory.class);
+        
Mockito.when(fingerprintFactory.createFingerprint(Mockito.any(byte[].class))).thenReturn("fingerprint");
+
+        final PopularVoteFlowElection election = new 
PopularVoteFlowElection(1, TimeUnit.MINUTES, 8, fingerprintFactory);
+        final byte[] emptyFlow = 
Files.readAllBytes(Paths.get("src/test/resources/conf/empty-flow.xml"));
+        final byte[] nonEmptyFlow = 
Files.readAllBytes(Paths.get("src/test/resources/conf/non-empty-flow.xml"));
+
+        for (int i = 0; i < 8; i++) {
+            assertFalse(election.isElectionComplete());
+            assertNull(election.getElectedDataFlow());
+
+            final DataFlow dataFlow;
+            if (i % 4 == 0) {
+                dataFlow = createDataFlow(nonEmptyFlow);
+            } else {
+                dataFlow = createDataFlow(emptyFlow);
+            }
+
+            final DataFlow electedDataFlow = election.castVote(dataFlow, 
createNodeId(i));
+            if (i == 7) {
+                assertNotNull(electedDataFlow);
+                assertEquals(new String(nonEmptyFlow), new 
String(electedDataFlow.getFlow()));
+            } else {
+                assertNull(electedDataFlow);
+            }
+        }
+    }
+
+
+    private NodeIdentifier createNodeId(final int index) {
+        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + 
index, true);
+    }
+
+    private DataFlow createDataFlow(final byte[] flow) {
+        return new StandardDataFlow(flow, new byte[0], new byte[0]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 634ad41..e83999a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -319,6 +319,16 @@ public class TestAbstractHeartbeatMonitor {
         public boolean resetNodeStatus(NodeConnectionStatus connectionStatus, 
long qualifyingUpdateId) {
             return false;
         }
+
+        @Override
+        public boolean isFlowElectionComplete() {
+            return true;
+        }
+
+        @Override
+        public String getFlowElectionStatus() {
+            return null;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index e55605b..be9b862 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -32,9 +32,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import 
org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
+import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import 
org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
@@ -77,7 +80,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, null, revisionManager, createProperties()) {
+        coordinator = new NodeClusterCoordinator(senderListener, 
eventReporter, null, new FirstVoteWinsFlowElection(), null, revisionManager, 
createProperties()) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
                 nodeStatuses.add(updatedStatus);
@@ -132,17 +135,19 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, null, 
revisionManager, createProperties()) {
+        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, new 
FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
         };
 
         final NodeIdentifier requestedNodeId = createNodeId(6);
-        final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId);
+        final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new 
byte[0], new byte[0]));
         final ConnectionRequestMessage requestMsg = new 
ConnectionRequestMessage();
         requestMsg.setConnectionRequest(request);
 
+        coordinator.setConnected(true);
+
         final ProtocolMessage protocolResponse = 
coordinator.handle(requestMsg);
         assertNotNull(protocolResponse);
         assertTrue(protocolResponse instanceof ConnectionResponseMessage);
@@ -170,7 +175,7 @@ public class TestNodeClusterCoordinator {
         final RevisionManager revisionManager = 
Mockito.mock(RevisionManager.class);
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
-        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, null, 
revisionManager, createProperties()) {
+        final NodeClusterCoordinator coordinator = new 
NodeClusterCoordinator(senderListener, eventReporter, null, new 
FirstVoteWinsFlowElection(), null, revisionManager, createProperties()) {
             @Override
             void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
             }
@@ -180,6 +185,7 @@ public class TestNodeClusterCoordinator {
         final StandardDataFlow dataFlow = new StandardDataFlow(new byte[50], 
new byte[50], new byte[50]);
         Mockito.when(flowService.createDataFlow()).thenReturn(dataFlow);
         coordinator.setFlowService(flowService);
+        coordinator.setConnected(true);
 
         final NodeIdentifier nodeId = createNodeId(1);
         coordinator.finishNodeConnection(nodeId);
@@ -400,7 +406,7 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier id1 = new NodeIdentifier("1234", "localhost", 
8000, "localhost", 9000, "localhost", 10000, 11000, false);
         final NodeIdentifier conflictingId = new NodeIdentifier("1234", 
"localhost", 8001, "localhost", 9000, "localhost", 10000, 11000, false);
 
-        final ConnectionRequest connectionRequest = new ConnectionRequest(id1);
+        final ConnectionRequest connectionRequest = new ConnectionRequest(id1, 
new StandardDataFlow(new byte[0], new byte[0], new byte[0]));
         final ConnectionRequestMessage crm = new ConnectionRequestMessage();
         crm.setConnectionRequest(connectionRequest);
 
@@ -411,7 +417,7 @@ public class TestNodeClusterCoordinator {
         final NodeIdentifier resolvedNodeId = 
responseMessage.getConnectionResponse().getNodeIdentifier();
         assertEquals(id1, resolvedNodeId);
 
-        final ConnectionRequest conRequest2 = new 
ConnectionRequest(conflictingId);
+        final ConnectionRequest conRequest2 = new 
ConnectionRequest(conflictingId, new StandardDataFlow(new byte[0], new byte[0], 
new byte[0]));
         final ConnectionRequestMessage crm2 = new ConnectionRequestMessage();
         crm2.setConnectionRequest(conRequest2);
 
@@ -434,10 +440,45 @@ public class TestNodeClusterCoordinator {
     }
 
     private ProtocolMessage requestConnection(final NodeIdentifier 
requestedNodeId, final NodeClusterCoordinator coordinator) {
-        final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId);
+        final ConnectionRequest request = new 
ConnectionRequest(requestedNodeId, new StandardDataFlow(new byte[0], new 
byte[0], new byte[0]));
         final ConnectionRequestMessage requestMsg = new 
ConnectionRequestMessage();
         requestMsg.setConnectionRequest(request);
         return coordinator.handle(requestMsg);
     }
 
+
+    private static class FirstVoteWinsFlowElection implements FlowElection {
+        private DataFlow dataFlow;
+        private NodeIdentifier voter;
+
+        @Override
+        public boolean isElectionComplete() {
+            return dataFlow != null;
+        }
+
+        @Override
+        public synchronized DataFlow castVote(DataFlow candidate, 
NodeIdentifier nodeIdentifier) {
+            if (dataFlow == null) {
+                dataFlow = candidate;
+                voter = nodeIdentifier;
+            }
+
+            return dataFlow;
+        }
+
+        @Override
+        public DataFlow getElectedDataFlow() {
+            return dataFlow;
+        }
+
+        @Override
+        public String getStatusDescription() {
+            return "First Vote Wins";
+        }
+
+        @Override
+        public boolean isVoteCounted(NodeIdentifier nodeIdentifier) {
+            return voter != null && voter.equals(nodeIdentifier);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
index ce2017e..63c3f60 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@ -30,7 +30,11 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.test.TestingServer;
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
+import org.apache.nifi.cluster.coordination.flow.PopularVoteFlowElection;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.fingerprint.FingerprintFactory;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +45,22 @@ public class Cluster {
     private final Set<Node> nodes = new HashSet<>();
     private final TestingServer zookeeperServer;
 
+    private final long flowElectionTimeoutMillis;
+    private final Integer flowElectionMaxNodes;
+
     public Cluster() throws IOException {
+        this(3, TimeUnit.SECONDS, 3);
+    }
+
+    public Cluster(final long flowElectionTimeout, final TimeUnit 
flowElectionTimeUnit, final Integer flowElectionMaxNodes) throws IOException {
         try {
             zookeeperServer = new TestingServer();
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
+
+        this.flowElectionTimeoutMillis = 
flowElectionTimeUnit.toMillis(flowElectionTimeout);
+        this.flowElectionMaxNodes = flowElectionMaxNodes;
     }
 
 
@@ -116,7 +130,11 @@ public class Cluster {
         addProps.put(NiFiProperties.CLUSTER_IS_NODE, "true");
 
         final NiFiProperties nifiProperties = 
NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties",
 addProps);
-        final Node node = new Node(nifiProperties);
+
+        final FingerprintFactory fingerprintFactory = new 
FingerprintFactory(StringEncryptor.createEncryptor(nifiProperties));
+        final FlowElection flowElection = new 
PopularVoteFlowElection(flowElectionTimeoutMillis, TimeUnit.MILLISECONDS, 
flowElectionMaxNodes, fingerprintFactory);
+
+        final Node node = new Node(nifiProperties, flowElection);
         node.start();
         nodes.add(node);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 7ba718b..93c9397 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -28,9 +28,11 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.cluster.ReportedEvent;
+import org.apache.nifi.cluster.coordination.flow.FlowElection;
 import 
org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import 
org.apache.nifi.cluster.coordination.node.LeaderElectionNodeProtocolSender;
@@ -74,6 +76,7 @@ public class Node {
 
     private final List<ReportedEvent> reportedEvents = 
Collections.synchronizedList(new ArrayList<ReportedEvent>());
     private final RevisionManager revisionManager;
+    private final FlowElection flowElection;
 
     private NodeClusterCoordinator clusterCoordinator;
     private NodeProtocolSender protocolSender;
@@ -88,11 +91,11 @@ public class Node {
     private ScheduledExecutorService executor = new FlowEngine(8, "Node 
tasks", true);
 
 
-    public Node(final NiFiProperties properties) {
-        this(createNodeId(), properties);
+    public Node(final NiFiProperties properties, final FlowElection 
flowElection) {
+        this(createNodeId(), properties, flowElection);
     }
 
-    public Node(final NodeIdentifier nodeId, final NiFiProperties properties) {
+    public Node(final NodeIdentifier nodeId, final NiFiProperties properties, 
final FlowElection flowElection) {
         this.nodeId = nodeId;
         this.nodeProperties = new NiFiProperties() {
             @Override
@@ -119,6 +122,7 @@ public class Node {
         
Mockito.when(revisionManager.getAllRevisions()).thenReturn(Collections.emptyList());
 
         electionManager = new CuratorLeaderElectionManager(4, nodeProperties);
+        this.flowElection = flowElection;
     }
 
 
@@ -132,7 +136,7 @@ public class Node {
         protocolSender = createNodeProtocolSender();
         clusterCoordinator = createClusterCoordinator();
         clusterCoordinator.setLocalNodeIdentifier(nodeId);
-        clusterCoordinator.setConnected(true);
+        //        clusterCoordinator.setConnected(true);
 
         final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
         flowController = 
FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class),
 nodeProperties,
@@ -273,7 +277,7 @@ public class Node {
         }
 
         final ClusterCoordinationProtocolSenderListener protocolSenderListener 
= new 
ClusterCoordinationProtocolSenderListener(createCoordinatorProtocolSender(), 
protocolListener);
-        return new NodeClusterCoordinator(protocolSenderListener, 
eventReporter, electionManager, null, revisionManager, nodeProperties);
+        return new NodeClusterCoordinator(protocolSenderListener, 
eventReporter, electionManager, flowElection, null, revisionManager, 
nodeProperties);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7e76cc0/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/empty-flow.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/empty-flow.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/empty-flow.xml
new file mode 100644
index 0000000..c0cb6de
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/empty-flow.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<flowController encoding-version="1.0">
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>00000000-0000-0000-0000-000000000000</id>
+    <name>Empty NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+  </rootGroup>
+  <controllerServices/>
+  <reportingTasks/>
+</flowController>
\ No newline at end of file

Reply via email to