Repository: nifi
Updated Branches:
  refs/heads/master 13b59b562 -> 5cb928131


NIFI-3981: When serializing flow to cluster, use the Scheduled State of ports 
as they are configured to be, not the current state, since the current state 
may change as soon as the FlowController has finished initializing

This closes #1861.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5cb92813
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5cb92813
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5cb92813

Branch: refs/heads/master
Commit: 5cb928131ca418c67827a536b02a046b34d14c7f
Parents: 13b59b5
Author: Mark Payne <[email protected]>
Authored: Thu May 25 15:31:43 2017 -0400
Committer: Bryan Bende <[email protected]>
Committed: Thu May 25 16:45:20 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/controller/FlowController.java  | 51 +++++++++++++++++++-
 .../controller/StandardFlowSynchronizer.java    | 12 ++---
 .../serialization/ScheduledStateLookup.java     | 15 +++++-
 .../serialization/StandardFlowSerializer.java   | 16 +++---
 4 files changed, 78 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5cb92813/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 34ea266..aef6d46 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1518,7 +1518,29 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public void serialize(final FlowSerializer serializer, final OutputStream 
os) throws FlowSerializationException {
         readLock.lock();
         try {
-            final ScheduledStateLookup scheduledStateLookup = procNode -> 
startConnectablesAfterInitialization.contains(procNode) ? 
ScheduledState.RUNNING : procNode.getScheduledState();
+            final ScheduledStateLookup scheduledStateLookup = new 
ScheduledStateLookup() {
+                @Override
+                public ScheduledState getScheduledState(final ProcessorNode 
procNode) {
+                    if 
(startConnectablesAfterInitialization.contains(procNode)) {
+                        return ScheduledState.RUNNING;
+                    }
+
+                    return procNode.getScheduledState();
+                }
+
+                @Override
+                public ScheduledState getScheduledState(final Port port) {
+                    if (startConnectablesAfterInitialization.contains(port)) {
+                        return ScheduledState.RUNNING;
+                    }
+                    if 
(startRemoteGroupPortsAfterInitialization.contains(port)) {
+                        return ScheduledState.RUNNING;
+                    }
+
+                    return port.getScheduledState();
+                }
+            };
+
             serializer.serialize(this, os, scheduledStateLookup);
         } finally {
             readLock.unlock();
@@ -2922,6 +2944,33 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
+    public void stopConnectable(final Connectable connectable) {
+        final ProcessGroup group = 
requireNonNull(connectable).getProcessGroup();
+
+        writeLock.lock();
+        try {
+            switch (requireNonNull(connectable).getConnectableType()) {
+                case FUNNEL:
+                    // Ignore. We don't support stopping funnels.
+                    break;
+                case INPUT_PORT:
+                case REMOTE_INPUT_PORT:
+                    startConnectablesAfterInitialization.remove(connectable);
+                    group.stopInputPort((Port) connectable);
+                    break;
+                case OUTPUT_PORT:
+                case REMOTE_OUTPUT_PORT:
+                    startConnectablesAfterInitialization.remove(connectable);
+                    group.stopOutputPort((Port) connectable);
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     public void startTransmitting(final RemoteGroupPort remoteGroupPort) {
         writeLock.lock();
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cb92813/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 975f954..6f1e8e1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -774,19 +774,19 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                     case DISABLED:
                         // switch processor do disabled. This means we have to 
stop it (if it's already stopped, this method does nothing),
                         // and then we have to disable it.
-                        port.getProcessGroup().stopInputPort(port);
+                        controller.stopConnectable(port);
                         port.getProcessGroup().disableInputPort(port);
                         break;
                     case RUNNING:
                         // we want to run now. Make sure processor is not 
disabled and then start it.
                         port.getProcessGroup().enableInputPort(port);
-                        port.getProcessGroup().startInputPort(port);
+                        controller.startConnectable(port);
                         break;
                     case STOPPED:
                         if (port.getScheduledState() == 
ScheduledState.DISABLED) {
                             port.getProcessGroup().enableInputPort(port);
                         } else if (port.getScheduledState() == 
ScheduledState.RUNNING) {
-                            port.getProcessGroup().stopInputPort(port);
+                            controller.stopConnectable(port);
                         }
                         break;
                 }
@@ -803,19 +803,19 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                     case DISABLED:
                         // switch processor do disabled. This means we have to 
stop it (if it's already stopped, this method does nothing),
                         // and then we have to disable it.
-                        port.getProcessGroup().stopOutputPort(port);
+                        controller.stopConnectable(port);
                         port.getProcessGroup().disableOutputPort(port);
                         break;
                     case RUNNING:
                         // we want to run now. Make sure processor is not 
disabled and then start it.
                         port.getProcessGroup().enableOutputPort(port);
-                        port.getProcessGroup().startOutputPort(port);
+                        controller.startConnectable(port);
                         break;
                     case STOPPED:
                         if (port.getScheduledState() == 
ScheduledState.DISABLED) {
                             port.getProcessGroup().enableOutputPort(port);
                         } else if (port.getScheduledState() == 
ScheduledState.RUNNING) {
-                            port.getProcessGroup().stopOutputPort(port);
+                            controller.stopConnectable(port);
                         }
                         break;
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cb92813/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
index 07f6017..39693b8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.controller.serialization;
 
+import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 
@@ -24,5 +25,17 @@ public interface ScheduledStateLookup {
 
     ScheduledState getScheduledState(ProcessorNode procNode);
 
-    public static final ScheduledStateLookup IDENTITY_LOOKUP = 
ProcessorNode::getScheduledState;
+    ScheduledState getScheduledState(Port port);
+
+    public static final ScheduledStateLookup IDENTITY_LOOKUP = new 
ScheduledStateLookup() {
+        @Override
+        public ScheduledState getScheduledState(final ProcessorNode procNode) {
+            return procNode.getScheduledState();
+        }
+
+        @Override
+        public ScheduledState getScheduledState(final Port port) {
+            return port.getScheduledState();
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cb92813/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index fea1ecb..702932c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -159,19 +159,19 @@ public class StandardFlowSerializer implements 
FlowSerializer {
 
         if (group.isRootGroup()) {
             for (final Port port : group.getInputPorts()) {
-                addRootGroupPort(element, (RootGroupPort) port, "inputPort");
+                addRootGroupPort(element, (RootGroupPort) port, "inputPort", 
scheduledStateLookup);
             }
 
             for (final Port port : group.getOutputPorts()) {
-                addRootGroupPort(element, (RootGroupPort) port, "outputPort");
+                addRootGroupPort(element, (RootGroupPort) port, "outputPort", 
scheduledStateLookup);
             }
         } else {
             for (final Port port : group.getInputPorts()) {
-                addPort(element, port, "inputPort");
+                addPort(element, port, "inputPort", scheduledStateLookup);
             }
 
             for (final Port port : group.getOutputPorts()) {
-                addPort(element, port, "outputPort");
+                addPort(element, port, "outputPort", scheduledStateLookup);
             }
         }
 
@@ -330,7 +330,7 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         parentElement.appendChild(element);
     }
 
-    private void addPort(final Element parentElement, final Port port, final 
String elementName) {
+    private void addPort(final Element parentElement, final Port port, final 
String elementName, final ScheduledStateLookup scheduledStateLookup) {
         final Document doc = parentElement.getOwnerDocument();
         final Element element = doc.createElement(elementName);
         parentElement.appendChild(element);
@@ -338,12 +338,12 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         addTextElement(element, "name", port.getName());
         addPosition(element, port.getPosition());
         addTextElement(element, "comments", port.getComments());
-        addTextElement(element, "scheduledState", 
port.getScheduledState().name());
+        addTextElement(element, "scheduledState", 
scheduledStateLookup.getScheduledState(port).name());
 
         parentElement.appendChild(element);
     }
 
-    private void addRootGroupPort(final Element parentElement, final 
RootGroupPort port, final String elementName) {
+    private void addRootGroupPort(final Element parentElement, final 
RootGroupPort port, final String elementName, final ScheduledStateLookup 
scheduledStateLookup) {
         final Document doc = parentElement.getOwnerDocument();
         final Element element = doc.createElement(elementName);
         parentElement.appendChild(element);
@@ -351,7 +351,7 @@ public class StandardFlowSerializer implements 
FlowSerializer {
         addTextElement(element, "name", port.getName());
         addPosition(element, port.getPosition());
         addTextElement(element, "comments", port.getComments());
-        addTextElement(element, "scheduledState", 
port.getScheduledState().name());
+        addTextElement(element, "scheduledState", 
scheduledStateLookup.getScheduledState(port).name());
         addTextElement(element, "maxConcurrentTasks", 
String.valueOf(port.getMaxConcurrentTasks()));
         for (final String user : port.getUserAccessControl()) {
             addTextElement(element, "userAccessControl", user);

Reply via email to