Repository: incubator-nifi Updated Branches: refs/heads/develop b6b185947 -> 95b22a0ae
NIFI-241: Only call Processor methods with @OnAdded annotation when a Processor is actually added to graph instead of also calling on NiFi restart Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/21e809a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/21e809a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/21e809a7 Branch: refs/heads/develop Commit: 21e809a7cbb85f450749916e474dec93f0d023d6 Parents: f36eea3 Author: Mark Payne <[email protected]> Authored: Tue Jan 13 13:28:20 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Tue Jan 13 13:28:20 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/controller/FlowController.java | 35 ++++++++++++++++---- .../controller/StandardFlowSynchronizer.java | 2 +- .../nifi/fingerprint/FingerprintFactory.java | 2 +- 3 files changed, 30 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21e809a7/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java index 99d8d6e..346e801 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -756,7 +756,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H /** * <p> - * Creates a new ProcessorNode with the given type and identifier.</p> + * Creates a new ProcessorNode with the given type and identifier and initializes it invoking the + * methods annotated with {@link OnAdded}. + * </p> * * @param type * @param id @@ -766,6 +768,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H * instantiated for any reason */ public ProcessorNode createProcessor(final String type, String id) throws ProcessorInstantiationException { + return createProcessor(type, id, true); + } + + /** + * <p> + * Creates a new ProcessorNode with the given type and identifier and optionally initializes it. + * </p> + * + * @param type the fully qualified Processor class name + * @param id the unique ID of the Processor + * @param firstTimeAdded whether or not this is the first time this Processor is added to the graph. If {@code true}, + * will invoke methods annotated with the {@link OnAdded} annotation. + * @return + * @throws NullPointerException if either arg is null + * @throws ProcessorInstantiationException if the processor cannot be + * instantiated for any reason + */ + public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException { id = id.intern(); final Processor processor = instantiateProcessor(type, id); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider); @@ -774,12 +794,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); - // TODO: We should only call this the first time that it is added to the graph.... - try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); - } catch (final Exception e) { - logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); - throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e); + if ( firstTimeAdded ) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); + } catch (final Exception e) { + logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); + throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e); + } } return procNode; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21e809a7/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index ffea644..b60d187 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -594,7 +594,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor"); for (final Element processorElement : processorNodeList) { final ProcessorDTO processorDTO = FlowFromDOMFactory.getProcessor(processorElement, encryptor); - final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId()); + final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId(), false); processGroup.addProcessor(procNode); updateProcessor(procNode, processorDTO, processGroup, controller); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21e809a7/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 9be46a1..7bdf278 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -467,7 +467,7 @@ public final class FingerprintFactory { Processor processor = null; try { if (controller != null) { - processor = controller.createProcessor(className, UUID.randomUUID().toString()).getProcessor(); + processor = controller.createProcessor(className, UUID.randomUUID().toString(), false).getProcessor(); } } catch (ProcessorInstantiationException e) { logger.warn("Unable to create Processor of type {} due to {}; its default properties will be fingerprinted instead of being ignored.", className, e.toString());
