NIFI-250: Refactoring of controller service states
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7de30ab1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7de30ab1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7de30ab1 Branch: refs/heads/NIFI-250 Commit: 7de30ab15ad9570233c4bff68f37acf324a66dda Parents: 81d8454 Author: Mark Payne <[email protected]> Authored: Thu Feb 19 13:30:37 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Thu Feb 19 13:30:37 2015 -0500 ---------------------------------------------------------------------- .../nifi/components/PropertyDescriptor.java | 8 + .../nifi/components/ValidationContext.java | 9 + .../apache/nifi/util/MockValidationContext.java | 5 + .../cluster/manager/impl/WebClusterManager.java | 30 ++- .../nifi-framework-core-api/.gitignore | 1 + .../controller/AbstractConfiguredComponent.java | 8 +- .../nifi/controller/ProcessScheduler.java | 16 +- .../apache/nifi/controller/ProcessorNode.java | 11 + .../nifi/controller/ReportingTaskNode.java | 12 + .../controller/ValidationContextFactory.java | 4 + .../service/ControllerServiceNode.java | 24 +- .../service/ControllerServiceProvider.java | 52 ++-- .../service/ControllerServiceState.java | 45 ++++ .../apache/nifi/controller/FlowController.java | 33 ++- .../nifi/controller/StandardFlowSerializer.java | 6 +- .../nifi/controller/StandardProcessorNode.java | 37 ++- .../reporting/AbstractReportingTaskNode.java | 33 +++ .../scheduling/StandardProcessScheduler.java | 115 +++++++- .../service/StandardControllerServiceNode.java | 79 +++--- .../StandardControllerServiceProvider.java | 270 +++++++++++-------- .../StandardControllerServiceReference.java | 3 +- .../processor/StandardSchedulingContext.java | 5 +- .../processor/StandardValidationContext.java | 14 + .../StandardValidationContextFactory.java | 5 + .../TestStandardControllerServiceProvider.java | 119 +++++++- 
.../controller/service/mock/DummyProcessor.java | 49 ++++ .../dao/impl/StandardControllerServiceDAO.java | 13 +- 27 files changed, 798 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java b/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java index a4c855b..48b9645 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/components/PropertyDescriptor.java @@ -142,6 +142,14 @@ public final class PropertyDescriptor implements Comparable<PropertyDescriptor> final Set<String> validIdentifiers = context.getControllerServiceLookup().getControllerServiceIdentifiers(controllerServiceDefinition); if (validIdentifiers != null && validIdentifiers.contains(input)) { final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(input); + if ( !context.isValidationRequired(controllerService) ) { + return new ValidationResult.Builder() + .input(input) + .subject(getName()) + .valid(true) + .build(); + } + if (!context.getControllerServiceLookup().isControllerServiceEnabled(controllerService)) { return new ValidationResult.Builder() .input(context.getControllerServiceLookup().getControllerServiceName(controllerService.getIdentifier())) http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java 
b/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java index e50f002..214fac9 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java @@ -79,4 +79,13 @@ public interface ValidationContext { * @return */ String getAnnotationData(); + + /** + * There are times when the framework needs to consider a component valid, even if it + * references an invalid ControllerService. This method will return <code>false</code> + * if the component is to be considered valid even if the given Controller Service is referenced + * and is invalid. + * @param service + */ + boolean isValidationRequired(ControllerService service); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java index 34fd7de..6d32d0b 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java @@ -96,4 +96,9 @@ public class MockValidationContext implements ValidationContext, ControllerServi final ControllerServiceConfiguration configuration = context.getConfiguration(serviceIdentifier); return configuration == null ? 
null : serviceIdentifier; } + + @Override + public boolean isValidationRequired(final ControllerService service) { + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index ee3c621..3e522b9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -1357,16 +1357,38 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return controllerServiceProvider.getAllControllerServices(); } + + @Override + public void disableReferencingServices(final ControllerServiceNode serviceNode) { + controllerServiceProvider.disableReferencingServices(serviceNode); + } + + @Override + public void enableReferencingServices(final ControllerServiceNode serviceNode) { + controllerServiceProvider.enableReferencingServices(serviceNode); + } + @Override - public void activateReferencingComponents(final ControllerServiceNode serviceNode) { - controllerServiceProvider.activateReferencingComponents(serviceNode); + public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { + controllerServiceProvider.scheduleReferencingComponents(serviceNode); } @Override - public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) 
{ - controllerServiceProvider.deactivateReferencingComponents(serviceNode); + public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { + controllerServiceProvider.unscheduleReferencingComponents(serviceNode); } + @Override + public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { + controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode); + } + + @Override + public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) { + controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode); + } + + private byte[] serialize(final Document doc) throws TransformerException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DOMSource domSource = new DOMSource(doc); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore index ea8c4bf..29546b5 100755 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/.gitignore @@ -1 +1,2 @@ /target +/target/ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index af65b41..c44161f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -23,6 +23,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; @@ -260,12 +261,17 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone return true; } + @Override public Collection<ValidationResult> getValidationErrors() { + return getValidationErrors(Collections.<String>emptySet()); + } + + public Collection<ValidationResult> getValidationErrors(final Set<String> serviceIdentifiersNotToValidate) { final List<ValidationResult> results = new ArrayList<>(); lock.lock(); try { - final ValidationContext validationContext = validationContextFactory.newValidationContext(getProperties(), getAnnotationData()); + final ValidationContext validationContext = validationContextFactory.newValidationContext(serviceIdentifiersNotToValidate, getProperties(), getAnnotationData()); final Collection<ValidationResult> validationResults; try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index 724d1f2..c3b6613 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -19,8 +19,7 @@ package org.apache.nifi.controller; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.OnUnscheduled; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.scheduling.SchedulingStrategy; public interface ProcessScheduler { @@ -151,9 +150,20 @@ public interface ProcessScheduler { void unschedule(ReportingTaskNode taskNode); /** - * Begins scheduling the given Reporting Taks to run + * Begins scheduling the given Reporting Task to run * @param taskNode */ void schedule(ReportingTaskNode taskNode); + /** + * Enables the Controller Service so that it can be used by Reporting Tasks and Processors + * @param service + */ + void enableControllerService(ControllerServiceNode service); + + /** + * Disables the Controller Service so that it can be updated + * @param service + */ + void disableControllerService(ControllerServiceNode service); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff 
--git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 81ef7c0..3189edd 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Processor; @@ -82,4 +83,14 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen * @return */ public abstract int getActiveThreadCount(); + + /** + * Verifies that this Processor can be started if the provided set of + * services are enabled. This is introduced because we need to verify that all components + * can be started before starting any of them. In order to do that, we need to know that this + * component can be started if the given services are enabled, as we will then enable the given + * services before starting this component. 
+ * @param ignoredReferences + */ + public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java index 76285d1..c932f30 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java @@ -16,8 +16,10 @@ */ package org.apache.nifi.controller; +import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; @@ -74,6 +76,16 @@ public interface ReportingTaskNode extends ConfiguredComponent { void setComments(String comment); + /** + * Verifies that this Reporting Task can be enabled if the provided set of + * services are enabled. This is introduced because we need to verify that all components + * can be started before starting any of them. In order to do that, we need to know that this + * component can be started if the given services are enabled, as we will then enable the given + * services before starting this component. 
+ * @param ignoredReferences + */ + void verifyCanStart(Set<ControllerServiceNode> ignoredReferences); + void verifyCanStart(); void verifyCanStop(); void verifyCanDisable(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java index df3c251..09479d5 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller; import java.util.Map; +import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -24,4 +25,7 @@ import org.apache.nifi.components.ValidationContext; public interface ValidationContextFactory { ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData); + + ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData); + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java 
---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 4822958..68357b8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.service; import java.util.Set; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; @@ -27,18 +28,9 @@ public interface ControllerServiceNode extends ConfiguredComponent { ControllerService getControllerServiceImplementation(); - boolean isDisabled(); - - void enable(); - void disable(); + ControllerServiceState getState(); + void setState(ControllerServiceState state); - /** - * Disables the Controller Service but does not verify that the provided set of referencing - * Controller Services should be verified as disabled first - * @param ignoredReferences - */ - void disable(Set<ControllerServiceNode> ignoredReferences); - ControllerServiceReference getReferences(); void addReference(ConfiguredComponent referringComponent); @@ -62,6 +54,16 @@ public interface ControllerServiceNode extends ConfiguredComponent { */ void verifyCanDisable(Set<ControllerServiceNode> ignoredReferences); + /** + * Verifies that this Controller Service can be enabled if the provided set of + * services are also enabled. 
This is introduced because we can have an instance where + * A references B, which references C, which references A and we want to enable + * Service A. In this case, the cycle needs to not cause us to fail, so we want to verify + * that A can be enabled if A and B also are. + * @param ignoredReferences + */ + void verifyCanEnable(Set<ControllerServiceNode> ignoredReferences); + void verifyCanDelete(); void verifyCanUpdate(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index 7a767bf..351a036 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -75,27 +75,47 @@ public interface ControllerServiceProvider extends ControllerServiceLookup { */ Set<ControllerServiceNode> getAllControllerServices(); + /** - * Recursively stops all Processors and Reporting Tasks that are referencing the given Controller Service, - * as well as disabling any Controller Service that references this Controller Service (and stops - * all Reporting Task or Controller Service that is referencing it, and so on). 
+ * Recursively unschedules all schedulable components (Processors and Reporting Tasks) that reference the given + * Controller Service. For any Controller services that reference this one, its schedulable referencing components will also + * be unscheduled. * @param serviceNode */ - void deactivateReferencingComponents(ControllerServiceNode serviceNode); + void unscheduleReferencingComponents(ControllerServiceNode serviceNode); /** - * <p> - * Starts any enabled Processors and Reporting Tasks that are referencing this Controller Service. If other Controller - * Services reference this Controller Service, will also enable those services and 'activate' any components referencing - * them. - * </p> - * - * <p> - * NOTE: If any component cannot be started, an IllegalStateException will be thrown an no more components will - * be activated. This method provides no atomicity. - * </p> - * + * Disables any Controller Service that references the provided Controller Service. This action is performed recursively + * so that if service A references B and B references C, disabling references for C will first disable A, then B. + * @param serviceNode + */ + void disableReferencingServices(ControllerServiceNode serviceNode); + + /** + * Verifies that all Controller Services referencing the provided ControllerService can be enabled. + * @param serviceNode + */ + void verifyCanEnableReferencingServices(ControllerServiceNode serviceNode); + + /** + * Verifies that all enabled Processors referencing the ControllerService (or a service that depends on + * the provided service) can be scheduled to run. + * @param serviceNode + */ + void verifyCanScheduleReferencingComponents(ControllerServiceNode serviceNode); + + /** + * Enables all Controller Services that are referencing the given service. If Service A references Service B and Service + * B references serviceNode, Service A and B will both be enabled. 
+ * @param serviceNode + */ + void enableReferencingServices(ControllerServiceNode serviceNode); + + /** + * Schedules any schedulable component (Processor, ReportingTask) that is referencing the given Controller Service + * to run. This is performed recursively, so if a Processor is referencing Service A, which is referencing serviceNode, + * then the Processor will also be started. * @param serviceNode */ - void activateReferencingComponents(ControllerServiceNode serviceNode); + void scheduleReferencingComponents(ControllerServiceNode serviceNode); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java new file mode 100644 index 0000000..2ed8fd9 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.service; + + +/** + * Represents the valid states for a Controller Service. + */ +public enum ControllerServiceState { + /** + * Controller Service is disabled and cannot be used. + */ + DISABLED, + + /** + * Controller Service has been disabled but has not yet finished its lifecycle + * methods. + */ + DISABLING, + + /** + * Controller Service has been enabled but has not yet finished its lifecycle methods. + */ + ENABLING, + + /** + * Controller Service has been enabled and has finished its lifecycle methods. The Controller Service + * is ready to be used. 
+ */ + ENABLED; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2825d5b..0a86145 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2570,12 +2570,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H return reportingTasks.values(); } - - @Override - public void activateReferencingComponents(final ControllerServiceNode serviceNode) { - controllerServiceProvider.activateReferencingComponents(serviceNode); - } - @Override public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { return controllerServiceProvider.createControllerService(type, id, firstTimeAdded); @@ -2591,10 +2585,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H processScheduler.disableReportingTask(reportingTaskNode); } + @Override + public void disableReferencingServices(final ControllerServiceNode serviceNode) { + controllerServiceProvider.disableReferencingServices(serviceNode); + } + + @Override + public void enableReferencingServices(final ControllerServiceNode serviceNode) { + controllerServiceProvider.enableReferencingServices(serviceNode); + } @Override - public void 
deactivateReferencingComponents(final ControllerServiceNode serviceNode) { - controllerServiceProvider.deactivateReferencingComponents(serviceNode); + public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { + controllerServiceProvider.scheduleReferencingComponents(serviceNode); + } + + @Override + public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { + controllerServiceProvider.unscheduleReferencingComponents(serviceNode); } @Override @@ -2609,6 +2617,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H controllerServiceProvider.disableControllerService(serviceNode); } + @Override + public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { + controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode); + } + + @Override + public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) { + controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode); + } @Override public ControllerService getControllerService(final String serviceIdentifier) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java index f281fa7..832df7c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java +++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java @@ -41,6 +41,7 @@ import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Size; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.groups.ProcessGroup; @@ -412,7 +413,10 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(serviceElement, "name", serviceNode.getName()); addTextElement(serviceElement, "comment", serviceNode.getComments()); addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName()); - addTextElement(serviceElement, "enabled", String.valueOf(!serviceNode.isDisabled())); + + final ControllerServiceState state = serviceNode.getState(); + final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING); + addTextElement(serviceElement, "enabled", String.valueOf(enabled)); addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index af25955..355e303 100644 --- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -52,6 +52,7 @@ import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.LogLevel; @@ -120,7 +121,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private SchedulingStrategy schedulingStrategy; // guarded by read/write lock @SuppressWarnings("deprecation") - StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, + public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) { super(processor, uuid, validationContextFactory, controllerServiceProvider); @@ -1192,8 +1193,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable public void verifyCanStart() { readLock.lock(); try { - if (scheduledState.get() != ScheduledState.STOPPED) { - throw new IllegalStateException(this + " is not stopped"); + switch (getScheduledState()) { + case DISABLED: + throw new IllegalStateException(this + " cannot be started because it is disabled"); + case RUNNING: + throw new IllegalStateException(this + " cannot be started because it is already running"); + case STOPPED: + break; } verifyNoActiveThreads(); @@ -1204,6 +1210,31 @@ public class 
StandardProcessorNode extends ProcessorNode implements Connectable readLock.unlock(); } } + + @Override + public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) { + switch (getScheduledState()) { + case DISABLED: + throw new IllegalStateException(this + " cannot be started because it is disabled"); + case RUNNING: + throw new IllegalStateException(this + " cannot be started because it is already running"); + case STOPPED: + break; + } + verifyNoActiveThreads(); + + final Set<String> ids = new HashSet<>(); + for ( final ControllerServiceNode node : ignoredReferences ) { + ids.add(node.getIdentifier()); + } + + final Collection<ValidationResult> validationResults = getValidationErrors(ids); + for ( final ValidationResult result : validationResults ) { + if ( !result.isValid() ) { + throw new IllegalStateException(this + " cannot be started because it is not valid: " + result); + } + } + } @Override public void verifyCanStop() { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index 079ff31..272c0ba 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -16,9 +16,13 @@ */ package 
org.apache.nifi.controller.reporting; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceLookup; @@ -28,6 +32,7 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.annotation.OnConfigured; import org.apache.nifi.controller.exception.ProcessorLifeCycleException; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.nar.NarCloseable; @@ -212,4 +217,32 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running"); } } + + @Override + public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) { + switch (getScheduledState()) { + case DISABLED: + throw new IllegalStateException(this + " cannot be started because it is disabled"); + case RUNNING: + throw new IllegalStateException(this + " cannot be started because it is already running"); + case STOPPED: + break; + } + final int activeThreadCount = getActiveThreadCount(); + if ( activeThreadCount > 0 ) { + throw new IllegalStateException(this + " cannot be started because it has " + activeThreadCount + " active threads already"); + } + + final Set<String> ids = new HashSet<>(); + for ( final ControllerServiceNode node : ignoredReferences ) { + ids.add(node.getIdentifier()); + } + + final Collection<ValidationResult> validationResults = getValidationErrors(ids); + for ( final 
ValidationResult result : validationResults ) { + if ( !result.isValid() ) { + throw new IllegalStateException(this + " cannot be started because it is not valid: " + result); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index b6699d2..2b4757d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -45,6 +45,8 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.annotation.OnConfigured; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.logging.ProcessorLog; @@ -212,7 +214,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { if (!scheduleState.isScheduled()) { return; } - + + taskNode.verifyCanStop(); final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); final 
ReportingTask reportingTask = taskNode.getReportingTask(); scheduleState.setScheduled(false); @@ -313,11 +316,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { return; } } catch (final Exception e) { + final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e; final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor()); procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}", - new Object[]{procNode.getProcessor(), e.getCause(), administrativeYieldDuration}, e.getCause()); - LOG.error("Failed to invoke @OnScheduled method due to {}", e.getCause().toString(), e.getCause()); + new Object[]{procNode.getProcessor(), cause.getCause(), administrativeYieldDuration}, cause.getCause()); + LOG.error("Failed to invoke @OnScheduled method due to {}", cause.getCause().toString(), cause.getCause()); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext); @@ -610,4 +614,109 @@ public final class StandardProcessScheduler implements ProcessScheduler { } return scheduleState; } + + @Override + public void enableControllerService(final ControllerServiceNode service) { + service.verifyCanEnable(); + service.setState(ControllerServiceState.ENABLING); + final ScheduleState scheduleState = getScheduleState(service); + + final Runnable enableRunnable = new Runnable() { + @Override + public void run() { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + long lastStopTime = scheduleState.getLastStopTime(); + final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider); + + while (true) { + try { + synchronized (scheduleState) { + // if no longer enabled, then we're finished. 
This can happen, for example, + // if the @OnEnabled method throws an Exception and the user disables the service + // while we're administratively yielded. + // + // we also check if the schedule state's last stop time is equal to what it was before. + // if not, then means that the service has been disabled and enabled again, so we should just + // bail; another thread will be responsible for invoking the @OnEnabled methods. + if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) { + return; + } + + ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service.getControllerServiceImplementation(), configContext); + heartbeater.heartbeat(); + service.setState(ControllerServiceState.ENABLED); + return; + } + } catch (final Exception e) { + // TODO: Generate a bulletin just like in startProcessor + final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e; + LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString()); + if ( LOG.isDebugEnabled() ) { + LOG.error("", cause); + } + + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); + Thread.sleep(administrativeYieldMillis); + continue; + } + } + } catch (final Throwable t) { + // TODO: Generate a bulletin just like in startProcessor + final Throwable cause = (t instanceof InvocationTargetException) ? 
t.getCause() : t; + LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString()); + if ( LOG.isDebugEnabled() ) { + LOG.error("", cause); + } + } + } + }; + + scheduleState.setScheduled(true); + componentLifeCycleThreadPool.execute(enableRunnable); + } + + @Override + public void disableControllerService(final ControllerServiceNode service) { + service.verifyCanDisable(); + + final ScheduleState state = getScheduleState(requireNonNull(service)); + final Runnable disableRunnable = new Runnable() { + @Override + public void run() { + synchronized (state) { + state.setScheduled(false); + } + + try (final NarCloseable x = NarCloseable.withNarLoader()) { + final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider); + + while(true) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); + heartbeater.heartbeat(); + service.setState(ControllerServiceState.DISABLED); + return; + } catch (final Exception e) { + // TODO: Generate a bulletin just like in startProcessor + final Throwable cause = (e instanceof InvocationTargetException) ? 
e.getCause() : e; + LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString()); + if ( LOG.isDebugEnabled() ) { + LOG.error("", cause); + } + + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) {} + + continue; + } + } + } + } + }; + + service.setState(ControllerServiceState.DISABLING); + componentLifeCycleThreadPool.execute(disableRunnable); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index b29f86b..c8c7ec9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -16,14 +16,16 @@ */ package org.apache.nifi.controller.service; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; @@ -40,7 +42,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private final ControllerService implementation; private final ControllerServiceProvider serviceProvider; - private final AtomicBoolean disabled = new AtomicBoolean(true); + private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED); private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -57,34 +59,8 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i this.serviceProvider = serviceProvider; } - @Override - public boolean isDisabled() { - return disabled.get(); - } - - - @Override - public void enable() { - if ( !isValid() ) { - throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid"); - } - - this.disabled.set(false); - } @Override - public void disable() { - verifyCanDisable(); - this.disabled.set(true); - } - - @Override - public void disable(final Set<ControllerServiceNode> ignoredReferences) { - verifyCanDisable(ignoredReferences); - this.disabled.set(true); - } - - @Override public ControllerService getProxiedControllerService() { return proxedControllerService; } @@ -126,7 +102,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void verifyModifiable() throws IllegalStateException { - if (!isDisabled()) { + if (getState() != ControllerServiceState.DISABLED) { throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. 
Please disable the Controller Service first."); } } @@ -134,7 +110,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void setProperty(final String name, final String value) { super.setProperty(name, value); - onConfigured(); } @@ -160,7 +135,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void verifyCanDelete() { - if ( !isDisabled() ) { + if ( getState() != ControllerServiceState.DISABLED ) { throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled"); } } @@ -172,6 +147,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) { + final ControllerServiceState state = getState(); + if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) { + throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled"); + } + final ControllerServiceReference references = getReferences(); for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) { @@ -183,14 +163,37 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void verifyCanEnable() { - if ( !isDisabled() ) { + if ( getState() != ControllerServiceState.DISABLED ) { throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled"); } + + if ( !isValid() ) { + throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + getValidationErrors()); + } + } + + @Override + public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) { + if (getState() != ControllerServiceState.DISABLED) { + throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled"); + } + + final Set<String> ids = 
new HashSet<>(); + for ( final ControllerServiceNode node : ignoredReferences ) { + ids.add(node.getIdentifier()); + } + + final Collection<ValidationResult> validationResults = getValidationErrors(ids); + for ( final ValidationResult result : validationResults ) { + if ( !result.isValid() ) { + throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + result); + } + } } @Override public void verifyCanUpdate() { - if ( !isDisabled() ) { + if ( getState() != ControllerServiceState.DISABLED ) { throw new IllegalStateException(implementation + " cannot be updated because it is not disabled"); } } @@ -214,4 +217,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i writeLock.unlock(); } } + + @Override + public ControllerServiceState getState() { + return stateRef.get(); + } + + @Override + public void setState(final ControllerServiceState state) { + this.stateRef.set(state); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index c584188..d6596a4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ 
-32,8 +32,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.nifi.annotation.lifecycle.OnAdded; -import org.apache.nifi.annotation.lifecycle.OnDisabled; -import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; @@ -42,6 +40,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ControllerServiceNotFoundException; import org.apache.nifi.controller.exception.ProcessorLifeCycleException; @@ -128,7 +127,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { final ControllerServiceNode node = serviceNodeHolder.get(); - if (node.isDisabled() && !validDisabledMethods.contains(method)) { + final ControllerServiceState state = node.getState(); + final boolean disabled = (state != ControllerServiceState.ENABLED); // only allow method call if service state is ENABLED. + if (disabled && !validDisabledMethods.contains(method)) { // Use nar class loader here because we are implicitly calling toString() on the original implementation. 
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled"); @@ -182,29 +183,108 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } + + @Override - public void enableControllerService(final ControllerServiceNode serviceNode) { - serviceNode.verifyCanEnable(); + public void disableReferencingServices(final ControllerServiceNode serviceNode) { + // Get a list of all Controller Services that need to be disabled, in the order that they need to be + // disabled. + final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable); - try (final NarCloseable x = NarCloseable.withNarLoader()) { - final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, this); - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation(), configContext); + for ( final ControllerServiceNode nodeToDisable : toDisable ) { + final ControllerServiceState state = nodeToDisable.getState(); + + if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) { + nodeToDisable.verifyCanDisable(serviceSet); + } } - serviceNode.enable(); + Collections.reverse(toDisable); + for ( final ControllerServiceNode nodeToDisable : toDisable ) { + final ControllerServiceState state = nodeToDisable.getState(); + + if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) { + disableControllerService(nodeToDisable); + } + } } + @Override - public void disableControllerService(final ControllerServiceNode serviceNode) { - serviceNode.verifyCanDisable(); - - // We must set the service to disabled before we invoke the OnDisabled methods because 
the service node - // can throw Exceptions if we attempt to disable the service while it's known to be in use. - serviceNode.disable(); + public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) { + // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, + // or a service that references this controller service, etc. + final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class); + final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation()); + // verify that we can start all components (that are not disabled) before doing anything + for ( final ProcessorNode node : processors ) { + if ( node.getScheduledState() != ScheduledState.DISABLED ) { + node.verifyCanStart(); + } + } + for ( final ReportingTaskNode node : reportingTasks ) { + if ( node.getScheduledState() != ScheduledState.DISABLED ) { + node.verifyCanStart(); + } + } + + // start all of the components that are not disabled + for ( final ProcessorNode node : processors ) { + if ( node.getScheduledState() != ScheduledState.DISABLED ) { + node.getProcessGroup().startProcessor(node); + } } + for ( final ReportingTaskNode node : reportingTasks ) { + if ( node.getScheduledState() != ScheduledState.DISABLED ) { + processScheduler.schedule(node); + } + } + } + + @Override + public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) { + // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service, + // or a service that references this controller service, etc. 
+ final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class); + final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); + + // verify that we can stop all components (that are running) before doing anything + for ( final ProcessorNode node : processors ) { + if ( node.getScheduledState() == ScheduledState.RUNNING ) { + node.verifyCanStop(); + } + } + for ( final ReportingTaskNode node : reportingTasks ) { + if ( node.getScheduledState() == ScheduledState.RUNNING ) { + node.verifyCanStop(); + } + } + + // stop all of the components that are running + for ( final ProcessorNode node : processors ) { + if ( node.getScheduledState() == ScheduledState.RUNNING ) { + node.getProcessGroup().stopProcessor(node); + } + } + for ( final ReportingTaskNode node : reportingTasks ) { + if ( node.getScheduledState() == ScheduledState.RUNNING ) { + processScheduler.unschedule(node); + } + } + } + + @Override + public void enableControllerService(final ControllerServiceNode serviceNode) { + serviceNode.verifyCanEnable(); + processScheduler.enableControllerService(serviceNode); + } + + @Override + public void disableControllerService(final ControllerServiceNode serviceNode) { + serviceNode.verifyCanDisable(); + processScheduler.disableControllerService(serviceNode); } @Override @@ -221,7 +301,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public boolean isControllerServiceEnabled(final String serviceIdentifier) { final ControllerServiceNode node = controllerServices.get(serviceIdentifier); - return (node == null) ? false : !node.isDisabled(); + return (node == null) ? 
false : (ControllerServiceState.ENABLED == node.getState()); } @Override @@ -281,120 +361,94 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi return new HashSet<>(controllerServices.values()); } - @Override - public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) { - deactivateReferencingComponents(serviceNode, new HashSet<ControllerServiceNode>()); - } - private void deactivateReferencingComponents(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) { - final ControllerServiceReference reference = serviceNode.getReferences(); + /** + * Returns a List of all components that reference the given referencedNode (either directly or indirectly through + * another service) that are also of the given componentType. The list that is returned is in the order in which they will + * need to be 'activated' (enabled/started). + * @param referencedNode + * @param componentType + * @return + */ + private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) { + final List<T> references = new ArrayList<>(); - final Set<ConfiguredComponent> components = reference.getActiveReferences(); - for (final ConfiguredComponent component : components) { - if ( component instanceof ControllerServiceNode ) { - // If we've already visited this component (there is a loop such that - // we are disabling Controller Service A, but B depends on A and A depends on B) - // we don't need to disable this component because it will be disabled after we return - if ( visited.contains(component) ) { - continue; - } + for ( final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents() ) { + if ( componentType.isAssignableFrom(referencingComponent.getClass()) ) { + references.add(componentType.cast(referencingComponent)); + } + + if ( referencingComponent instanceof ControllerServiceNode ) { + final 
ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent; - visited.add(serviceNode); - deactivateReferencingComponents((ControllerServiceNode) component, visited); + // find components recursively that depend on referencingNode. + final List<T> recursive = findRecursiveReferences(referencingNode, componentType); - if (isControllerServiceEnabled(serviceNode.getIdentifier())) { - serviceNode.verifyCanDisable(visited); - serviceNode.disable(visited); - } - } else if ( component instanceof ReportingTaskNode ) { - final ReportingTaskNode taskNode = (ReportingTaskNode) component; - if (taskNode.isRunning()) { - taskNode.verifyCanStop(); - processScheduler.unschedule(taskNode); - } - } else if ( component instanceof ProcessorNode ) { - final ProcessorNode procNode = (ProcessorNode) component; - if ( procNode.isRunning() ) { - procNode.getProcessGroup().stopProcessor(procNode); - } + // For anything that depends on referencing node, we want to add it to the list, but we know + // that it must come after the referencing node, so we first remove any existing occurrence. + references.removeAll(recursive); + references.addAll(recursive); } } + + return references; } @Override - public void activateReferencingComponents(final ControllerServiceNode serviceNode) { - activateReferencingComponents(serviceNode, new HashSet<ControllerServiceNode>()); + public void enableReferencingServices(final ControllerServiceNode serviceNode) { + final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + enableReferencingServices(serviceNode, recursiveReferences); } - - /** - * Recursively enables this controller service and any controller service that it references. 
- * @param serviceNode - */ - private void activateReferencedComponents(final ControllerServiceNode serviceNode) { - for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) { - final PropertyDescriptor key = entry.getKey(); - if ( key.getControllerServiceDefinition() == null ) { - continue; - } - - final String serviceId = entry.getValue() == null ? key.getDefaultValue() : entry.getValue(); - if ( serviceId == null ) { - continue; - } - - final ControllerServiceNode referencedNode = getControllerServiceNode(serviceId); - if ( referencedNode == null ) { - throw new IllegalStateException("Cannot activate referenced component of " + serviceNode + " because no service exists with ID " + serviceId); + private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) { + serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences)); + + final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + for ( final ControllerServiceNode nodeToEnable : toEnable ) { + final ControllerServiceState state = nodeToEnable.getState(); + if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) { + nodeToEnable.verifyCanEnable(); } - - activateReferencedComponents(referencedNode); - - if ( referencedNode.isDisabled() ) { - enableControllerService(referencedNode); + } + + for ( final ControllerServiceNode nodeToEnable : toEnable ) { + final ControllerServiceState state = nodeToEnable.getState(); + if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) { + enableControllerService(nodeToEnable); } } } - private void activateReferencingComponents(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) { - if ( serviceNode.isDisabled() ) { - throw new IllegalStateException("Cannot activate referencing components of " + 
serviceNode.getControllerServiceImplementation() + " because the Controller Service is disabled"); + @Override + public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) { + final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices); + + for ( final ControllerServiceNode referencingService : referencingServices ) { + referencingService.verifyCanEnable(referencingServiceSet); } + } + + @Override + public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) { + final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class); + final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class); + final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class); - final ControllerServiceReference ref = serviceNode.getReferences(); - final Set<ConfiguredComponent> components = ref.getReferencingComponents(); + final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices); - // First, activate any other controller services. We do this first so that we can - // avoid the situation where Processor X depends on Controller Services Y and Z; and - // Controller Service Y depends on Controller Service Z. In this case, if we first attempted - // to start Processor X, we would fail because Controller Service Y is disabled. THis way, we - // can recursively enable everything. 
- for ( final ConfiguredComponent component : components ) { - if (component instanceof ControllerServiceNode) { - final ControllerServiceNode componentNode = (ControllerServiceNode) component; - activateReferencedComponents(componentNode); - - if ( componentNode.isDisabled() ) { - enableControllerService(componentNode); - } - - activateReferencingComponents(componentNode); + for ( final ReportingTaskNode taskNode : referencingReportingTasks ) { + if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) { + taskNode.verifyCanStart(referencingServiceSet); } } - for ( final ConfiguredComponent component : components ) { - if (component instanceof ProcessorNode) { - final ProcessorNode procNode = (ProcessorNode) component; - if ( !procNode.isRunning() ) { - procNode.getProcessGroup().startProcessor(procNode); - } - } else if (component instanceof ReportingTaskNode) { - final ReportingTaskNode taskNode = (ReportingTaskNode) component; - if ( !taskNode.isRunning() ) { - processScheduler.schedule(taskNode); - } + for ( final ProcessorNode procNode : referencingProcessors ) { + if ( procNode.getScheduledState() != ScheduledState.DISABLED ) { + procNode.verifyCanStart(referencingServiceSet); } } } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java index 97921d6..c470b99 100644 --- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java @@ -66,7 +66,8 @@ public class StandardControllerServiceReference implements ControllerServiceRefe if (component instanceof ControllerServiceNode) { serviceNodes.add((ControllerServiceNode) component); - if ( !((ControllerServiceNode) component).isDisabled() ) { + final ControllerServiceState state = ((ControllerServiceNode) component).getState(); + if ( state != ControllerServiceState.DISABLED ) { activeReferences.add(component); } } else if (isRunning(component)) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index ac58504..c37a80d 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -25,6 +25,7 @@ import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.service.ControllerServiceNode; import 
org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.ControllerServiceState; public class StandardSchedulingContext implements SchedulingContext { @@ -45,8 +46,8 @@ public class StandardSchedulingContext implements SchedulingContext { throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier); } - if (serviceNode.isDisabled()) { - throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is currently disabled"); + if ( serviceNode.getState() != ControllerServiceState.ENABLED ) { + throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently enabled"); } if (!serviceNode.isValid()) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java index b216572..57f13d2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java @@ -19,6 +19,7 @@ package org.apache.nifi.processor; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import 
java.util.Set; import org.apache.nifi.attribute.expression.language.PreparedQuery; import org.apache.nifi.attribute.expression.language.Query; @@ -38,11 +39,17 @@ public class StandardValidationContext implements ValidationContext { private final Map<PropertyDescriptor, String> properties; private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; private final String annotationData; + private final Set<String> serviceIdentifiersToNotValidate; public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Map<PropertyDescriptor, String> properties, final String annotationData) { + this(controllerServiceProvider, Collections.<String>emptySet(), properties, annotationData); + } + + public StandardValidationContext(final ControllerServiceProvider controllerServiceProvider, final Set<String> serviceIdentifiersToNotValidate, final Map<PropertyDescriptor, String> properties, final String annotationData) { this.controllerServiceProvider = controllerServiceProvider; this.properties = new HashMap<>(properties); this.annotationData = annotationData; + this.serviceIdentifiersToNotValidate = serviceIdentifiersToNotValidate; preparedQueries = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { @@ -94,4 +101,11 @@ public class StandardValidationContext implements ValidationContext { public ControllerServiceLookup getControllerServiceLookup() { return controllerServiceProvider; } + + @Override + public boolean isValidationRequired(final ControllerService service) { + return !serviceIdentifiersToNotValidate.contains(service.getIdentifier()); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7de30ab1/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java index e172f93..c3df987 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java @@ -17,6 +17,7 @@ package org.apache.nifi.processor; import java.util.Map; +import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -36,4 +37,8 @@ public class StandardValidationContextFactory implements ValidationContextFactor return new StandardValidationContext(serviceProvider, properties, annotationData); } + @Override + public ValidationContext newValidationContext(final Set<String> serviceIdentifiersToNotValidate, final Map<PropertyDescriptor, String> properties, final String annotationData) { + return new StandardValidationContext(serviceProvider, serviceIdentifiersToNotValidate, properties, annotationData); + } }
