Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 cb84829b3 -> ea17dbec6


NIFI-250: Only run controller services and reporting tasks on the nodes/ncm 
according to their Availability


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4de0fd02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4de0fd02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4de0fd02

Branch: refs/heads/NIFI-250
Commit: 4de0fd02678ce13d5a8b08edbc35a677560ded4d
Parents: 2df4500
Author: Mark Payne <[email protected]>
Authored: Mon Jan 26 19:05:31 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Jan 26 19:05:31 2015 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 16 ++++++++
 .../apache/nifi/controller/FlowController.java  | 40 ++++++++++++++++++--
 2 files changed, 52 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4de0fd02/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 9d9640d..27620b5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -121,6 +121,7 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
@@ -128,6 +129,7 @@ import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
 import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
 import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -393,7 +395,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             public void heartbeat() {
             }
         }, this, encryptor);
+        
+        // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only
+        // going to be scheduling Reporting Tasks. Otherwise, it would not be okay.
         processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
+        processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
         processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
     }
@@ -1338,11 +1344,21 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     @Override
     public void enableControllerService(final ControllerServiceNode serviceNode) {
+       if ( serviceNode.getAvailability() == Availability.NODE_ONLY ) {
+               serviceNode.setDisabled(false); // update disabled flag to stay in sync across cluster
+               return;
+        }
+       
         controllerServiceProvider.enableControllerService(serviceNode);
     }
     
     @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
+       if ( serviceNode.getAvailability() == Availability.NODE_ONLY ) {
+               serviceNode.setDisabled(true);  // update disabled flag to stay in sync across cluster
+               return;
+        }
+       
         controllerServiceProvider.disableControllerService(serviceNode);
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4de0fd02/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 47e26c0..a215938 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2518,16 +2518,30 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
     }
 
     public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
+       if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
+               reportingTaskNode.setScheduledState(ScheduledState.RUNNING); // updated scheduled state to keep state in sync across cluster
+               return;
+        }
+       
         if (isTerminated()) {
             throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated");
         }
 
+        if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
+               return;
+        }
+
         reportingTaskNode.verifyCanStart();
-        
-        processScheduler.schedule(reportingTaskNode);
+               processScheduler.schedule(reportingTaskNode);
     }
 
+    
     public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
+       if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
+               reportingTaskNode.setScheduledState(ScheduledState.STOPPED); // updated scheduled state to keep state in sync across cluster
+               return;
+        }
+       
         if (isTerminated()) {
             return;
         }
@@ -2646,25 +2660,43 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
     }
     
     public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
+       if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
+               reportingTaskNode.setScheduledState(ScheduledState.STOPPED); // updated scheduled state to keep state in sync across cluster
+               return;
+        }
+       
         reportingTaskNode.verifyCanEnable();
-        
         processScheduler.enableReportingTask(reportingTaskNode);
     }
     
     public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
+       if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
+               reportingTaskNode.setScheduledState(ScheduledState.DISABLED); // updated scheduled state to keep state in sync across cluster
+               return;
+        }
+       
         reportingTaskNode.verifyCanDisable();
-        
         processScheduler.disableReportingTask(reportingTaskNode);
     }
     
     @Override
     public void enableControllerService(final ControllerServiceNode serviceNode) {
+        if ( serviceNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
+               serviceNode.setDisabled(false); // set the disabled flag so that we can keep in sync with cluster
+               return;
+        }
+
         serviceNode.verifyCanEnable();
         controllerServiceProvider.enableControllerService(serviceNode);
     }
     
     @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
+       if ( serviceNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) {
+               serviceNode.setDisabled(true);  // set the disabled flag so that we can keep in sync with cluster
+               return;
+        }
+       
         serviceNode.verifyCanDisable();
         controllerServiceProvider.disableControllerService(serviceNode);
     }

Reply via email to