Repository: incubator-nifi
Updated Branches:
  refs/heads/develop a6740a6e2 -> 93b361e69


NIFI-443: Always start funnels when added to process group, even when 
autoResumeState is false


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8b911c5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8b911c5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8b911c5a

Branch: refs/heads/develop
Commit: 8b911c5aab2a4b8283510a3423e3c8962a533b96
Parents: eb5ec70
Author: Mark Payne <[email protected]>
Authored: Thu Mar 19 11:04:46 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Mar 19 11:04:46 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/groups/ProcessGroup.java    | 38 +++++------
 .../apache/nifi/controller/FlowController.java  | 71 ++++++++++++--------
 .../nifi/controller/StandardFlowService.java    | 21 +++---
 .../controller/StandardFlowSynchronizer.java    |  6 +-
 .../nifi/groups/StandardProcessGroup.java       | 55 +++------------
 .../ApplicationStartupContextListener.java      |  6 +-
 6 files changed, 89 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8b911c5a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 61be59c..53b26e1 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -118,7 +118,7 @@ public interface ProcessGroup {
     void stopProcessing();
 
     /**
-     * Starts the given Processor
+     * Enables the given Processor
      *
      * @param processor the processor to start
      * @throws IllegalStateException if the processor is not valid, or is
@@ -127,25 +127,19 @@ public interface ProcessGroup {
     void enableProcessor(ProcessorNode processor);
 
     /**
-     * Starts the given Input Port
+     * Enables the given Input Port
      *
      * @param port
      */
     void enableInputPort(Port port);
 
     /**
-     * Starts the given Output Port
+     * Enables the given Output Port
      *
      * @param port
      */
     void enableOutputPort(Port port);
 
-    /**
-     * Starts the given Funnel
-     *
-     * @param funnel
-     */
-    void enableFunnel(Funnel funnel);
 
     /**
      * Starts the given Processor
@@ -206,7 +200,7 @@ public interface ProcessGroup {
     void stopFunnel(Funnel funnel);
 
     /**
-     * Starts the given Processor
+     * Disables the given Processor
      *
      * @param processor the processor to start
      * @throws IllegalStateException if the processor is not valid, or is
@@ -215,25 +209,19 @@ public interface ProcessGroup {
     void disableProcessor(ProcessorNode processor);
 
     /**
-     * Starts the given Input Port
+     * Disables the given Input Port
      *
      * @param port
      */
     void disableInputPort(Port port);
 
     /**
-     * Starts the given Output Port
+     * Disables the given Output Port
      *
      * @param port
      */
     void disableOutputPort(Port port);
 
-    /**
-     * Starts the given Funnel
-     *
-     * @param funnel
-     */
-    void disableFunnel(Funnel funnel);
 
     /**
      * Indicates that the Flow is being shutdown; allows cleanup of resources
@@ -618,11 +606,23 @@ public interface ProcessGroup {
     Port getOutputPortByName(String name);
 
     /**
-     * Adds the given funnel to this ProcessGroup
+     * Adds the given funnel to this ProcessGroup and starts it. While other components
+     * do not automatically start, the funnel does by default because it is intended to be
+     * more of a notional component that users are unable to explicitly start and stop.
+     * However, there is an override available in {@link #addFunnel(Funnel, boolean)} because
+     * we may need to avoid starting the funnel on restart until the flow is completely
+     * initialized.
      *
      * @param funnel
      */
     void addFunnel(Funnel funnel);
+    
+    /**
+     * Adds the given funnel to this ProcessGroup and optionally starts the funnel.
+     * @param funnel
+     * @param autoStart
+     */
+    void addFunnel(Funnel funnel, boolean autoStart);
 
     /**
      * Returns a Set of all Funnels that belong to this ProcessGroup

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8b911c5a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 54f0807..06ef203 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -576,40 +576,57 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, H
      * flag of true to now start
      * </p>
      */
-    public void startDelayed() {
+    public void onFlowInitialized(final boolean startDelayedComponents) {
         writeLock.lock();
         try {
-            LOG.info("Starting {} processors/ports/funnels", 
(startConnectablesAfterInitialization.size() + 
startRemoteGroupPortsAfterInitialization.size()));
-            for (final Connectable connectable : 
startConnectablesAfterInitialization) {
-                if (connectable.getScheduledState() == 
ScheduledState.DISABLED) {
-                    continue;
+            if ( startDelayedComponents ) {
+                LOG.info("Starting {} processors/ports/funnels", 
(startConnectablesAfterInitialization.size() + 
startRemoteGroupPortsAfterInitialization.size()));
+                for (final Connectable connectable : 
startConnectablesAfterInitialization) {
+                    if (connectable.getScheduledState() == 
ScheduledState.DISABLED) {
+                        continue;
+                    }
+    
+                    try {
+                        if (connectable instanceof ProcessorNode) {
+                            
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
+                        } else {
+                            startConnectable(connectable);
+                        }
+                    } catch (final Throwable t) {
+                        LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t});
+                    }
                 }
-
-                try {
-                    if (connectable instanceof ProcessorNode) {
-                        
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
-                    } else {
-                        startConnectable(connectable);
+    
+                startConnectablesAfterInitialization.clear();
+    
+                int startedTransmitting = 0;
+                for (final RemoteGroupPort remoteGroupPort : 
startRemoteGroupPortsAfterInitialization) {
+                    try {
+                        
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
+                        startedTransmitting++;
+                    } catch (final Throwable t) {
+                        LOG.error("Unable to start transmitting with {} due to 
{}", new Object[]{remoteGroupPort, t});
                     }
-                } catch (final Throwable t) {
-                    LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t});
                 }
-            }
-
-            startConnectablesAfterInitialization.clear();
-
-            int startedTransmitting = 0;
-            for (final RemoteGroupPort remoteGroupPort : 
startRemoteGroupPortsAfterInitialization) {
-                try {
-                    
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
-                    startedTransmitting++;
-                } catch (final Throwable t) {
-                    LOG.error("Unable to start transmitting with {} due to 
{}", new Object[]{remoteGroupPort, t});
+    
+                LOG.info("Started {} Remote Group Ports transmitting", 
startedTransmitting);
+                startRemoteGroupPortsAfterInitialization.clear();
+            } else {
+                // We don't want to start all of the delayed components. However, funnels need to be started anyway
+                // because we don't provide users the ability to start or stop them - they are just notional.
+                for (final Connectable connectable : 
startConnectablesAfterInitialization) {
+                    try {
+                        if (connectable instanceof Funnel) {
+                            startConnectable(connectable);
+                        }
+                    } catch (final Throwable t) {
+                        LOG.error("Unable to start {} due to {}", new 
Object[]{connectable, t});
+                    }
                 }
+                
+                startConnectablesAfterInitialization.clear();
+                startRemoteGroupPortsAfterInitialization.clear();
             }
-
-            LOG.info("Started {} Remote Group Ports transmitting", 
startedTransmitting);
-            startRemoteGroupPortsAfterInitialization.clear();
         } finally {
             writeLock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8b911c5a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index d459b00..64ce5c4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -423,18 +423,15 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                      */
                     controller.startHeartbeating();
 
-                    // if configured, start all components
-                    if (autoResumeState) {
-                        try {
-                            controller.startDelayed();
-                        } catch (final Exception ex) {
-                            logger.warn("Unable to start all processors due to 
invalid flow configuration.");
-                            if (logger.isDebugEnabled()) {
-                                logger.warn(StringUtils.EMPTY, ex);
-                            }
+                    // notify controller that flow is initialized
+                    try {
+                        controller.onFlowInitialized(autoResumeState);
+                    } catch (final Exception ex) {
+                        logger.warn("Unable to start all processors due to 
invalid flow configuration.");
+                        if (logger.isDebugEnabled()) {
+                            logger.warn(StringUtils.EMPTY, ex);
                         }
                     }
-
                 } else {
                     try {
                         loadFromConnectionResponse(response);
@@ -732,9 +729,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             controller.setPrimary(response.isPrimary());
 
             // start the processors as indicated by the dataflow
-            if (dataFlow.isAutoStartProcessors()) {
-                controller.startDelayed();
-            }
+            controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
 
             loadTemplates(dataFlow.getTemplates());
             loadSnippets(dataFlow.getSnippets());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8b911c5a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index b60d187..05a8f01 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -692,7 +692,11 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
             final FunnelDTO funnelDTO = 
FlowFromDOMFactory.getFunnel(funnelElement);
             final Funnel funnel = controller.createFunnel(funnelDTO.getId());
             funnel.setPosition(toPosition(funnelDTO.getPosition()));
-            processGroup.addFunnel(funnel);
+            
+            // Since this is called during startup, we want to add the funnel without enabling it
+            // and then tell the controller to enable it. This way, if the controller is not fully
+            // initialized, the starting of the funnel is delayed until the controller is ready.
+            processGroup.addFunnel(funnel, false);
             controller.startConnectable(funnel);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8b911c5a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 856ccc1..0025caa 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -1036,9 +1036,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             }
 
             final ScheduledState state = funnel.getScheduledState();
-            if (state == ScheduledState.DISABLED) {
-                throw new IllegalStateException("Funnel is disabled");
-            } else if (state == ScheduledState.RUNNING) {
+            if (state == ScheduledState.RUNNING) {
                 return;
             }
             scheduler.startFunnel(funnel);
@@ -1132,27 +1130,6 @@ public final class StandardProcessGroup implements 
ProcessGroup {
     }
 
     @Override
-    public void enableFunnel(final Funnel funnel) {
-        readLock.lock();
-        try {
-            if (!funnels.containsKey(funnel.getIdentifier())) {
-                throw new IllegalStateException("No Funnel with ID " + 
funnel.getIdentifier() + " belongs to this Process Group");
-            }
-
-            final ScheduledState state = funnel.getScheduledState();
-            if (state == ScheduledState.STOPPED) {
-                return;
-            } else if (state == ScheduledState.RUNNING) {
-                throw new IllegalStateException("Funnel is currently running");
-            }
-
-            scheduler.enableFunnel(funnel);
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    @Override
     public void enableInputPort(final Port port) {
         readLock.lock();
         try {
@@ -1215,26 +1192,6 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
     }
 
-    @Override
-    public void disableFunnel(final Funnel funnel) {
-        readLock.lock();
-        try {
-            if (!funnels.containsKey(funnel.getIdentifier())) {
-                throw new IllegalStateException("No Funnel with ID " + 
funnel.getIdentifier() + " belongs to this Process Group");
-            }
-
-            final ScheduledState state = funnel.getScheduledState();
-            if (state == ScheduledState.DISABLED) {
-                return;
-            } else if (state == ScheduledState.RUNNING) {
-                throw new IllegalStateException("Funnel is currently running");
-            }
-
-            scheduler.disableFunnel(funnel);
-        } finally {
-            readLock.unlock();
-        }
-    }
 
     @Override
     public void disableInputPort(final Port port) {
@@ -1546,8 +1503,14 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         return null;
     }
 
+    
     @Override
     public void addFunnel(final Funnel funnel) {
+        addFunnel(funnel, true);
+    }
+    
+    @Override
+    public void addFunnel(final Funnel funnel, final boolean autoStart) {
         writeLock.lock();
         try {
             final Funnel existing = 
funnels.get(requireNonNull(funnel).getIdentifier());
@@ -1557,6 +1520,10 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
             funnel.setProcessGroup(this);
             funnels.put(funnel.getIdentifier(), funnel);
+            
+            if ( autoStart ) {
+                startFunnel(funnel);
+            }
         } finally {
             writeLock.unlock();
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8b911c5a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java
index a631670..8b48abf 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java
@@ -91,10 +91,8 @@ public class ApplicationStartupContextListener implements 
ServletContextListener
                      * reloading actions, the node will start the necessary
                      * processors.
                      */
-                    if (properties.getAutoResumeState()) {
-                        final FlowController flowController = 
flowService.getController();
-                        flowController.startDelayed();
-                    }
+                    final FlowController flowController = 
flowService.getController();
+                    
flowController.onFlowInitialized(properties.getAutoResumeState());
 
                     logger.info("Flow Controller started successfully.");
                 }

Reply via email to