http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 1762068..06d32e2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -118,15 +118,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi ClassLoader cl = null; final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader(); + final ExtensionManager extensionManager = flowController.getExtensionManager(); try { final Class<?> rawClass; try { - final Bundle csBundle = ExtensionManager.getBundle(bundleCoordinate); + final Bundle csBundle = extensionManager.getBundle(bundleCoordinate); if (csBundle == null) { throw new ControllerServiceInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate()); } - cl = ExtensionManager.createInstanceClassLoader(type, id, csBundle, additionalUrls); + cl = extensionManager.createInstanceClassLoader(type, id, csBundle, additionalUrls); Thread.currentThread().setContextClassLoader(cl); rawClass = Class.forName(type, false, cl); } catch (final Exception e) { @@ -138,7 +139,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class); final ControllerService originalService = controllerServiceClass.newInstance(); - final StandardControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(originalService); + final StandardControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(extensionManager, originalService); // extract all interfaces... controllerServiceClass is non null so getAllInterfaces is non null final List<Class<?>> interfaceList = ClassUtils.getAllInterfaces(controllerServiceClass); @@ -165,13 +166,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this, componentVarRegistry); final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler, - id, validationContextFactory, this, componentVarRegistry, flowController, validationTrigger); + id, validationContextFactory, this, componentVarRegistry, flowController, flowController.getExtensionManager(), validationTrigger); serviceNode.setName(rawClass.getSimpleName()); invocationHandler.setServiceNode(serviceNode); if (firstTimeAdded) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), originalService.getClass(), originalService.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService); } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e); @@ -244,7 +245,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry); final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id, - new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController, validationTrigger, true); + new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, componentVarRegistry, flowController, + flowController.getExtensionManager(), validationTrigger, true); serviceCache.putIfAbsent(id, serviceNode); return serviceNode; @@ -710,7 +712,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi group.removeControllerService(serviceNode); LogRepositoryFactory.removeRepository(serviceNode.getIdentifier()); - ExtensionManager.removeInstanceClassLoader(serviceNode.getIdentifier()); + final ExtensionManager extensionManager = flowController.getExtensionManager(); + extensionManager.removeInstanceClassLoader(serviceNode.getIdentifier()); serviceCache.remove(serviceNode.getIdentifier()); } @@ -819,4 +822,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) throws IllegalArgumentException { throw new UnsupportedOperationException("Cannot obtain Controller Service Identifiers for service type " + serviceType + " without providing a Process Group Identifier"); } + + @Override + public ExtensionManager getExtensionManager() { + return flowController.getExtensionManager(); + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java index 8464599..dbdbb6b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java @@ -70,16 +70,17 @@ public class StandardStateManagerProvider implements StateManagerProvider{ this.clusterStateProvider = clusterStateProvider; } - public static synchronized StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry) throws ConfigParseException, IOException { + public static synchronized StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager) + throws ConfigParseException, IOException { if (provider != null) { return provider; } - final StateProvider localProvider = createLocalStateProvider(properties,variableRegistry); + final StateProvider localProvider = createLocalStateProvider(properties,variableRegistry, extensionManager); final StateProvider clusterProvider; if (properties.isNode()) { - clusterProvider = createClusteredStateProvider(properties,variableRegistry); + clusterProvider = createClusteredStateProvider(properties,variableRegistry, extensionManager); } else { clusterProvider = null; } @@ -88,20 +89,23 @@ public class StandardStateManagerProvider implements StateManagerProvider{ return provider; } - private static StateProvider createLocalStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry) throws IOException, ConfigParseException { + private static StateProvider createLocalStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager) + throws IOException, ConfigParseException { final File configFile = properties.getStateManagementConfigFile(); - return createStateProvider(configFile, Scope.LOCAL, properties, variableRegistry); + return createStateProvider(configFile, Scope.LOCAL, properties, variableRegistry, extensionManager); } - private static StateProvider createClusteredStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry) throws IOException, ConfigParseException { + private static StateProvider createClusteredStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager) + throws IOException, ConfigParseException { final File configFile = properties.getStateManagementConfigFile(); - return createStateProvider(configFile, Scope.CLUSTER, properties, variableRegistry); + return createStateProvider(configFile, Scope.CLUSTER, properties, variableRegistry, extensionManager); } private static StateProvider createStateProvider(final File configFile, final Scope scope, final NiFiProperties properties, - final VariableRegistry variableRegistry) throws ConfigParseException, IOException { + final VariableRegistry variableRegistry, final ExtensionManager extensionManager) + throws ConfigParseException, IOException { final String providerId; final String providerIdPropertyName; final String providerDescription; @@ -169,7 +173,7 @@ public class StandardStateManagerProvider implements StateManagerProvider{ final StateProvider provider; try { - provider = instantiateStateProvider(providerClassName); + provider = instantiateStateProvider(extensionManager, providerClassName); } catch (final Exception e) { throw new RuntimeException("Cannot create " + providerDescription + " of type " + providerClassName, e); } @@ -223,10 +227,10 @@ public class StandardStateManagerProvider implements StateManagerProvider{ return provider; } - private static StateProvider instantiateStateProvider(final String type) throws ClassNotFoundException, InstantiationException, IllegalAccessException { + private static StateProvider instantiateStateProvider(final ExtensionManager extensionManager, final String type) throws ClassNotFoundException, InstantiationException, IllegalAccessException { final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { - final List<Bundle> bundles = ExtensionManager.getBundles(type); + final List<Bundle> bundles = extensionManager.getBundles(type); if (bundles.size() == 0) { throw new IllegalStateException(String.format("The specified class '%s' is not known to this nifi.", type)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java index e68aba8..851aad3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java @@ -197,7 +197,7 @@ public class ConnectableTask { final String originalThreadName = Thread.currentThread().getName(); try { - try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(connectable.getRunnableComponent().getClass(), connectable.getIdentifier())) { + try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getRunnableComponent().getClass(), connectable.getIdentifier())) { boolean shouldRun = connectable.getScheduledState() == ScheduledState.RUNNING; while (shouldRun) { connectable.onTrigger(processContext, activeSessionFactory); http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java index b05321f..b60302a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java @@ -20,6 +20,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.scheduling.LifecycleState; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.util.ReflectionUtils; @@ -28,16 +29,18 @@ public class ReportingTaskWrapper implements Runnable { private final ReportingTaskNode taskNode; private final LifecycleState lifecycleState; + private final ExtensionManager extensionManager; - public ReportingTaskWrapper(final ReportingTaskNode taskNode, final LifecycleState lifecycleState) { + public ReportingTaskWrapper(final ReportingTaskNode taskNode, final LifecycleState lifecycleState, final ExtensionManager extensionManager) { this.taskNode = taskNode; this.lifecycleState = lifecycleState; + this.extensionManager = extensionManager; } @Override public synchronized void run() { lifecycleState.incrementActiveThreadCount(null); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) { taskNode.getReportingTask().onTrigger(taskNode.getReportingContext()); } catch (final Throwable t) { final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask()); @@ -50,7 +53,7 @@ public class ReportingTaskWrapper implements Runnable { // if the reporting task is no longer scheduled to run and this is the last thread, // invoke the OnStopped methods if (!lifecycleState.isScheduled() && lifecycleState.getActiveThreadCount() == 1 && lifecycleState.mustCallOnStoppedMethods()) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 7da5702..999a42d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -84,11 +84,13 @@ public class FingerprintFactory { private static final String ENCRYPTED_VALUE_SUFFIX = "}"; private final StringEncryptor encryptor; private final DocumentBuilder flowConfigDocBuilder; + private final ExtensionManager extensionManager; private static final Logger logger = LoggerFactory.getLogger(FingerprintFactory.class); - public FingerprintFactory(final StringEncryptor encryptor) { + public FingerprintFactory(final StringEncryptor encryptor, final ExtensionManager extensionManager) { this.encryptor = encryptor; + this.extensionManager = extensionManager; final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance(); documentBuilderFactory.setNamespaceAware(true); @@ -108,9 +110,10 @@ public class FingerprintFactory { } } - public FingerprintFactory(final StringEncryptor encryptor, final DocumentBuilder docBuilder) { + public FingerprintFactory(final StringEncryptor encryptor, final DocumentBuilder docBuilder, final ExtensionManager extensionManager) { this.encryptor = encryptor; this.flowConfigDocBuilder = docBuilder; + this.extensionManager = extensionManager; } /** @@ -408,7 +411,7 @@ public class FingerprintFactory { // get the temp instance of the Processor so that we know the default property values final BundleCoordinate coordinate = getCoordinate(className, bundle); - final ConfigurableComponent configurableComponent = ExtensionManager.getTempComponent(className, coordinate); + final ConfigurableComponent configurableComponent = extensionManager.getTempComponent(className, coordinate); if (configurableComponent == null) { logger.warn("Unable to get Processor of type {}; its default properties will be fingerprinted instead of being ignored.", className); } @@ -640,7 +643,7 @@ public class FingerprintFactory { // get the temp instance of the ControllerService so that we know the default property values final BundleCoordinate coordinate = getCoordinate(dto.getType(), dto.getBundle()); - final ConfigurableComponent configurableComponent = ExtensionManager.getTempComponent(dto.getType(), coordinate); + final ConfigurableComponent configurableComponent = extensionManager.getTempComponent(dto.getType(), coordinate); if (configurableComponent == null) { logger.warn("Unable to get ControllerService of type {}; its default properties will be fingerprinted instead of being ignored.", dto.getType()); } @@ -672,7 +675,7 @@ public class FingerprintFactory { private BundleCoordinate getCoordinate(final String type, final BundleDTO dto) { BundleCoordinate coordinate; try { - coordinate = BundleUtils.getCompatibleBundle(type, dto); + coordinate = BundleUtils.getCompatibleBundle(extensionManager, type, dto); } catch (final IllegalStateException e) { if (dto == null) { coordinate = BundleCoordinate.UNKNOWN_COORDINATE; @@ -697,7 +700,7 @@ public class FingerprintFactory { // get the temp instance of the ReportingTask so that we know the default property values final BundleCoordinate coordinate = getCoordinate(dto.getType(), dto.getBundle()); - final ConfigurableComponent configurableComponent = ExtensionManager.getTempComponent(dto.getType(), coordinate); + final ConfigurableComponent configurableComponent = extensionManager.getTempComponent(dto.getType(), coordinate); if (configurableComponent == null) { logger.warn("Unable to get ReportingTask of type {}; its default properties will be fingerprinted instead of being ignored.", dto.getType()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index a683a9e..a27962a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -65,7 +65,6 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogRepositoryFactory; -import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.StandardProcessContext; @@ -465,7 +464,7 @@ public final class StandardProcessGroup implements ProcessGroup { private void shutdown(final ProcessGroup procGroup) { for (final ProcessorNode node : procGroup.getProcessors()) { - try (final NarCloseable x = NarCloseable.withComponentNarLoader(node.getProcessor().getClass(), node.getIdentifier())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), node.getProcessor().getClass(), node.getIdentifier())) { final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier()), () -> false); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor(), processContext); } @@ -903,7 +902,7 @@ public final class StandardProcessGroup implements ProcessGroup { conn.verifyCanDelete(); } - try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getProcessor().getClass(), processor.getIdentifier())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), processor.getProcessor().getClass(), processor.getIdentifier())) { final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier()), () -> false); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); } catch (final Exception e) { @@ -951,7 +950,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (removed) { try { LogRepositoryFactory.removeRepository(processor.getIdentifier()); - ExtensionManager.removeInstanceClassLoader(id); + flowController.getExtensionManager().removeInstanceClassLoader(id); } catch (Throwable t) { } } @@ -2108,7 +2107,7 @@ public final class StandardProcessGroup implements ProcessGroup { service.verifyCanDelete(); - try (final NarCloseable x = NarCloseable.withComponentNarLoader(service.getControllerServiceImplementation().getClass(), service.getIdentifier())) { + try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), service.getControllerServiceImplementation().getClass(), service.getIdentifier())) { final ConfigurationContext configurationContext = new StandardConfigurationContext(service, controllerServiceProvider, null, variableRegistry); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, service.getControllerServiceImplementation(), configurationContext); } @@ -2149,7 +2148,7 @@ public final class StandardProcessGroup implements ProcessGroup { } finally { if (removed) { try { - ExtensionManager.removeInstanceClassLoader(service.getIdentifier()); + flowController.getExtensionManager().removeInstanceClassLoader(service.getIdentifier()); } catch (Throwable t) { } } @@ -3308,7 +3307,7 @@ public final class StandardProcessGroup implements ProcessGroup { try { verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); - final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager()); final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), true); final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); @@ -4348,7 +4347,7 @@ public final class StandardProcessGroup implements ProcessGroup { return null; } - final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(); + final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper(flowController.getExtensionManager()); final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, controllerServiceProvider, flowController.getFlowRegistryClient(), false); final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); @@ -4505,7 +4504,7 @@ public final class StandardProcessGroup implements ProcessGroup { final String processorToAddClass = processorToAdd.getType(); final BundleCoordinate processorToAddCoordinate = toCoordinate(processorToAdd.getBundle()); - final boolean bundleExists = ExtensionManager.getBundles(processorToAddClass).stream() + final boolean bundleExists = flowController.getExtensionManager().getBundles(processorToAddClass).stream() .anyMatch(b -> processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate())); if (!bundleExists) { @@ -4525,7 +4524,7 @@ public final class StandardProcessGroup implements ProcessGroup { final String serviceToAddClass = serviceToAdd.getType(); final BundleCoordinate serviceToAddCoordinate = toCoordinate(serviceToAdd.getBundle()); - final boolean bundleExists = ExtensionManager.getBundles(serviceToAddClass).stream() + final boolean bundleExists = flowController.getExtensionManager().getBundles(serviceToAddClass).stream() .anyMatch(b -> serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate())); if (!bundleExists) { http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 26c5224..e7e3ce8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -36,6 +36,7 @@ import org.apache.nifi.controller.serialization.FlowSynchronizationException; import org.apache.nifi.controller.serialization.FlowSynchronizer; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; import org.slf4j.Logger; @@ -47,10 +48,12 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD private final StringEncryptor encryptor; private final FlowConfigurationArchiveManager archiveManager; private final NiFiProperties nifiProperties; + private final ExtensionManager extensionManager; private static final Logger LOG = LoggerFactory.getLogger(StandardXMLFlowConfigurationDAO.class); - public StandardXMLFlowConfigurationDAO(final Path flowXml, final StringEncryptor encryptor, final NiFiProperties nifiProperties) throws IOException { + public StandardXMLFlowConfigurationDAO(final Path flowXml, final StringEncryptor encryptor, final NiFiProperties nifiProperties, + final ExtensionManager extensionManager) throws IOException { this.nifiProperties = nifiProperties; final File flowXmlFile = flowXml.toFile(); if (!flowXmlFile.exists()) { @@ -66,6 +69,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD this.flowXmlPath = flowXml; this.encryptor = encryptor; + this.extensionManager = extensionManager; this.archiveManager = new FlowConfigurationArchiveManager(flowXmlPath, nifiProperties); } @@ -80,7 +84,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD public synchronized void load(final FlowController controller, final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException { - final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor, nifiProperties); + final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor, nifiProperties, extensionManager); controller.synchronize(flowSynchronizer, dataFlow); if (StandardFlowSynchronizer.isEmpty(dataFlow)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index 074302a..c45d960 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -76,14 +76,21 @@ import java.util.stream.Collectors; public class NiFiRegistryFlowMapper { + + private final ExtensionManager extensionManager; + // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned' // identifier based on the comopnent's actual id. We do connections last, so that all components will already have been // created before attempting to create the connection, where the ConnectableDTO is converted. private Map<String, String> versionedComponentIds = new HashMap<>(); + public NiFiRegistryFlowMapper(final ExtensionManager extensionManager) { + this.extensionManager = extensionManager; + } + public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final ControllerServiceProvider serviceProvider, final FlowRegistryClient registryClient, - final boolean mapDescendantVersionedFlows) { + final boolean mapDescendantVersionedFlows) { versionedComponentIds.clear(); final InstantiatedVersionedProcessGroup mapped = mapGroup(group, serviceProvider, registryClient, true, mapDescendantVersionedFlows); @@ -385,7 +392,7 @@ public class NiFiRegistryFlowMapper { final List<ControllerServiceAPI> serviceApis = new ArrayList<>(); for (final Class<?> serviceApiClass : serviceApiClasses) { - final BundleCoordinate bundleCoordinate = ExtensionManager.getBundle(serviceApiClass.getClassLoader()).getBundleDetails().getCoordinate(); + final BundleCoordinate bundleCoordinate = extensionManager.getBundle(serviceApiClass.getClassLoader()).getBundleDetails().getCoordinate(); final ControllerServiceAPI serviceApi = new ControllerServiceAPI(); serviceApi.setType(serviceApiClass.getName()); http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/ExtensionManagerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/ExtensionManagerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/ExtensionManagerFactoryBean.java new file mode 100644 index 0000000..5d8ba0f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/ExtensionManagerFactoryBean.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.spring; + +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.ExtensionManagerHolder; +import org.apache.nifi.nar.StandardExtensionDiscoveringManager; +import org.springframework.beans.factory.FactoryBean; + +/** + * Spring factory bean that returns the ExtensionManager instance from ExtensionManagerHolder. + * + * The ExtensionManagerHolder will be initialized before the Spring context starts. + */ +public class ExtensionManagerFactoryBean implements FactoryBean<ExtensionManager> { + + @Override + public ExtensionManager getObject() { + return ExtensionManagerHolder.getExtensionManager(); + } + + @Override + public Class<?> getObjectType() { + return StandardExtensionDiscoveringManager.class; + } + + @Override + public boolean isSingleton() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java index c7a7e7d..2124daf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java @@ -25,6 +25,7 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.leader.election.LeaderElectionManager; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.reporting.BulletinRepository; @@ -51,6 +52,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex private VariableRegistry variableRegistry; private LeaderElectionManager leaderElectionManager; private FlowRegistryClient flowRegistryClient; + private ExtensionManager extensionManager; @Override public Object getObject() throws Exception { @@ -72,7 +74,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex heartbeatMonitor, leaderElectionManager, variableRegistry, - flowRegistryClient); + flowRegistryClient, + extensionManager); } else { flowController = FlowController.createStandaloneInstance( flowFileEventRepository, @@ -82,7 +85,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex encryptor, bulletinRepository, variableRegistry, - flowRegistryClient); + flowRegistryClient, + extensionManager); } } @@ -142,4 +146,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex public void setFlowRegistryClient(final FlowRegistryClient flowRegistryClient) { this.flowRegistryClient = flowRegistryClient; } + + public void setExtensionManager(ExtensionManager extensionManager) { + this.extensionManager = extensionManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java index 5474a56..ab89c25 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java @@ -29,8 +29,8 @@ import java.util.stream.Collectors; * Utility class for Bundles. */ public final class BundleUtils { - private static BundleCoordinate findBundleForType(final String type, final BundleCoordinate desiredCoordinate) { - final List<Bundle> bundles = ExtensionManager.getBundles(type); + private static BundleCoordinate findBundleForType(final ExtensionManager extensionManager, final String type, final BundleCoordinate desiredCoordinate) { + final List<Bundle> bundles = extensionManager.getBundles(type); if (bundles.isEmpty()) { throw new IllegalStateException(String.format("%s is not known to this NiFi instance.", type)); } else if (bundles.size() > 1) { @@ -44,18 +44,19 @@ public final class BundleUtils { } } - private static BundleCoordinate findCompatibleBundle(final String type, final BundleDTO bundleDTO, final boolean allowCompatibleBundle) { + private static BundleCoordinate findCompatibleBundle(final ExtensionManager extensionManager, final String type, + final BundleDTO bundleDTO, final boolean allowCompatibleBundle) { final BundleCoordinate coordinate = new BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), bundleDTO.getVersion()); - final Bundle bundle = ExtensionManager.getBundle(coordinate); + final Bundle bundle = extensionManager.getBundle(coordinate); if (bundle == null) { if (allowCompatibleBundle) { - return findBundleForType(type, coordinate); + return findBundleForType(extensionManager, type, coordinate); } else { throw new IllegalStateException(String.format("%s from %s is not known to this NiFi instance.", type, coordinate)); } } else { - final List<BundleCoordinate> bundlesForType = ExtensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList()); + final List<BundleCoordinate> bundlesForType = extensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList()); if (bundlesForType.contains(coordinate)) { return coordinate; } else { @@ -91,11 +92,11 @@ public final class BundleUtils { * @return the bundle coordinate * @throws IllegalStateException bundle not found */ - public static BundleCoordinate getBundle(final String type, final BundleDTO bundleDTO) { + public static BundleCoordinate getBundle(final ExtensionManager extensionManager, final String type, final BundleDTO bundleDTO) { if (bundleDTO == null) { - return findBundleForType(type, null); + return findBundleForType(extensionManager, type, null); } else { - return findCompatibleBundle(type, bundleDTO, false); + return findCompatibleBundle(extensionManager, type, bundleDTO, false); } } @@ -132,11 +133,11 @@ public final class BundleUtils { * @return the bundle coordinate * @throws IllegalStateException no compatible bundle found */ - public static BundleCoordinate getCompatibleBundle(final String type, final BundleDTO bundleDTO) { + public static BundleCoordinate getCompatibleBundle(final ExtensionManager extensionManager, final String type, final BundleDTO bundleDTO) { if (bundleDTO == null) { - return findBundleForType(type, null); + return findBundleForType(extensionManager, type, null); } else { - return findCompatibleBundle(type, bundleDTO, true); + return findCompatibleBundle(extensionManager, type, bundleDTO, true); } } @@ -147,10 +148,10 @@ public final class BundleUtils { * * @param versionedGroup the versioned group */ - public static void discoverCompatibleBundles(final VersionedProcessGroup versionedGroup) { + public static void discoverCompatibleBundles(final ExtensionManager extensionManager, final VersionedProcessGroup versionedGroup) { if (versionedGroup.getProcessors() != null) { versionedGroup.getProcessors().forEach(processor -> { - final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(processor.getType(), createBundleDto(processor.getBundle())); + final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(extensionManager, processor.getType(), createBundleDto(processor.getBundle())); final org.apache.nifi.registry.flow.Bundle bundle = new org.apache.nifi.registry.flow.Bundle(); bundle.setArtifact(coordinate.getId()); @@ -162,7 +163,7 @@ public final class BundleUtils { if (versionedGroup.getControllerServices() != null) { versionedGroup.getControllerServices().forEach(controllerService -> { - final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(controllerService.getType(), createBundleDto(controllerService.getBundle())); + final BundleCoordinate coordinate = BundleUtils.getCompatibleBundle(extensionManager, controllerService.getType(), createBundleDto(controllerService.getBundle())); final org.apache.nifi.registry.flow.Bundle bundle = new org.apache.nifi.registry.flow.Bundle(); bundle.setArtifact(coordinate.getId()); @@ -174,7 +175,7 @@ public final class BundleUtils { if (versionedGroup.getProcessGroups() != null) { versionedGroup.getProcessGroups().forEach(processGroup -> { - discoverCompatibleBundles(processGroup); + discoverCompatibleBundles(extensionManager, processGroup); }); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml index d9f89aa..2261ae8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml @@ -40,6 +40,10 @@ <property name="properties" ref="nifiProperties" /> </bean> + <!-- extension manager --> + <bean id="extensionManager" class="org.apache.nifi.spring.ExtensionManagerFactoryBean"> + </bean> + <!-- flow controller --> <bean id="flowController" class="org.apache.nifi.spring.FlowControllerFactoryBean"> <property name="properties" ref="nifiProperties"/> @@ -51,6 +55,7 @@ <property name="variableRegistry" ref="variableRegistry"/> <property name="leaderElectionManager" ref="leaderElectionManager" /> <property name="flowRegistryClient" ref="flowRegistryClient" /> + <property name="extensionManager" ref="extensionManager" /> </bean> <!-- flow service --> http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index 4750746..df8a126 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -26,6 +26,8 @@ import org.apache.nifi.controller.serialization.ScheduledStateLookup; import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.VolatileBulletinRepository; +import org.apache.nifi.nar.ExtensionDiscoveringManager; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.flow.FlowRegistryClient; import org.apache.nifi.registry.variable.FileBasedVariableRegistry; @@ -69,6 +71,7 @@ public class StandardFlowServiceTest { private StringEncryptor mockEncryptor; private RevisionManager revisionManager; private VariableRegistry variableRegistry; + private ExtensionManager extensionManager; @BeforeClass public static void setupSuite() { @@ -86,8 +89,9 @@ public class StandardFlowServiceTest { authorizer = mock(Authorizer.class); mockAuditService = mock(AuditService.class); revisionManager = mock(RevisionManager.class); + extensionManager = mock(ExtensionDiscoveringManager.class); flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor, - new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class)); + new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class), extensionManager); flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer); } http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index 8291634..5516547 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -16,34 +16,6 @@ */ package org.apache.nifi.controller; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.nifi.admin.service.AuditService; @@ -72,8 +44,9 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogRepository; import org.apache.nifi.logging.LogRepositoryFactory; -import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.ExtensionDiscoveringManager; import org.apache.nifi.nar.InstanceClassLoader; +import org.apache.nifi.nar.StandardExtensionDiscoveringManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.MockProvenanceRepository; @@ -96,6 +69,35 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestFlowController { private FlowController controller; @@ -108,6 +110,7 @@ public class TestFlowController { private BulletinRepository bulletinRepo; private VariableRegistry variableRegistry; private volatile String propsFile = "src/test/resources/flowcontrollertest.nifi.properties"; + private ExtensionDiscoveringManager extensionManager; /** * Utility method which accepts {@link NiFiProperties} object but calls {@link StringEncryptor#createEncryptor(String, String, String)} with extracted properties. @@ -136,7 +139,8 @@ public class TestFlowController { // use the system bundle systemBundle = SystemBundle.create(nifiProperties); - ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + extensionManager = new StandardExtensionDiscoveringManager(); + extensionManager.discoverExtensions(systemBundle, Collections.emptySet()); User user1 = new User.Builder().identifier("user-id-1").identity("user-1").build(); User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build(); @@ -179,7 +183,7 @@ public class TestFlowController { bulletinRepo = Mockito.mock(BulletinRepository.class); controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, - auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class)); + auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager); } @After @@ -190,7 +194,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); @@ -253,7 +258,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); @@ -292,7 +298,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWhenAuthorizationsAreEqual() { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); @@ -306,7 +313,8 @@ public class TestFlowController { @Test(expected = UninheritableFlowException.class) public void testSynchronizeFlowWhenAuthorizationsAreDifferent() { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); // create a mock proposed data flow with different auth fingerprint as the current authorizer final String authFingerprint = "<authorizations></authorizations>"; @@ -319,7 +327,8 @@ public class TestFlowController { @Test(expected = UninheritableFlowException.class) public void testSynchronizeFlowWhenProposedAuthorizationsAreNull() { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null); @@ -329,7 +338,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); @@ -341,14 +351,15 @@ public class TestFlowController { controller.shutdown(true); controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, - auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class)); + auditService, encryptor, bulletinRepo, variableRegistry, Mockito.mock(FlowRegistryClient.class), extensionManager); controller.synchronize(standardFlowSynchronizer, proposedDataFlow); assertEquals(authFingerprint, authorizer.getFingerprint()); } @Test public void testSynchronizeFlowWhenProposedMissingComponentsAreDifferent() { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); final Set<String> missingComponents = new HashSet<>(); missingComponents.add("1"); @@ -368,7 +379,7 @@ public class TestFlowController { @Test public void testSynchronizeFlowWhenExistingMissingComponentsAreDifferent() throws IOException { final StringEncryptor stringEncryptor = createEncryptorFromProperties(nifiProperties); - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(stringEncryptor, nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(stringEncryptor, nifiProperties, extensionManager); final ProcessorNode mockProcessorNode = mock(ProcessorNode.class); when(mockProcessorNode.getIdentifier()).thenReturn("1"); @@ -408,7 +419,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWhenBundlesAreSame() throws IOException { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df"); logRepository.removeAllObservers(); @@ -419,7 +431,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWhenBundlesAreDifferent() throws IOException { - final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(createEncryptorFromProperties(nifiProperties), nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer( + createEncryptorFromProperties(nifiProperties), nifiProperties, extensionManager); final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df"); logRepository.removeAllObservers(); @@ -615,7 +628,7 @@ public class TestFlowController { final String originalName = processorNode.getName(); // the instance class loader shouldn't have any of the resources yet - InstanceClassLoader instanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + InstanceClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(id); assertNotNull(instanceClassLoader); assertFalse(containsResource(instanceClassLoader.getURLs(), resource1)); assertFalse(containsResource(instanceClassLoader.getURLs(), resource2)); @@ -626,7 +639,7 @@ public class TestFlowController { controller.reload(processorNode, DummySettingsProcessor.class.getName(), coordinate, additionalUrls); // the instance class loader shouldn't have any of the resources yet - instanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + instanceClassLoader = extensionManager.getInstanceClassLoader(id); assertNotNull(instanceClassLoader); assertTrue(containsResource(instanceClassLoader.getURLs(), resource1)); assertTrue(containsResource(instanceClassLoader.getURLs(), resource2)); @@ -676,7 +689,7 @@ public class TestFlowController { final ControllerServiceNode controllerServiceNode = controller.createControllerService(ServiceA.class.getName(), id, coordinate, null, true); // the instance class loader shouldn't have any of the resources yet - URLClassLoader instanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + URLClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(id); assertNotNull(instanceClassLoader); assertFalse(containsResource(instanceClassLoader.getURLs(), resource1)); assertFalse(containsResource(instanceClassLoader.getURLs(), resource2)); @@ -687,7 +700,7 @@ public class TestFlowController { controller.reload(controllerServiceNode, ServiceB.class.getName(), coordinate, additionalUrls); // the instance class loader shouldn't have any of the resources yet - instanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + instanceClassLoader = extensionManager.getInstanceClassLoader(id); assertNotNull(instanceClassLoader); assertTrue(containsResource(instanceClassLoader.getURLs(), resource1)); assertTrue(containsResource(instanceClassLoader.getURLs(), resource2)); @@ -738,7 +751,7 @@ public class TestFlowController { final ReportingTaskNode node = controller.createReportingTask(DummyReportingTask.class.getName(), id, coordinate, true); // the instance class loader shouldn't have any of the resources yet - InstanceClassLoader instanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + InstanceClassLoader instanceClassLoader = extensionManager.getInstanceClassLoader(id); assertNotNull(instanceClassLoader); assertFalse(containsResource(instanceClassLoader.getURLs(), resource1)); assertFalse(containsResource(instanceClassLoader.getURLs(), resource2)); @@ -748,7 +761,7 @@ public class TestFlowController { controller.reload(node, DummyScheduledReportingTask.class.getName(), coordinate, additionalUrls); // the instance class loader shouldn't have any of the resources yet - instanceClassLoader = ExtensionManager.getInstanceClassLoader(id); + instanceClassLoader = extensionManager.getInstanceClassLoader(id); assertNotNull(instanceClassLoader); assertTrue(containsResource(instanceClassLoader.getURLs(), resource1)); assertTrue(containsResource(instanceClassLoader.getURLs(), resource2)); http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java index eabc899..d2be387 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java @@ -17,26 +17,6 @@ package org.apache.nifi.controller; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; - import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; @@ -51,8 +31,9 @@ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.expression.ExpressionLanguageCompiler; -import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.ExtensionDiscoveringManager; import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.nar.StandardExtensionDiscoveringManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -77,13 +58,41 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestStandardProcessorNode { private MockVariableRegistry variableRegistry; + private Bundle systemBundle; + private ExtensionDiscoveringManager extensionManager; + private NiFiProperties niFiProperties; @Before public void setup() { variableRegistry = new MockVariableRegistry(); + niFiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", null); + + systemBundle = SystemBundle.create(niFiProperties); + extensionManager = new StandardExtensionDiscoveringManager(); + extensionManager.discoverExtensions(systemBundle, Collections.emptySet()); } @Test(timeout = 10000) @@ -99,7 +108,7 @@ public class TestStandardProcessorNode { final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, coordinate, null); final StandardProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, createValidationContextFactory(), null, null, - NiFiProperties.createBasicNiFiProperties(null, null), new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, new SynchronousValidationTrigger()); + niFiProperties, new StandardComponentVariableRegistry(VariableRegistry.EMPTY_REGISTRY), reloadComponent, extensionManager, new SynchronousValidationTrigger()); final ScheduledExecutorService taskScheduler = new FlowEngine(1, "TestClasspathResources", true); final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, () -> false); @@ -138,7 +147,7 @@ public class TestStandardProcessorNode { final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp)); final StandardProcessorNode procNode = createProcessorNode(processor, reloadComponent); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Should not have any of the test resources loaded at this point final URL[] testResources = getTestResources(); @@ -163,7 +172,7 @@ public class TestStandardProcessorNode { // Should pass validation assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { - ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); + extensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } } @@ -180,7 +189,7 @@ public class TestStandardProcessorNode { final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp, otherProp)); final StandardProcessorNode procNode = createProcessorNode(processor, reloadComponent); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Should not have any of the test resources loaded at this point final URL[] testResources = getTestResources(); for (URL testResource : testResources) { @@ -230,7 +239,7 @@ public class TestStandardProcessorNode { // Should STILL pass validation assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { - ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); + extensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } } @@ -246,7 +255,7 @@ public class TestStandardProcessorNode { final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2)); final StandardProcessorNode procNode = createProcessorNode(processor, reloadComponent); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Should not have any of the test resources loaded at this point final URL[] testResources = getTestResources(); @@ -275,7 +284,7 @@ public class TestStandardProcessorNode { // Should pass validation assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { - ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); + extensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } } @@ -291,7 +300,7 @@ public class TestStandardProcessorNode { final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2)); final StandardProcessorNode procNode = createProcessorNode(processor, reloadComponent); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Should not have any of the test resources loaded at this point final URL[] testResources = getTestResources(); @@ -317,7 +326,7 @@ public class TestStandardProcessorNode { // Should pass validation assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { - ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); + extensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } } @@ -327,7 +336,7 @@ public class TestStandardProcessorNode { final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor(); final StandardProcessorNode procNode = createProcessorNode(processor, reloadComponent); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())){ final Map<String, String> properties = new HashMap<>(); properties.put(ModifiesClasspathNoAnnotationProcessor.CLASSPATH_RESOURCE.getName(), @@ -344,7 +353,7 @@ public class TestStandardProcessorNode { // Should pass validation assertTrue(procNode.computeValidationErrors(procNode.getValidationContext()).isEmpty()); } finally { - ExtensionManager.removeInstanceClassLoader(procNode.getIdentifier()); + extensionManager.removeInstanceClassLoader(procNode.getIdentifier()); } } @@ -387,20 +396,18 @@ public class TestStandardProcessorNode { private StandardProcessorNode createProcessorNode(final Processor processor, final ReloadComponent reloadComponent) { final String uuid = UUID.randomUUID().toString(); final ValidationContextFactory validationContextFactory = createValidationContextFactory(); - final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", null); final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); final TerminationAwareLogger componentLog = Mockito.mock(TerminationAwareLogger.class); - final Bundle systemBundle = SystemBundle.create(niFiProperties); - ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); - ExtensionManager.createInstanceClassLoader(processor.getClass().getName(), uuid, systemBundle, null); + extensionManager.createInstanceClassLoader(processor.getClass().getName(), uuid, systemBundle, null); ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, componentLog, null, null, null); processor.initialize(initContext); final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), componentLog); return new StandardProcessorNode(loggableComponent, uuid, validationContextFactory, processScheduler, - null, niFiProperties, new StandardComponentVariableRegistry(variableRegistry), reloadComponent, new SynchronousValidationTrigger()); + null, niFiProperties, new StandardComponentVariableRegistry(variableRegistry), reloadComponent, extensionManager, + new SynchronousValidationTrigger()); } private static class MockReloadComponent implements ReloadComponent { http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java index f4606cc..e7f3714 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java @@ -16,14 +16,6 @@ */ package org.apache.nifi.controller.reporting; -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; @@ -40,7 +32,8 @@ import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.encrypt.StringEncryptor; -import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.ExtensionDiscoveringManager; +import org.apache.nifi.nar.StandardExtensionDiscoveringManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; @@ -54,6 +47,15 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + public class TestStandardReportingContext { private static final String DEFAULT_SENSITIVE_PROPS_KEY = "nififtw!"; @@ -64,6 +66,7 @@ public class TestStandardReportingContext { private StringEncryptor encryptor; private NiFiProperties nifiProperties; private Bundle systemBundle; + private ExtensionDiscoveringManager extensionManager; private BulletinRepository bulletinRepo; private VariableRegistry variableRegistry; private FlowRegistryClient flowRegistry; @@ -89,7 +92,8 @@ public class TestStandardReportingContext { // use the system bundle systemBundle = SystemBundle.create(nifiProperties); - ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + extensionManager = new StandardExtensionDiscoveringManager(); + extensionManager.discoverExtensions(systemBundle, Collections.emptySet()); User user1 = new User.Builder().identifier("user-id-1").identity("user-1").build(); User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build(); @@ -132,7 +136,8 @@ public class TestStandardReportingContext { flowRegistry = Mockito.mock(FlowRegistryClient.class); bulletinRepo = Mockito.mock(BulletinRepository.class); - controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry, flowRegistry); + controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, + bulletinRepo, variableRegistry, flowRegistry, extensionManager); } @After http://git-wip-us.apache.org/repos/asf/nifi/blob/fdd8cdbb/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java index b8fed54..fa164fc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/ProcessorLifecycleIT.java @@ -37,7 +37,8 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.groups.ProcessGroup; -import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.ExtensionDiscoveringManager; +import org.apache.nifi.nar.StandardExtensionDiscoveringManager; import org.apache.nifi.nar.SystemBundle; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -654,12 +655,13 @@ public class ProcessorLifecycleIT { final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(propsFile, addProps); final Bundle systemBundle = SystemBundle.create(nifiProperties); - ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + final ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager(); + extensionManager.discoverExtensions(systemBundle, Collections.emptySet()); final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties, mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(), new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()), - mock(FlowRegistryClient.class)); + mock(FlowRegistryClient.class), extensionManager); return new FlowControllerAndSystemBundle(flowController, systemBundle); }