Repository: incubator-nifi Updated Branches: refs/heads/NIFI-250 cb84829b3 -> ea17dbec6
NIFI-250: Only run controller services and reporting tasks on the nodes/ncm according to their Availability Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4de0fd02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4de0fd02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4de0fd02 Branch: refs/heads/NIFI-250 Commit: 4de0fd02678ce13d5a8b08edbc35a677560ded4d Parents: 2df4500 Author: Mark Payne <[email protected]> Authored: Mon Jan 26 19:05:31 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Jan 26 19:05:31 2015 -0500 ---------------------------------------------------------------------- .../cluster/manager/impl/WebClusterManager.java | 16 ++++++++ .../apache/nifi/controller/FlowController.java | 40 ++++++++++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4de0fd02/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 9d9640d..27620b5 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -121,6 +121,7 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.Availability; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ReportingTaskNode; @@ -128,6 +129,7 @@ import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; +import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; import org.apache.nifi.controller.service.ControllerServiceNode; @@ -393,7 +395,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public void heartbeat() { } }, this, encryptor); + + // When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only + // going to be scheduling Reporting Tasks. Otherwise, it would not be okay. processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor)); + processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor)); processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); } @@ -1338,11 +1344,21 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C @Override public void enableControllerService(final ControllerServiceNode serviceNode) { + if ( serviceNode.getAvailability() == Availability.NODE_ONLY ) { + serviceNode.setDisabled(false); // update disabled flag to stay in sync across cluster + return; + } + controllerServiceProvider.enableControllerService(serviceNode); } @Override public void disableControllerService(final ControllerServiceNode serviceNode) { + if ( serviceNode.getAvailability() == Availability.NODE_ONLY ) { + serviceNode.setDisabled(true); // update disabled flag to stay in sync across cluster + return; + } + controllerServiceProvider.disableControllerService(serviceNode); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4de0fd02/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 47e26c0..a215938 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2518,16 +2518,30 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H } public void startReportingTask(final ReportingTaskNode reportingTaskNode) { + if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) { + reportingTaskNode.setScheduledState(ScheduledState.RUNNING); // updated scheduled state to keep state in sync across cluster + return; + } + if (isTerminated()) { throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated"); } + if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) { + return; + } + reportingTaskNode.verifyCanStart(); - - processScheduler.schedule(reportingTaskNode); + processScheduler.schedule(reportingTaskNode); } + public void stopReportingTask(final ReportingTaskNode reportingTaskNode) { + if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) { + reportingTaskNode.setScheduledState(ScheduledState.STOPPED); // updated scheduled state to keep state in sync across cluster + return; + } + if (isTerminated()) { return; } @@ -2646,25 +2660,43 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H } public void enableReportingTask(final ReportingTaskNode reportingTaskNode) { + if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) { + reportingTaskNode.setScheduledState(ScheduledState.STOPPED); // updated scheduled state to keep state in sync across cluster + return; + } + reportingTaskNode.verifyCanEnable(); - processScheduler.enableReportingTask(reportingTaskNode); } public void disableReportingTask(final ReportingTaskNode reportingTaskNode) { + if ( reportingTaskNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) { + reportingTaskNode.setScheduledState(ScheduledState.DISABLED); // updated scheduled state to keep state in sync across cluster + return; + } + reportingTaskNode.verifyCanDisable(); - processScheduler.disableReportingTask(reportingTaskNode); } @Override public void enableControllerService(final ControllerServiceNode serviceNode) { + if ( serviceNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) { + serviceNode.setDisabled(false); // set the disabled flag so that we can keep in sync with cluster + return; + } + serviceNode.verifyCanEnable(); controllerServiceProvider.enableControllerService(serviceNode); } @Override public void disableControllerService(final ControllerServiceNode serviceNode) { + if ( serviceNode.getAvailability() == Availability.CLUSTER_MANAGER_ONLY ) { + serviceNode.setDisabled(true); // set the disabled flag so that we can keep in sync with cluster + return; + } + serviceNode.verifyCanDisable(); controllerServiceProvider.disableControllerService(serviceNode); }
