http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
new file mode 100644
index 0000000..fed09af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInvocationHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.nar.NarCloseable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Invocation handler that sits behind the dynamic proxy of a {@link ControllerService}.
+ * A call is forwarded to the original service only while the attached node reports
+ * {@code ENABLED}; in any other state only methods declared by {@link ControllerService}
+ * or {@link Object} are let through.
+ */
+public class StandardControllerServiceInvocationHandler implements ControllerServiceInvocationHandler {
+
+    private static final Set<Method> validDisabledMethods;
+    static {
+        // methods that are okay to be called when the service is disabled.
+        final Set<Method> validMethods = new HashSet<>();
+        Collections.addAll(validMethods, ControllerService.class.getMethods());
+        Collections.addAll(validMethods, Object.class.getMethods());
+        validDisabledMethods = Collections.unmodifiableSet(validMethods);
+    }
+
+    private final ControllerService originalService;
+    private final AtomicReference<ControllerServiceNode> serviceNodeHolder = new AtomicReference<>(null);
+
+    /**
+     * Creates a handler without a node; until {@link #setServiceNode(ControllerServiceNode)} is called,
+     * the service is treated as not enabled.
+     *
+     * @param originalService the original service being proxied
+     */
+    public StandardControllerServiceInvocationHandler(final ControllerService originalService) {
+        this(originalService, null);
+    }
+
+    /**
+     * @param originalService the original service being proxied
+     * @param serviceNode the node holding the original service which will be used for checking the state (disabled vs running)
+     */
+    public StandardControllerServiceInvocationHandler(final ControllerService originalService, final ControllerServiceNode serviceNode) {
+        this.originalService = originalService;
+        this.serviceNodeHolder.set(serviceNode);
+    }
+
+    /**
+     * Attaches or replaces the node whose state decides which invocations are allowed.
+     *
+     * @param serviceNode the node holding the original service
+     */
+    @Override
+    public void setServiceNode(final ControllerServiceNode serviceNode) {
+        this.serviceNodeHolder.set(serviceNode);
+    }
+
+    /**
+     * Forwards a proxied call to the original service, running it under the component's NAR class loader.
+     *
+     * @param proxy the proxy instance the method was invoked on (not used)
+     * @param method the interface method being invoked
+     * @param args the arguments of the call
+     * @return the value returned by the original service
+     * @throws UnsupportedOperationException if the method is named {@code initialize} or {@code onPropertyModified}
+     * @throws IllegalStateException if the service is not enabled and the method is not one of the always-allowed methods
+     * @throws Throwable whatever the original service itself threw
+     */
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        final String methodName = method.getName();
+        if ("initialize".equals(methodName) || "onPropertyModified".equals(methodName)) {
+            throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework");
+        }
+
+        // The single-argument constructor leaves the holder empty until setServiceNode() is called, so a
+        // missing node is treated as "not enabled" rather than failing with a NullPointerException.
+        final ControllerServiceNode node = serviceNodeHolder.get();
+        final boolean disabled = node == null || node.getState() != ControllerServiceState.ENABLED; // only allow method call if service state is ENABLED.
+        if (disabled && !validDisabledMethods.contains(method)) {
+            // Use nar class loader here because we are implicitly calling toString() on the original implementation.
+            try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) {
+                throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService.getIdentifier()
+                        + " because the Controller Service is disabled");
+            } catch (final Throwable e) {
+                // Anything raised in the try block, including the exception thrown there on purpose, is
+                // caught here, so callers always receive the message below.
+                throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier "
+                        + originalService.getIdentifier() + " because the Controller Service is disabled");
+            }
+        }
+
+        try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) {
+            return method.invoke(originalService, args);
+        } catch (final InvocationTargetException e) {
+            // If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. We want
+            // to instead re-throw what the ControllerService threw, so we pull it out of the InvocationTargetException.
+            // Fall back to the wrapper itself if it carries no cause, because "throw null" would raise a NullPointerException.
+            final Throwable cause = e.getCause();
+            throw cause == null ? e : cause;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 0a64253..a543040 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -24,12 +24,15 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; @@ -60,8 +63,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private static final Logger LOG 
= LoggerFactory.getLogger(StandardControllerServiceNode.class); - private final ControllerService proxedControllerService; - private final ControllerService implementation; + private final AtomicReference<ControllerServiceDetails> controllerServiceHolder = new AtomicReference<>(null); private final ControllerServiceProvider serviceProvider; private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED); @@ -75,28 +77,42 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i private final AtomicBoolean active; - public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final VariableRegistry variableRegistry, final ComponentLog logger) { + public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService, + final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory, + final ControllerServiceProvider serviceProvider, final VariableRegistry variableRegistry) { - this(proxiedControllerService, implementation, id, validationContextFactory, serviceProvider, - implementation.getClass().getSimpleName(), implementation.getClass().getCanonicalName(), variableRegistry, logger); + this(implementation, proxiedControllerService, invocationHandler, id, validationContextFactory, serviceProvider, + implementation.getComponent().getClass().getSimpleName(), implementation.getComponent().getClass().getCanonicalName(), variableRegistry, false); } - public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id, - final ValidationContextFactory 
validationContextFactory, final ControllerServiceProvider serviceProvider, - final String componentType, final String componentCanonicalClass, final VariableRegistry variableRegistry, - final ComponentLog logger) { + public StandardControllerServiceNode(final LoggableComponent<ControllerService> implementation, final LoggableComponent<ControllerService> proxiedControllerService, + final ControllerServiceInvocationHandler invocationHandler, final String id, final ValidationContextFactory validationContextFactory, + final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass, + final VariableRegistry variableRegistry, final boolean isExtensionMissing) { - super(implementation, id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, logger); - this.proxedControllerService = proxiedControllerService; - this.implementation = implementation; + super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, isExtensionMissing); this.serviceProvider = serviceProvider; this.active = new AtomicBoolean(); + setControllerServiceAndProxy(implementation, proxiedControllerService, invocationHandler); } @Override + public ConfigurableComponent getComponent() { + return controllerServiceHolder.get().getImplementation(); + } + + @Override + public ComponentLog getLogger() { + return controllerServiceHolder.get().getComponentLog(); + } + + @Override + public BundleCoordinate getBundleCoordinate() { + return controllerServiceHolder.get().getBundleCoordinate(); + } + + @Override public Authorizable getParentAuthorizable() { final ProcessGroup processGroup = getProcessGroup(); if (processGroup == null) { @@ -127,13 +143,32 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } @Override + public ControllerService getControllerServiceImplementation() { + return 
controllerServiceHolder.get().getImplementation(); + } + + @Override public ControllerService getProxiedControllerService() { - return proxedControllerService; + return controllerServiceHolder.get().getProxiedControllerService(); } @Override - public ControllerService getControllerServiceImplementation() { - return implementation; + public ControllerServiceInvocationHandler getInvocationHandler() { + return controllerServiceHolder.get().getInvocationHandler(); + } + + @Override + public void setControllerServiceAndProxy(final LoggableComponent<ControllerService> implementation, + final LoggableComponent<ControllerService> proxiedControllerService, + final ControllerServiceInvocationHandler invocationHandler) { + synchronized (this.active) { + if (isActive()) { + throw new IllegalStateException("Cannot modify Controller Service configuration while service is active"); + } + + final ControllerServiceDetails controllerServiceDetails = new ControllerServiceDetails(implementation, proxiedControllerService, invocationHandler); + this.controllerServiceHolder.set(controllerServiceDetails); + } } @Override @@ -211,7 +246,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void verifyCanDelete() { if (getState() != ControllerServiceState.DISABLED) { - throw new IllegalStateException("Controller Service " + implementation.getIdentifier() + " cannot be deleted because it is not disabled"); + throw new IllegalStateException("Controller Service " + getControllerServiceImplementation().getIdentifier() + " cannot be deleted because it is not disabled"); } } @@ -236,7 +271,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } if (!activeReferencesIdentifiers.isEmpty()) { - throw new IllegalStateException(implementation.getIdentifier() + " cannot be disabled because it is referenced by " + activeReferencesIdentifiers.size() + + throw new 
IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be disabled because it is referenced by " + activeReferencesIdentifiers.size() + " components that are currently running: [" + StringUtils.join(activeReferencesIdentifiers, ", ") + "]"); } } @@ -244,18 +279,18 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void verifyCanEnable() { if (getState() != ControllerServiceState.DISABLED) { - throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not disabled"); + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled"); } if (!isValid()) { - throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not valid: " + getValidationErrors()); + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not valid: " + getValidationErrors()); } } @Override public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) { if (getState() != ControllerServiceState.DISABLED) { - throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not disabled"); + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled"); } final Set<String> ids = new HashSet<>(); @@ -266,7 +301,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i final Collection<ValidationResult> validationResults = getValidationErrors(ids); for (final ValidationResult result : validationResults) { if (!result.isValid()) { - throw new IllegalStateException(implementation.getIdentifier() + " cannot be enabled because it is not valid: " + result); + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled 
because it is not valid: " + result); } } } @@ -274,7 +309,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void verifyCanUpdate() { if (getState() != ControllerServiceState.DISABLED) { - throw new IllegalStateException(implementation.getIdentifier() + " cannot be updated because it is not disabled"); + throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be updated because it is not disabled"); } } @@ -335,7 +370,10 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) { if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) { - this.active.set(true); + synchronized (active) { + this.active.set(true); + } + final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null, getVariableRegistry()); scheduler.execute(new Runnable() { @Override @@ -447,4 +485,5 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } return results != null ? 
results : Collections.emptySet(); } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index e4937df..cadc8e7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -16,24 +16,11 @@ */ package org.apache.nifi.controller.service; -import static java.util.Objects.requireNonNull; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - +import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnAdded; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.StateManager; @@ -41,6 +28,7 @@ 
import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; @@ -56,7 +44,6 @@ import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.registry.VariableRegistry; - import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.NiFiProperties; @@ -64,30 +51,30 @@ import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + public class StandardControllerServiceProvider implements ControllerServiceProvider { private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class); private final ProcessScheduler processScheduler; - private static final Set<Method> validDisabledMethods; private final BulletinRepository bulletinRepo; private final StateManagerProvider stateManagerProvider; private final VariableRegistry variableRegistry; private final FlowController flowController; private final NiFiProperties nifiProperties; - static { - // methods that are okay to be called when the service is disabled. 
- final Set<Method> validMethods = new HashSet<>(); - for (final Method method : ControllerService.class.getMethods()) { - validMethods.add(method); - } - for (final Method method : Object.class.getMethods()) { - validMethods.add(method); - } - validDisabledMethods = Collections.unmodifiableSet(validMethods); - } - public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) { @@ -99,108 +86,66 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi this.nifiProperties = nifiProperties; } - private Class<?>[] getInterfaces(final Class<?> cls) { - final List<Class<?>> allIfcs = new ArrayList<>(); - populateInterfaces(cls, allIfcs); - return allIfcs.toArray(new Class<?>[allIfcs.size()]); - } - - private void populateInterfaces(final Class<?> cls, final List<Class<?>> interfacesDefinedThusFar) { - final Class<?>[] ifc = cls.getInterfaces(); - if (ifc != null && ifc.length > 0) { - for (final Class<?> i : ifc) { - interfacesDefinedThusFar.add(i); - } - } - - final Class<?> superClass = cls.getSuperclass(); - if (superClass != null) { - populateInterfaces(superClass, interfacesDefinedThusFar); - } - } - private StateManager getStateManager(final String componentId) { return stateManagerProvider.getStateManager(componentId); } @Override - public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { - if (type == null || id == null) { + public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) { + if (type == null || id == null || bundleCoordinate == null) { throw new NullPointerException(); } + ClassLoader cl = null; final ClassLoader currentContextClassLoader = 
Thread.currentThread().getContextClassLoader(); try { - final ClassLoader cl = ExtensionManager.getClassLoader(type, id); final Class<?> rawClass; - try { - if (cl == null) { - rawClass = Class.forName(type); - } else { - Thread.currentThread().setContextClassLoader(cl); - rawClass = Class.forName(type, false, cl); + final Bundle csBundle = ExtensionManager.getBundle(bundleCoordinate); + if (csBundle == null) { + throw new ControllerServiceInstantiationException("Unable to find bundle for coordinate " + bundleCoordinate.getCoordinate()); } + + cl = ExtensionManager.createInstanceClassLoader(type, id, csBundle); + Thread.currentThread().setContextClassLoader(cl); + rawClass = Class.forName(type, false, cl); } catch (final Exception e) { logger.error("Could not create Controller Service of type " + type + " for ID " + id + "; creating \"Ghost\" implementation", e); Thread.currentThread().setContextClassLoader(currentContextClassLoader); - return createGhostControllerService(type, id); + return createGhostControllerService(type, id, bundleCoordinate); } final Class<? 
extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class); final ControllerService originalService = controllerServiceClass.newInstance(); - final AtomicReference<ControllerServiceNode> serviceNodeHolder = new AtomicReference<>(null); - final InvocationHandler invocationHandler = new InvocationHandler() { - @Override - public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { - - final String methodName = method.getName(); - if ("initialize".equals(methodName) || "onPropertyModified".equals(methodName)) { - throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework"); - } - - final ControllerServiceNode node = serviceNodeHolder.get(); - final ControllerServiceState state = node.getState(); - final boolean disabled = state != ControllerServiceState.ENABLED; // only allow method call if service state is ENABLED. - if (disabled && !validDisabledMethods.contains(method)) { - // Use nar class loader here because we are implicitly calling toString() on the original implementation. 
- try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { - throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService.getIdentifier() - + " because the Controller Service is disabled"); - } catch (final Throwable e) { - throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier " + id + " because the Controller Service is disabled"); - } - } + final StandardControllerServiceInvocationHandler invocationHandler = new StandardControllerServiceInvocationHandler(originalService); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { - return method.invoke(originalService, args); - } catch (final InvocationTargetException e) { - // If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. We want - // to instead re-throw what the ControllerService threw, so we pull it out of the InvocationTargetException. - throw e.getCause(); - } - } - }; + // extract all interfaces... 
controllerServiceClass is non null so getAllInterfaces is non null + final List<Class<?>> interfaceList = ClassUtils.getAllInterfaces(controllerServiceClass); + final Class<?>[] interfaces = interfaceList.toArray(new Class<?>[interfaceList.size()]); final ControllerService proxiedService; if (cl == null) { - proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler); + proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), interfaces, invocationHandler); } else { - proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler); + proxiedService = (ControllerService) Proxy.newProxyInstance(cl, interfaces, invocationHandler); } logger.info("Created Controller Service of type {} with identifier {}", type, id); final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService); originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this, getStateManager(id), nifiProperties)); - final ComponentLog logger = new SimpleProcessLogger(id, originalService); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this, variableRegistry); - final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this, variableRegistry, logger); - serviceNodeHolder.set(serviceNode); + final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(originalService, bundleCoordinate, serviceLogger); + final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, serviceLogger); + + final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler, + id, 
validationContextFactory, this, variableRegistry); serviceNode.setName(rawClass.getSimpleName()); + invocationHandler.setServiceNode(serviceNode); + if (firstTimeAdded) { try (final NarCloseable x = NarCloseable.withComponentNarLoader(originalService.getClass(), originalService.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService); @@ -219,8 +164,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } - private ControllerServiceNode createGhostControllerService(final String type, final String id) { - final InvocationHandler invocationHandler = new InvocationHandler() { + private ControllerServiceNode createGhostControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate) { + final ControllerServiceInvocationHandler invocationHandler = new ControllerServiceInvocationHandler() { @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { final String methodName = method.getName(); @@ -257,6 +202,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi throw new IllegalStateException("Controller Service could not be created because the Controller Service Type (" + type + ") could not be found"); } } + @Override + public void setServiceNode(ControllerServiceNode serviceNode) { + // nothing to do + } }; final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), @@ -265,10 +214,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi final String simpleClassName = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - final ComponentLog logger = new SimpleProcessLogger(id, proxiedService); + final LoggableComponent<ControllerService> proxiedLoggableComponent = new LoggableComponent<>(proxiedService, bundleCoordinate, null); - final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, proxiedService, id, - new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, logger); + final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, id, + new StandardValidationContextFactory(this, variableRegistry), this, componentType, type, variableRegistry, true); return serviceNode; } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java index c9e7b8e..d86a120 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java @@ -26,16 +26,19 @@ import javax.net.ssl.SSLContext; import org.apache.nifi.components.PropertyDescriptor; import 
org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.logging.ComponentLog; public class StandardStateProviderInitializationContext implements StateProviderInitializationContext { private final String id; private final Map<PropertyDescriptor, PropertyValue> properties; private final SSLContext sslContext; + private final ComponentLog logger; - public StandardStateProviderInitializationContext(final String identifier, final Map<PropertyDescriptor, PropertyValue> properties, final SSLContext sslContext) { + public StandardStateProviderInitializationContext(final String identifier, final Map<PropertyDescriptor, PropertyValue> properties, final SSLContext sslContext, final ComponentLog logger) { this.id = identifier; this.properties = new HashMap<>(properties); this.sslContext = sslContext; + this.logger = logger; } @Override @@ -57,4 +60,9 @@ public class StandardStateProviderInitializationContext implements StateProvider public SSLContext getSSLContext() { return sslContext; } + + @Override + public ComponentLog getLogger() { + return logger; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java index 21368e8..d63ae00 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -29,6 +30,7 @@ import javax.net.ssl.SSLContext; import org.apache.commons.lang3.ArrayUtils; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -44,7 +46,9 @@ import org.apache.nifi.controller.state.StandardStateProviderInitializationConte import org.apache.nifi.controller.state.config.StateManagerConfiguration; import org.apache.nifi.controller.state.config.StateProviderConfiguration; import org.apache.nifi.framework.security.util.SslContextFactory; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContext; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.NiFiProperties; @@ -183,7 +187,8 @@ public class StandardStateManagerProvider implements StateManagerProvider{ } final SSLContext sslContext = SslContextFactory.createSslContext(properties, false); - final StateProviderInitializationContext initContext = new StandardStateProviderInitializationContext(providerId, propertyMap, sslContext); + final ComponentLog logger = new SimpleProcessLogger(providerId, provider); + final StateProviderInitializationContext initContext = new StandardStateProviderInitializationContext(providerId, propertyMap, sslContext, logger); synchronized (provider) { 
provider.initialize(initContext); @@ -213,15 +218,17 @@ public class StandardStateManagerProvider implements StateManagerProvider{ private static StateProvider instantiateStateProvider(final String type) throws ClassNotFoundException, InstantiationException, IllegalAccessException { final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try { - final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type); - final Class<?> rawClass; - if (detectedClassLoaderForType == null) { - // try to find from the current class loader - rawClass = Class.forName(type); - } else { - // try to find from the registered classloader for that type - rawClass = Class.forName(type, true, ExtensionManager.getClassLoader(type)); + final List<Bundle> bundles = ExtensionManager.getBundles(type); + if (bundles.size() == 0) { + throw new IllegalStateException(String.format("The specified class '%s' is not known to this nifi.", type)); } + if (bundles.size() > 1) { + throw new IllegalStateException(String.format("Multiple bundles found for the specified class '%s', only one is allowed.", type)); + } + + final Bundle bundle = bundles.get(0); + final ClassLoader detectedClassLoaderForType = bundle.getClassLoader(); + final Class<?> rawClass = Class.forName(type, true, detectedClassLoaderForType); Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); final Class<? 
extends StateProvider> mgrClass = rawClass.asSubclass(StateProvider.class); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 0e01349..8d56140 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -17,6 +17,7 @@ package org.apache.nifi.fingerprint; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; @@ -27,7 +28,9 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.processor.Processor; import org.apache.nifi.reporting.ReportingTask; +import org.apache.nifi.util.BundleUtils; import org.apache.nifi.util.DomUtils; +import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.slf4j.Logger; @@ -337,13 +340,18 @@ public class FingerprintFactory { // annotation data appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "annotationData")); + // get the bundle details if possible + final BundleDTO bundle = 
FlowFromDOMFactory.getBundle(DomUtils.getChild(processorElem, "bundle")); + addBundleFingerprint(builder, bundle); + // create an instance of the Processor so that we know the default property values Processor processor = null; try { if (controller != null) { - processor = controller.createProcessor(className, UUID.randomUUID().toString(), false).getProcessor(); + final BundleCoordinate coordinate = getCoordinate(className, bundle); + processor = controller.createProcessor(className, UUID.randomUUID().toString(), coordinate, false).getProcessor(); } - } catch (ProcessorInstantiationException e) { + } catch (ProcessorInstantiationException | IllegalStateException e) { logger.warn("Unable to create Processor of type {} due to {}; its default properties will be fingerprinted instead of being ignored.", className, e.toString()); if (logger.isDebugEnabled()) { logger.warn("", e); @@ -565,6 +573,9 @@ public class FingerprintFactory { builder.append(dto.getId()); builder.append(dto.getType()); builder.append(dto.getName()); + + addBundleFingerprint(builder, dto.getBundle()); + builder.append(dto.getComments()); builder.append(dto.getAnnotationData()); builder.append(dto.getState()); @@ -573,7 +584,8 @@ public class FingerprintFactory { ControllerService controllerService = null; try { if (controller != null) { - controllerService = controller.createControllerService(dto.getType(), UUID.randomUUID().toString(), false).getControllerServiceImplementation(); + final BundleCoordinate coordinate = getCoordinate(dto.getType(), dto.getBundle()); + controllerService = controller.createControllerService(dto.getType(), UUID.randomUUID().toString(), coordinate, false).getControllerServiceImplementation(); } } catch (Exception e) { logger.warn("Unable to create ControllerService of type {} due to {}; its default properties will be fingerprinted instead of being ignored.", dto.getType(), e.toString()); @@ -596,10 +608,37 @@ public class FingerprintFactory { } } + private void 
addBundleFingerprint(final StringBuilder builder, final BundleDTO bundle) { + if (bundle != null) { + builder.append(bundle.getGroup()); + builder.append(bundle.getArtifact()); + builder.append(bundle.getVersion()); + } else { + builder.append("MISSING_BUNDLE"); + } + } + + private BundleCoordinate getCoordinate(final String type, final BundleDTO dto) { + BundleCoordinate coordinate; + try { + coordinate = BundleUtils.getCompatibleBundle(type, dto); + } catch (final IllegalStateException e) { + if (dto == null) { + coordinate = BundleCoordinate.UNKNOWN_COORDINATE; + } else { + coordinate = new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion()); + } + } + return coordinate; + } + private void addReportingTaskFingerprint(final StringBuilder builder, final ReportingTaskDTO dto, final FlowController controller) { builder.append(dto.getId()); builder.append(dto.getType()); builder.append(dto.getName()); + + addBundleFingerprint(builder, dto.getBundle()); + builder.append(dto.getComments()); builder.append(dto.getSchedulingPeriod()); builder.append(dto.getSchedulingStrategy()); @@ -609,7 +648,8 @@ public class FingerprintFactory { ReportingTask reportingTask = null; try { if (controller != null) { - reportingTask = controller.createReportingTask(dto.getType(), UUID.randomUUID().toString(), false, false).getReportingTask(); + final BundleCoordinate coordinate = getCoordinate(dto.getType(), dto.getBundle()); + reportingTask = controller.createReportingTask(dto.getType(), UUID.randomUUID().toString(), coordinate, false, false).getReportingTask(); } } catch (Exception e) { logger.warn("Unable to create ReportingTask of type {} due to {}; its default properties will be fingerprinted instead of being ignored.", dto.getType(), e.toString()); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java index 08658b1..0565eb7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.MissingBundleException; import org.apache.nifi.controller.UninheritableFlowException; import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSynchronizationException; @@ -49,9 +50,10 @@ public interface FlowConfigurationDAO { * @throws FlowSerializationException if proposed flow is not a valid flow configuration file * @throws UninheritableFlowException if the proposed flow cannot be loaded by the controller because in doing so would risk orphaning flow files * @throws FlowSynchronizationException if updates to the controller failed. 
If this exception is thrown, then the controller should be considered unsafe to be used + * @throws MissingBundleException if the proposed flow cannot be loaded by the controller because it contains a bundle that does not exist in the controller */ void load(FlowController controller, DataFlow dataFlow) - throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException; + throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException; /** * Loads the stored flow onto the given stream. http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 765bf6f..26c5224 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -28,6 +28,7 @@ import java.util.zip.GZIPOutputStream; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.MissingBundleException; import org.apache.nifi.controller.StandardFlowSynchronizer; import org.apache.nifi.controller.UninheritableFlowException; import org.apache.nifi.controller.serialization.FlowSerializationException; @@ -77,7 +78,7 @@ public final class 
StandardXMLFlowConfigurationDAO implements FlowConfigurationD @Override public synchronized void load(final FlowController controller, final DataFlow dataFlow) - throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { + throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException { final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor, nifiProperties); controller.synchronize(flowSynchronizer, dataFlow); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java index 8abd5cd..c8d2549 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/GhostProcessor.java @@ -79,6 +79,7 @@ public class GhostProcessor implements Processor { @Override public void initialize(final ProcessorInitializationContext context) { + } @Override @@ -96,4 +97,5 @@ public class GhostProcessor implements Processor { public String toString() { return "GhostProcessor[id=" + id + "]"; } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java index 167bc7a..71a8236 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/GhostReportingTask.java @@ -24,12 +24,14 @@ import java.util.List; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; public class GhostReportingTask implements ReportingTask { private String id; private String canonicalClassName; + private ComponentLog logger; public void setIdentifier(final String id) { this.id = id; @@ -84,6 +86,7 @@ public class GhostReportingTask implements ReportingTask { @Override public void initialize(ReportingInitializationContext config) throws InitializationException { + this.logger = config.getLogger(); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java new file mode 100644 index 0000000..eda045a --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/BundleUtils.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.web.api.dto.BundleDTO; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Utility class for Bundles. + */ +public final class BundleUtils { + + private static BundleCoordinate findBundleForType(final String type, final BundleCoordinate desiredCoordinate) { + final List<Bundle> bundles = ExtensionManager.getBundles(type); + if (bundles.isEmpty()) { + throw new IllegalStateException(String.format("%s is not known to this NiFi instance.", type)); + } else if (bundles.size() > 1) { + if (desiredCoordinate == null) { + throw new IllegalStateException(String.format("Multiple versions of %s exist.", type)); + } else { + throw new IllegalStateException(String.format("Multiple versions of %s exist. 
No exact match for %s.", type, desiredCoordinate)); + } + } else { + return bundles.get(0).getBundleDetails().getCoordinate(); + } + } + + private static BundleCoordinate findCompatibleBundle(final String type, final BundleDTO bundleDTO, final boolean allowCompatibleBundle) { + final BundleCoordinate coordinate = new BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(), bundleDTO.getVersion()); + final Bundle bundle = ExtensionManager.getBundle(coordinate); + + if (bundle == null) { + if (allowCompatibleBundle) { + return findBundleForType(type, coordinate); + } else { + throw new IllegalStateException(String.format("%s from %s is not known to this NiFi instance.", type, coordinate)); + } + } else { + final List<BundleCoordinate> bundlesForType = ExtensionManager.getBundles(type).stream().map(b -> b.getBundleDetails().getCoordinate()).collect(Collectors.toList()); + if (bundlesForType.contains(coordinate)) { + return coordinate; + } else { + throw new IllegalStateException(String.format("Found bundle %s but does not support %s", coordinate, type)); + } + } + } + + /** + * Gets a bundle that supports the specified type. If the bundle is specified, an + * exact match must be available. 
+ * + * <ul> + * <li>If bundleDTO is specified</li> + * <ul> + * <li>Matching bundle found</li> + * <ul> + * <li>If bundle supports type, use it</li> + * <li>If bundle doesn't support type, throw IllegalStateException</li> + * </ul> + * <li>No matching bundle found, IllegalStateException</li> + * </ul> + * <li>If bundleDTO is not specified</li> + * <ul> + * <li>One bundle that supports the specified type, use it</li> + * <li>No bundle that supports the specified type, IllegalStateException</li> + * <li>Multiple bundle that supports the specified type, IllegalStateException</li> + * </ul> + * </ul> + * + * @param type the component type + * @param bundleDTO bundle to find the component + * @return the bundle coordinate + * @throws IllegalStateException bundle not found + */ + public static BundleCoordinate getBundle(final String type, final BundleDTO bundleDTO) { + if (bundleDTO == null) { + return findBundleForType(type, null); + } else { + return findCompatibleBundle(type, bundleDTO, false); + } + } + + /** + * Gets a compatible bundle that supports the specified type. If the bundle is + * specified but is not available, a compatible bundle may be returned if there + * is only one. 
+ * + * <ul> + * <li>If bundleDTO is specified</li> + * <ul> + * <li>Matching bundle found</li> + * <ul> + * <li>If bundle supports type, use it</li> + * <li>If bundle doesn't support type, throw IllegalStateException</li> + * </ul> + * <li>No matching bundle found</li> + * <ul> + * <li>One bundle that supports the specified type, use it</li> + * <li>No bundle that supports the specified type, IllegalStateException</li> + * <li>Multiple bundle that supports the specified type, IllegalStateException</li> + * </ul> + * </ul> + * <li>If bundleDTO is not specified</li> + * <ul> + * <li>One bundle that supports the specified type, use it</li> + * <li>No bundle that supports the specified type, IllegalStateException</li> + * <li>Multiple bundle that supports the specified type, IllegalStateException</li> + * </ul> + * </ul> + * + * @param type the component type + * @param bundleDTO bundle to find the component + * @return the bundle coordinate + * @throws IllegalStateException no compatible bundle found + */ + public static BundleCoordinate getCompatibleBundle(final String type, final BundleDTO bundleDTO) { + if (bundleDTO == null) { + return findBundleForType(type, null); + } else { + return findCompatibleBundle(type, bundleDTO, true); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index 02a9ca5..e21b991 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -59,6 +59,7 @@ <!-- "class" is the actual Java class that performs the type of processing desired--> <xs:element name="class" type="NonEmptyStringType"/> + <xs:element name="bundle" type="BundleType" /> <!-- the number of concurrent tasks for this configured processor that can be executed at any one time. This value can be 0 @@ -283,6 +284,14 @@ </xs:complexContent> </xs:complexType> + <xs:complexType name="BundleType"> + <xs:sequence> + <xs:element name="group" type="NonEmptyStringType" /> + <xs:element name="artifact" type="NonEmptyStringType" /> + <xs:element name="version" type="NonEmptyStringType" /> + </xs:sequence> + </xs:complexType> + <xs:complexType name="PositionType"> <xs:attribute name="x" type="xs:double" use="required" /> <xs:attribute name="y" type="xs:double" use="required" /> @@ -365,6 +374,7 @@ <xs:element name="name" type="NonEmptyStringType" /> <xs:element name="comment" type="xs:string" /> <xs:element name="class" type="NonEmptyStringType" /> + <xs:element name="bundle" type="BundleType" /> <xs:element name="enabled" type="xs:boolean" /> <xs:element name="property" type="PropertyType" minOccurs="0" maxOccurs="unbounded"/> @@ -384,6 +394,7 @@ <xs:element name="name" type="NonEmptyStringType" /> <xs:element name="comment" type="xs:string" /> <xs:element name="class" type="NonEmptyStringType" /> + <xs:element name="bundle" type="BundleType" /> <xs:element name="schedulingPeriod" type="NonEmptyStringType"/> <xs:element name="scheduledState" type="ScheduledState" /> <xs:element name="schedulingStrategy" type="SchedulingStrategy" /> http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy index 28b314c..738b25d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowSynchronizerSpec.groovy @@ -18,27 +18,37 @@ package org.apache.nifi.controller import groovy.xml.XmlUtil import org.apache.nifi.authorization.Authorizer +import org.apache.nifi.bundle.BundleCoordinate import org.apache.nifi.cluster.protocol.DataFlow import org.apache.nifi.connectable.* import org.apache.nifi.controller.label.Label import org.apache.nifi.controller.queue.FlowFileQueue import org.apache.nifi.groups.ProcessGroup import org.apache.nifi.groups.RemoteProcessGroup +import org.apache.nifi.nar.ExtensionManager import org.apache.nifi.processor.Relationship import org.apache.nifi.reporting.BulletinRepository import org.apache.nifi.util.NiFiProperties +import spock.lang.Shared import spock.lang.Specification import spock.lang.Unroll class StandardFlowSynchronizerSpec extends Specification { - + + @Shared + def systemBundle; + def setupSpec() { def propFile = StandardFlowSynchronizerSpec.class.getResource("/nifi.properties").getFile() System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile + + def niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null); + systemBundle = ExtensionManager.createSystemBundle(niFiProperties); + ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); } def teardownSpec() { - System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH + } @Unroll @@ -67,18 +77,26 @@ class 
StandardFlowSynchronizerSpec extends Specification { // the unit under test def nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null) def flowSynchronizer = new StandardFlowSynchronizer(null,nifiProperties) + def firstRootGroup = Mock ProcessGroup when: "the flow is synchronized with the current state of the controller" flowSynchronizer.sync controller, proposedFlow, null then: "establish interactions for the mocked collaborators of StandardFlowSynchronizer to store the ending positions of components" - 1 * controller.isInitialized() >> false + 1 * firstRootGroup.findAllProcessors() >> [] + 1 * controller.isFlowSynchronized() >> false _ * controller.rootGroupId >> flowControllerXml.rootGroup.id.text() _ * controller.getGroup(_) >> { String id -> positionableMocksById.get(id) } _ * controller.snippetManager >> snippetManager _ * controller.bulletinRepository >> bulletinRepository _ * controller.authorizer >> authorizer _ * controller./set.*/(*_) + _ * controller.getAllControllerServices() >> [] + _ * controller.getAllReportingTasks() >> [] + _ * controller.getRootGroup() >>> [ + firstRootGroup, + positionableMocksById.get(controller.rootGroupId) + ] _ * controller.createProcessGroup(_) >> { String pgId -> def processGroup = Mock(ProcessGroup) _ * processGroup.getIdentifier() >> pgId @@ -112,7 +130,7 @@ class StandardFlowSynchronizerSpec extends Specification { return processGroup } - _ * controller.createProcessor(_, _, _) >> { String type, String id, boolean firstTimeAdded -> + _ * controller.createProcessor(_, _, _, _) >> { String type, String id, BundleCoordinate coordinate, boolean firstTimeAdded -> def processor = Mock(ProcessorNode) _ * processor.getPosition() >> { positionablePositionsById.get(id) } _ * processor.setPosition(_) >> { Position pos -> @@ -120,6 +138,7 @@ class StandardFlowSynchronizerSpec extends Specification { } _ * processor./(add|set).*/(*_) _ * processor.getIdentifier() >> id + _ * processor.getBundleCoordinate() >> 
coordinate _ * processor.getRelationship(_) >> { String n -> new Relationship.Builder().name(n).build() } positionableMocksById.put(id, processor) return processor @@ -197,6 +216,7 @@ class StandardFlowSynchronizerSpec extends Specification { [] as byte[] } _ * proposedFlow.authorizerFingerprint >> null + _ * proposedFlow.missingComponents >> [] _ * flowFileQueue./set.*/(*_) _ * _.hashCode() >> 1 http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java index 79b59d7..0bc216f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java @@ -27,6 +27,7 @@ import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -46,8 +47,8 @@ import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; -import org.apache.nifi.util.FileBasedVariableRegistry; 
import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -75,6 +76,9 @@ public class StandardFlowServiceTest { @Before public void setup() throws Exception { properties = NiFiProperties.createBasicNiFiProperties(null, null); + + + variableRegistry = new FileBasedVariableRegistry(properties.getVariableRegistryPropertiesPaths()); mockFlowFileEventRepository = mock(FlowFileEventRepository.class); authorizer = mock(Authorizer.class); @@ -88,7 +92,7 @@ public class StandardFlowServiceTest { @Test public void testLoadWithFlow() throws IOException { byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml")); - flowService.load(new StandardDataFlow(flowBytes, null, null)); + flowService.load(new StandardDataFlow(flowBytes, null, null, new HashSet<>())); FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -103,16 +107,16 @@ public class StandardFlowServiceTest { @Test(expected = FlowSerializationException.class) public void testLoadWithCorruptFlow() throws IOException { byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-corrupt.xml")); - flowService.load(new StandardDataFlow(flowBytes, null, null)); + flowService.load(new StandardDataFlow(flowBytes, null, null, new HashSet<>())); } @Test public void testLoadExistingFlow() throws IOException { byte[] flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml")); - flowService.load(new StandardDataFlow(flowBytes, null, null)); + flowService.load(new StandardDataFlow(flowBytes, null, null, new HashSet<>())); flowBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-inheritable.xml")); - flowService.load(new StandardDataFlow(flowBytes, null, null)); + flowService.load(new StandardDataFlow(flowBytes, null, null, new HashSet<>())); 
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -126,11 +130,11 @@ public class StandardFlowServiceTest { @Test public void testLoadExistingFlowWithUninheritableFlow() throws IOException { byte[] originalBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml")); - flowService.load(new StandardDataFlow(originalBytes, null, null)); + flowService.load(new StandardDataFlow(originalBytes, null, null, new HashSet<>())); try { byte[] updatedBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-uninheritable.xml")); - flowService.load(new StandardDataFlow(updatedBytes, null, null)); + flowService.load(new StandardDataFlow(updatedBytes, null, null, new HashSet<>())); fail("should have thrown " + UninheritableFlowException.class); } catch (UninheritableFlowException ufe) { @@ -148,11 +152,11 @@ public class StandardFlowServiceTest { @Test public void testLoadExistingFlowWithCorruptFlow() throws IOException { byte[] originalBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow.xml")); - flowService.load(new StandardDataFlow(originalBytes, null, null)); + flowService.load(new StandardDataFlow(originalBytes, null, null, new HashSet<>())); try { byte[] updatedBytes = IOUtils.toByteArray(StandardFlowServiceTest.class.getResourceAsStream("/conf/all-flow-corrupt.xml")); - flowService.load(new StandardDataFlow(updatedBytes, null, null)); + flowService.load(new StandardDataFlow(updatedBytes, null, null, new HashSet<>())); fail("should have thrown " + FlowSerializationException.class); } catch (FlowSerializationException ufe) {
