Repository: nifi
Updated Branches:
  refs/heads/master 69586d8bd -> 52bc23f5d


NIFI-2316, NIFI-2318: Ensure that we do not save the flow before initializing 
the Run Status of components. Clarify the Node Event messages

This closes #678

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/52bc23f5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/52bc23f5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/52bc23f5

Branch: refs/heads/master
Commit: 52bc23f5dbcc3a236df1fa88744c277e5d60e244
Parents: 69586d8
Author: Mark Payne <[email protected]>
Authored: Tue Jul 19 14:49:38 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Tue Jul 26 14:24:32 2016 -0400

----------------------------------------------------------------------
 .../node/NodeClusterCoordinator.java            | 60 +++++++++++++++---
 .../apache/nifi/controller/FlowController.java  | 26 +++-----
 .../nifi/controller/StandardFlowService.java    | 64 ++++++++++----------
 .../controller/StandardFlowSynchronizer.java    |  4 +-
 .../serialization/FlowFromDOMFactory.java       |  8 ++-
 .../StandardXMLFlowConfigurationDAO.java        | 12 +++-
 6 files changed, 113 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index b31530f..514d928 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -35,6 +35,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -770,6 +771,55 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         }
     }
 
+    private String summarizeStatusChange(final NodeConnectionStatus oldStatus, 
final NodeConnectionStatus status) {
+        final StringBuilder sb = new StringBuilder();
+
+        if (oldStatus != null && status.getState() == oldStatus.getState()) {
+            // Check if roles changed
+            final Set<String> oldRoles = oldStatus.getRoles();
+            final Set<String> newRoles = status.getRoles();
+
+            final Set<String> rolesRemoved = new HashSet<>(oldRoles);
+            rolesRemoved.removeAll(newRoles);
+
+            final Set<String> rolesAdded = new HashSet<>(newRoles);
+            rolesAdded.removeAll(oldRoles);
+
+            if (!rolesRemoved.isEmpty()) {
+                sb.append("Relinquished role");
+                if (rolesRemoved.size() != 1) {
+                    sb.append("s");
+                }
+
+                sb.append(" ").append(rolesRemoved);
+            }
+
+            if (!rolesAdded.isEmpty()) {
+                if (sb.length() > 0) {
+                    sb.append("; ");
+                }
+
+                sb.append("Acquired role");
+                if (rolesAdded.size() != 1) {
+                    sb.append("s");
+                }
+
+                sb.append(" ").append(rolesAdded);
+            }
+        } else {
+            sb.append("Node Status changed from ").append(oldStatus == null ? 
"[Unknown Node]" : oldStatus.getState().toString()).append(" to 
").append(status.getState().toString());
+            if (status.getState() == NodeConnectionState.CONNECTED) {
+                sb.append(" 
(Roles=").append(status.getRoles().toString()).append(")");
+            } else if (status.getDisconnectReason() != null) {
+                sb.append(" due to ").append(status.getDisconnectReason());
+            } else if (status.getDisconnectCode() != null) {
+                sb.append(" due to 
").append(status.getDisconnectCode().toString());
+            }
+        }
+
+        return sb.toString();
+    }
+
     private void handleNodeStatusChange(final NodeStatusChangeMessage 
statusChangeMessage) {
         final NodeConnectionStatus updatedStatus = 
statusChangeMessage.getNodeConnectionStatus();
         final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
@@ -790,14 +840,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
                     logger.info("Status of {} changed from {} to {}", 
statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
 
                     final NodeConnectionStatus status = 
statusChangeMessage.getNodeConnectionStatus();
-                    final StringBuilder sb = new StringBuilder();
-                    sb.append("Connection Status changed to 
").append(status.getState().toString());
-                    if (status.getDisconnectReason() != null) {
-                        sb.append(" due to 
").append(status.getDisconnectReason());
-                    } else if (status.getDisconnectCode() != null) {
-                        sb.append(" due to 
").append(status.getDisconnectCode().toString());
+                    final String summary = summarizeStatusChange(oldStatus, 
status);
+                    if (!StringUtils.isEmpty(summary)) {
+                        addNodeEvent(nodeId, summary);
                     }
-                    addNodeEvent(nodeId, sb.toString());
 
                     // Update our counter so that we are in-sync with the 
cluster on the
                     // most up-to-date version of the NodeConnectionStatus' 
Update Identifier.

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 5de77f6..2649895 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -565,7 +565,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
-        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, new 
NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)));
+        this.connectionStatus = new NodeConnectionStatus(nodeId, 
DisconnectionCode.NOT_YET_CONNECTED);
+        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
 
         if (configuredForClustering) {
             leaderElectionManager = new CuratorLeaderElectionManager(4);
@@ -1459,7 +1460,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, 
isPrimary(), connectionStatus));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, 
isPrimary()));
         } finally {
             writeLock.unlock();
         }
@@ -3349,7 +3350,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, 
isPrimary(), connectionStatus));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, 
isPrimary()));
         } finally {
             writeLock.unlock();
         }
@@ -3386,7 +3387,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         eventDrivenWorkerQueue.setPrimary(primary);
 
         // update the heartbeat bean
-        final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new 
HeartbeatBean(rootGroup, primary, connectionStatus));
+        final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new 
HeartbeatBean(rootGroup, primary));
 
         // Emit a bulletin detailing the fact that the primary node state has 
changed
         if (oldBean == null || oldBean.isPrimary() != primary) {
@@ -3754,7 +3755,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public boolean isConnected() {
         rwLock.readLock().lock();
         try {
-            return connectionStatus.getState() == 
NodeConnectionState.CONNECTED;
+            return connectionStatus != null && connectionStatus.getState() == 
NodeConnectionState.CONNECTED;
         } finally {
             rwLock.readLock().unlock();
         }
@@ -3766,7 +3767,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             this.connectionStatus = connectionStatus;
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, 
isPrimary(), connectionStatus));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, 
isPrimary()));
         } finally {
             rwLock.writeLock().unlock();
         }
@@ -3837,8 +3838,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             if (bean == null) {
                 readLock.lock();
                 try {
-                    final NodeConnectionStatus connectionStatus = new 
NodeConnectionStatus(getNodeId(), DisconnectionCode.NOT_YET_CONNECTED);
-                    bean = new HeartbeatBean(getGroup(getRootGroupId()), 
isPrimary(), connectionStatus);
+                    bean = new HeartbeatBean(getGroup(getRootGroupId()), 
isPrimary());
                 } finally {
                     readLock.unlock();
                 }
@@ -3868,7 +3868,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 roles.add(ClusterRoles.CLUSTER_COORDINATOR);
             }
 
-            final Heartbeat heartbeat = new Heartbeat(nodeId, roles, 
bean.getConnectionStatus(), hbPayload.marshal());
+            final Heartbeat heartbeat = new Heartbeat(nodeId, roles, 
connectionStatus, hbPayload.marshal());
             final HeartbeatMessage message = new HeartbeatMessage();
             message.setHeartbeat(heartbeat);
 
@@ -4002,12 +4002,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private static class HeartbeatBean {
         private final ProcessGroup rootGroup;
         private final boolean primary;
-        private final NodeConnectionStatus connectionStatus;
 
-        public HeartbeatBean(final ProcessGroup rootGroup, final boolean 
primary, final NodeConnectionStatus connectionStatus) {
+        public HeartbeatBean(final ProcessGroup rootGroup, final boolean 
primary) {
             this.rootGroup = rootGroup;
             this.primary = primary;
-            this.connectionStatus = connectionStatus;
         }
 
         public ProcessGroup getRootGroup() {
@@ -4017,9 +4015,5 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         public boolean isPrimary() {
             return primary;
         }
-
-        public NodeConnectionStatus getConnectionStatus() {
-            return connectionStatus;
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 4f286eb..71d66b5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -16,6 +16,37 @@
  */
 package org.apache.nifi.controller;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.Authorizer;
@@ -63,37 +94,6 @@ import org.apache.nifi.web.revision.RevisionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
 public class StandardFlowService implements FlowService, ProtocolHandler {
 
     private static final String EVENT_CATEGORY = "Controller";
@@ -444,7 +444,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
              * the response will be null and we should load the local dataflow
              * and heartbeat until a manager is located.
              */
-            final boolean localFlowEmpty = 
StandardFlowSynchronizer.isEmpty(proposedFlow, encryptor);
+            final boolean localFlowEmpty = 
StandardFlowSynchronizer.isEmpty(proposedFlow);
             final ConnectionResponse response = connect(localFlowEmpty, 
localFlowEmpty);
 
             // obtain write lock while we are updating the controller. We need 
to ensure that we don't

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index d6dee2c..bc28380 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -124,7 +124,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
     }
 
-    public static boolean isEmpty(final DataFlow dataFlow, final 
StringEncryptor encryptor) {
+    public static boolean isEmpty(final DataFlow dataFlow) {
         if (dataFlow == null || dataFlow.getFlow() == null || 
dataFlow.getFlow().length == 0) {
             return true;
         }
@@ -135,7 +135,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
         final Element rootGroupElement = (Element) 
rootElement.getElementsByTagName("rootGroup").item(0);
         final FlowEncodingVersion encodingVersion = 
FlowEncodingVersion.parse(rootGroupElement);
 
-        final ProcessGroupDTO rootGroupDto = 
FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, 
encodingVersion);
+        final ProcessGroupDTO rootGroupDto = 
FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, 
encodingVersion);
         return isEmpty(rootGroupDto);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index 1409df4..2c51e96 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -260,7 +260,9 @@ public class FlowFromDOMFactory {
         dto.setProxyHost(getString(element, "proxyHost"));
         dto.setProxyPort(getOptionalInt(element, "proxyPort"));
         dto.setProxyUser(getString(element, "proxyUser"));
-        String proxyPassword = decrypt(getString(element, "proxyPassword"), 
encryptor);
+
+        final String rawPassword = getString(element, "proxyPassword");
+        final String proxyPassword = encryptor == null ? rawPassword : 
decrypt(rawPassword, encryptor);
         dto.setProxyPassword(proxyPassword);
 
         return dto;
@@ -395,7 +397,9 @@ public class FlowFromDOMFactory {
         final List<Element> propertyNodeList = getChildrenByTagName(element, 
"property");
         for (final Element propertyElement : propertyNodeList) {
             final String name = getString(propertyElement, "name");
-            final String value = decrypt(getString(propertyElement, "value"), 
encryptor);
+
+            final String rawPropertyValue = getString(propertyElement, 
"value");
+            final String value = encryptor == null ? rawPropertyValue : 
decrypt(rawPropertyValue, encryptor);
             properties.put(name, value);
         }
         return properties;

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 8b0d18f..f73dce5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -80,8 +80,16 @@ public final class StandardXMLFlowConfigurationDAO 
implements FlowConfigurationD
         final FlowSynchronizer flowSynchronizer = new 
StandardFlowSynchronizer(encryptor);
         controller.synchronize(flowSynchronizer, dataFlow);
 
-        // save based on the controller, not the provided data flow because 
Process Groups may contain 'local' templates.
-        save(controller);
+        if (StandardFlowSynchronizer.isEmpty(dataFlow)) {
+            // If the dataflow is empty, we want to save it. We do this 
because when we start up a brand new cluster with no
+            // dataflow, we need to ensure that the flow is consistent across 
all nodes in the cluster and that upon restart
+            // of NiFi, the root group ID does not change. However, we don't 
always want to save it, because if the flow is
+            // not empty, then we can get into a bad situation, since the 
Processors, etc. don't have the appropriate "Scheduled
+            // State" yet (since they haven't yet been scheduled). So if there 
are components in the flow and we save it, we
+            // may end up saving the flow in such a way that all components 
are stopped.
+            // We save based on the controller, not the provided data flow 
because Process Groups may contain 'local' templates.
+            save(controller);
+        }
     }
 
     @Override

Reply via email to