http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java new file mode 100644 index 0000000..7a35f5a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java @@ -0,0 +1,728 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.DisabledServiceValidationResult; +import org.apache.nifi.components.validation.ValidationState; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.service.ControllerServiceDisabledException; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.util.CharacterFilterUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractComponentNode implements ComponentNode { + private static final Logger logger = LoggerFactory.getLogger(AbstractComponentNode.class); + + private final String id; + private final ValidationContextFactory validationContextFactory; + private final ControllerServiceProvider serviceProvider; + private final AtomicReference<String> name; + private final AtomicReference<String> annotationData = new AtomicReference<>(); + private final AtomicReference<ValidationContext> validationContext = new AtomicReference<>(); + private final String componentType; + private final String componentCanonicalClass; + private final ComponentVariableRegistry variableRegistry; + private final ReloadComponent reloadComponent; + + private final AtomicBoolean isExtensionMissing; + + private final Lock lock = new ReentrantLock(); + private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>(); + private volatile String additionalResourcesFingerprint; + private AtomicReference<ValidationState> validationState = new AtomicReference<>(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList())); + private final ValidationTrigger validationTrigger; + + public AbstractComponentNode(final String id, + final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, + final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, + final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) { + this.id = id; + this.validationContextFactory = validationContextFactory; + this.serviceProvider = serviceProvider; + this.name = new AtomicReference<>(componentType); + this.componentType = componentType; + this.componentCanonicalClass = componentCanonicalClass; + this.variableRegistry = variableRegistry; + this.validationTrigger = validationTrigger; + this.reloadComponent = reloadComponent; + this.isExtensionMissing = new AtomicBoolean(isExtensionMissing); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public void setExtensionMissing(boolean extensionMissing) { + this.isExtensionMissing.set(extensionMissing); + } + + @Override + public boolean isExtensionMissing() { + return isExtensionMissing.get(); + } + + @Override + public String getName() { + return name.get(); + } + + @Override + public void setName(final String name) { + this.name.set(CharacterFilterUtils.filterInvalidXmlCharacters(Objects.requireNonNull(name).intern())); + } + + @Override + public String getAnnotationData() { + return annotationData.get(); + } + + @Override + public void setAnnotationData(final String data) { + annotationData.set(CharacterFilterUtils.filterInvalidXmlCharacters(data)); + resetValidationState(); + } + + @Override + public Set<URL> getAdditionalClasspathResources(final List<PropertyDescriptor> propertyDescriptors) { + final Set<String> modulePaths = new LinkedHashSet<>(); + for (final PropertyDescriptor descriptor : propertyDescriptors) { + if (descriptor.isDynamicClasspathModifier()) { + final String value = getProperty(descriptor); + if (!StringUtils.isEmpty(value)) { + final StandardPropertyValue propertyValue = new StandardPropertyValue(value, null, variableRegistry); + modulePaths.add(propertyValue.evaluateAttributeExpressions().getValue()); + } + } + } + + final Set<URL> additionalUrls = new LinkedHashSet<>(); + try { + final URL[] urls = ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true); + if (urls != null) { + for (final URL url : urls) { + additionalUrls.add(url); + } + } + } catch (MalformedURLException mfe) { + getLogger().error("Error processing classpath resources for " + id + ": " + mfe.getMessage(), mfe); + } + return additionalUrls; + } + + @Override + public void setProperties(final Map<String, String> properties, final boolean allowRemovalOfRequiredProperties) { + if (properties == null) { + return; + } + + lock.lock(); + try { + verifyModifiable(); + + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), id)) { + boolean classpathChanged = false; + for (final Map.Entry<String, String> entry : properties.entrySet()) { + // determine if any of the property changes require resetting the InstanceClassLoader + final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(entry.getKey()); + if (descriptor.isDynamicClasspathModifier()) { + classpathChanged = true; + } + + if (entry.getKey() != null && entry.getValue() == null) { + removeProperty(entry.getKey(), allowRemovalOfRequiredProperties); + } else if (entry.getKey() != null) { + setProperty(entry.getKey(), CharacterFilterUtils.filterInvalidXmlCharacters(entry.getValue())); + } + } + + // if at least one property with dynamicallyModifiesClasspath(true) was set, then reload the component with the new urls + if (classpathChanged) { + logger.info("Updating classpath for " + this.componentType + " with the ID " + this.getIdentifier()); + + final Set<URL> additionalUrls = getAdditionalClasspathResources(getComponent().getPropertyDescriptors()); + try { + reload(additionalUrls); + } catch (Exception e) { + getLogger().error("Error reloading component with id " + id + ": " + e.getMessage(), e); + } + } + } + + logger.debug("Setting properties to {}; resetting validation state", properties); + resetValidationState(); + } finally { + lock.unlock(); + } + } + + // Keep setProperty/removeProperty private so that all calls go through setProperties + private void setProperty(final String name, final String value) { + if (null == name || null == value) { + throw new IllegalArgumentException("Name or Value can not be null"); + } + + final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(name); + + final String oldValue = properties.put(descriptor, value); + if (!value.equals(oldValue)) { + + if (descriptor.getControllerServiceDefinition() != null) { + if (oldValue != null) { + final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue); + if (oldNode != null) { + oldNode.removeReference(this); + } + } + + final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value); + if (newNode != null) { + newNode.addReference(this); + } + } + + try { + onPropertyModified(descriptor, oldValue, value); + } catch (final Exception e) { + // nothing really to do here... + } + } + } + + /** + * Removes the property and value for the given property name if a + * descriptor and value exists for the given name. If the property is + * optional its value might be reset to default or will be removed entirely + * if was a dynamic property. + * + * @param name the property to remove + * @param allowRemovalOfRequiredProperties whether or not the property should be removed if it's required + * @return true if removed; false otherwise + * @throws java.lang.IllegalArgumentException if the name is null + */ + private boolean removeProperty(final String name, final boolean allowRemovalOfRequiredProperties) { + if (null == name) { + throw new IllegalArgumentException("Name can not be null"); + } + + final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(name); + String value = null; + + final boolean allowRemoval = allowRemovalOfRequiredProperties || !descriptor.isRequired(); + if (allowRemoval && (value = properties.remove(descriptor)) != null) { + + if (descriptor.getControllerServiceDefinition() != null) { + if (value != null) { + final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value); + if (oldNode != null) { + oldNode.removeReference(this); + } + } + } + + try { + onPropertyModified(descriptor, value, null); + } catch (final Exception e) { + getLogger().error(e.getMessage(), e); + } + + return true; + } + + return false; + } + + @Override + public Map<PropertyDescriptor, String> getProperties() { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getIdentifier())) { + final List<PropertyDescriptor> supported = getComponent().getPropertyDescriptors(); + if (supported == null || supported.isEmpty()) { + return Collections.unmodifiableMap(properties); + } else { + final Map<PropertyDescriptor, String> props = new LinkedHashMap<>(); + for (final PropertyDescriptor descriptor : supported) { + props.put(descriptor, null); + } + props.putAll(properties); + return props; + } + } + } + + @Override + public String getProperty(final PropertyDescriptor property) { + return properties.get(property); + } + + @Override + public void refreshProperties() { + // use setProperty instead of setProperties so we can bypass the class loading logic + getProperties().entrySet().stream() + .filter(e -> e.getKey() != null && e.getValue() != null) + .forEach(e -> setProperty(e.getKey().getName(), e.getValue())); + } + + /** + * Generates fingerprint for the additional urls and compares it with the previous + * fingerprint value. If the fingerprint values don't match, the function calls the + * component's reload() to load the newly found resources. + */ + @Override + public synchronized void reloadAdditionalResourcesIfNecessary() { + // Components that don't have any PropertyDescriptors marked `dynamicallyModifiesClasspath` + // won't have the fingerprint i.e. will be null, in such cases do nothing + if (additionalResourcesFingerprint == null) { + return; + } + + final List<PropertyDescriptor> descriptors = new ArrayList<>(this.getProperties().keySet()); + final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors); + + final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls); + if(!StringUtils.equals(additionalResourcesFingerprint, newFingerprint)) { + setAdditionalResourcesFingerprint(newFingerprint); + try { + logger.info("Updating classpath for " + this.componentType + " with the ID " + this.getIdentifier()); + reload(additionalUrls); + } catch (Exception e) { + logger.error("Error reloading component with id " + id + ": " + e.getMessage(), e); + } + } + } + + @Override + public int hashCode() { + return 273171 * id.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + + if (!(obj instanceof ComponentNode)) { + return false; + } + + final ComponentNode other = (ComponentNode) obj; + return id.equals(other.getIdentifier()); + } + + @Override + public String toString() { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + return getComponent().toString(); + } + } + + @Override + public final void performValidation() { + boolean replaced = false; + do { + final ValidationState validationState = getValidationState(); + + final ValidationContext validationContext = getValidationContext(); + final Collection<ValidationResult> results = new ArrayList<>(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getIdentifier())) { + final Collection<ValidationResult> validationResults = computeValidationErrors(validationContext); + results.addAll(validationResults); + + // validate selected controller services implement the API required by the processor + final Collection<ValidationResult> referencedServiceValidationResults = validateReferencedControllerServices(validationContext); + results.addAll(referencedServiceValidationResults); + } + + final ValidationStatus status = results.isEmpty() ? ValidationStatus.VALID : ValidationStatus.INVALID; + final ValidationState updatedState = new ValidationState(status, results); + replaced = replaceValidationState(validationState, updatedState); + } while (!replaced); + } + + protected Collection<ValidationResult> computeValidationErrors(final ValidationContext validationContext) { + Throwable failureCause = null; + try { + final Collection<ValidationResult> results = getComponent().validate(validationContext); + logger.debug("Computed validation errors with Validation Context {}; results = {}", validationContext, results); + + return results; + } catch (final ControllerServiceDisabledException e) { + getLogger().debug("Failed to perform validation due to " + e, e); + return Collections.singleton( + new DisabledServiceValidationResult("Component", e.getControllerServiceId(), "performing validation depends on referencing a Controller Service that is currently disabled")); + } catch (final Exception e) { + // We don't want to log this as an error because we will return a ValidationResult that is + // invalid. However, we do want to make the stack trace available if needed, so we log it at + // a debug level. + getLogger().debug("Failed to perform validation due to " + e, e); + failureCause = e; + } catch (final Error e) { + getLogger().error("Failed to perform validation due to " + e, e); + failureCause = e; + } + + return Collections.singleton(new ValidationResult.Builder() + .subject("Component") + .valid(false) + .explanation("Failed to perform validation due to " + failureCause) + .build()); + } + + protected final Collection<ValidationResult> validateReferencedControllerServices(final ValidationContext validationContext) { + final List<PropertyDescriptor> supportedDescriptors = getComponent().getPropertyDescriptors(); + if (supportedDescriptors == null) { + return Collections.emptyList(); + } + + final Collection<ValidationResult> validationResults = new ArrayList<>(); + for (final PropertyDescriptor descriptor : supportedDescriptors) { + if (descriptor.getControllerServiceDefinition() == null) { + // skip properties that aren't for a controller service + continue; + } + + final String controllerServiceId = validationContext.getProperty(descriptor).getValue(); + if (controllerServiceId == null) { + continue; + } + + final ControllerServiceNode controllerServiceNode = getControllerServiceProvider().getControllerServiceNode(controllerServiceId); + if (controllerServiceNode == null) { + final ValidationResult result = createInvalidResult(controllerServiceId, descriptor.getDisplayName(), + "Invalid Controller Service: " + controllerServiceId + " is not a valid Controller Service Identifier"); + + validationResults.add(result); + continue; + } + + final ValidationResult apiResult = validateControllerServiceApi(descriptor, controllerServiceNode); + if (apiResult != null) { + validationResults.add(apiResult); + continue; + } + + if (!controllerServiceNode.isActive()) { + validationResults.add(new DisabledServiceValidationResult(descriptor.getDisplayName(), controllerServiceId)); + } + } + + return validationResults; + } + + + private ValidationResult validateControllerServiceApi(final PropertyDescriptor descriptor, final ControllerServiceNode controllerServiceNode) { + final Class<? extends ControllerService> controllerServiceApiClass = descriptor.getControllerServiceDefinition(); + final ClassLoader controllerServiceApiClassLoader = controllerServiceApiClass.getClassLoader(); + + final String serviceId = controllerServiceNode.getIdentifier(); + final String propertyName = descriptor.getDisplayName(); + + final Bundle controllerServiceApiBundle = ExtensionManager.getBundle(controllerServiceApiClassLoader); + if (controllerServiceApiBundle == null) { + return createInvalidResult(serviceId, propertyName, "Unable to find bundle for ControllerService API class " + controllerServiceApiClass.getCanonicalName()); + } + final BundleCoordinate controllerServiceApiCoordinate = controllerServiceApiBundle.getBundleDetails().getCoordinate(); + + final Bundle controllerServiceBundle = ExtensionManager.getBundle(controllerServiceNode.getBundleCoordinate()); + if (controllerServiceBundle == null) { + return createInvalidResult(serviceId, propertyName, "Unable to find bundle for coordinate " + controllerServiceNode.getBundleCoordinate()); + } + final BundleCoordinate controllerServiceCoordinate = controllerServiceBundle.getBundleDetails().getCoordinate(); + + final boolean matchesApi = matchesApi(controllerServiceBundle, controllerServiceApiCoordinate); + + if (!matchesApi) { + final String controllerServiceType = controllerServiceNode.getComponentType(); + final String controllerServiceApiType = controllerServiceApiClass.getSimpleName(); + + final String explanation = new StringBuilder() + .append(controllerServiceType).append(" - ").append(controllerServiceCoordinate.getVersion()) + .append(" from ").append(controllerServiceCoordinate.getGroup()).append(" - ").append(controllerServiceCoordinate.getId()) + .append(" is not compatible with ").append(controllerServiceApiType).append(" - ").append(controllerServiceApiCoordinate.getVersion()) + .append(" from ").append(controllerServiceApiCoordinate.getGroup()).append(" - ").append(controllerServiceApiCoordinate.getId()) + .toString(); + + return createInvalidResult(serviceId, propertyName, explanation); + } + + return null; + } + + private ValidationResult createInvalidResult(final String serviceId, final String propertyName, final String explanation) { + return new ValidationResult.Builder() + .input(serviceId) + .subject(propertyName) + .valid(false) + .explanation(explanation) + .build(); + } + + /** + * Determines if the given controller service node has the required API as an ancestor. + * + * @param controllerServiceImplBundle the bundle of a controller service being referenced by a processor + * @param requiredApiCoordinate the controller service API required by the processor + * @return true if the controller service node has the require API as an ancestor, false otherwise + */ + private boolean matchesApi(final Bundle controllerServiceImplBundle, final BundleCoordinate requiredApiCoordinate) { + // start with the coordinate of the controller service for cases where the API and service are in the same bundle + BundleCoordinate controllerServiceDependencyCoordinate = controllerServiceImplBundle.getBundleDetails().getCoordinate(); + + boolean foundApiDependency = false; + while (controllerServiceDependencyCoordinate != null) { + // determine if the dependency coordinate matches the required API + if (requiredApiCoordinate.equals(controllerServiceDependencyCoordinate)) { + foundApiDependency = true; + break; + } + + // move to the next dependency in the chain, or stop if null + final Bundle controllerServiceDependencyBundle = ExtensionManager.getBundle(controllerServiceDependencyCoordinate); + if (controllerServiceDependencyBundle == null) { + controllerServiceDependencyCoordinate = null; + } else { + controllerServiceDependencyCoordinate = controllerServiceDependencyBundle.getBundleDetails().getDependencyCoordinate(); + } + } + + return foundApiDependency; + } + + @Override + public PropertyDescriptor getPropertyDescriptor(final String name) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + return getComponent().getPropertyDescriptor(name); + } + } + + @Override + public List<PropertyDescriptor> getPropertyDescriptors() { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + return getComponent().getPropertyDescriptors(); + } + } + + + private final void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { + getComponent().onPropertyModified(descriptor, oldValue, newValue); + } + } + + + @Override + public ValidationStatus getValidationStatus() { + return validationState.get().getStatus(); + } + + @Override + public ValidationStatus getValidationStatus(long timeout, TimeUnit timeUnit) { + long millis = timeUnit.toMillis(timeout); + final long maxTime = System.currentTimeMillis() + millis; + + synchronized (validationState) { + while (getValidationStatus() == ValidationStatus.VALIDATING) { + try { + final long waitMillis = Math.max(0, maxTime - System.currentTimeMillis()); + if (waitMillis <= 0) { + break; + } + + validationState.wait(waitMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return getValidationStatus(); + } + } + + return getValidationStatus(); + } + } + + protected ValidationState getValidationState() { + return validationState.get(); + } + + private boolean replaceValidationState(final ValidationState expectedState, final ValidationState newState) { + synchronized (validationState) { + if (validationState.compareAndSet(expectedState, newState)) { + validationState.notifyAll(); + return true; + } + + return false; + } + } + + protected void resetValidationState() { + validationContext.set(null); + validationState.set(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList())); + validationTrigger.triggerAsync(this); + } + + @Override + public Collection<ValidationResult> getValidationErrors() { + return getValidationErrors(Collections.emptySet()); + } + + protected Collection<ValidationResult> getValidationErrors(final Set<ControllerServiceNode> servicesToIgnore) { + final ValidationState validationState = this.validationState.get(); + if (validationState.getStatus() == ValidationStatus.VALIDATING) { + return null; + } + + final Collection<ValidationResult> validationErrors = validationState.getValidationErrors(); + if (servicesToIgnore == null || servicesToIgnore.isEmpty()) { + return validationErrors; + } + + final Set<String> ignoredServiceIds = servicesToIgnore.stream() + .map(ControllerServiceNode::getIdentifier) + .collect(Collectors.toSet()); + + final List<ValidationResult> retainedValidationErrors = new ArrayList<>(); + for (final ValidationResult result : validationErrors) { + if (!(result instanceof DisabledServiceValidationResult)) { + retainedValidationErrors.add(result); + continue; + } + + final String serviceId = ((DisabledServiceValidationResult) result).getControllerServiceIdentifier(); + if (!ignoredServiceIds.contains(serviceId)) { + retainedValidationErrors.add(result); + } + } + + return retainedValidationErrors; + } + + public abstract void verifyModifiable() throws IllegalStateException; + + /** + * + */ + ControllerServiceProvider getControllerServiceProvider() { + return this.serviceProvider; + } + + @Override + public String getCanonicalClassName() { + return componentCanonicalClass; + } + + @Override + public String getComponentType() { + return componentType; + } + + protected ValidationContextFactory getValidationContextFactory() { + return this.validationContextFactory; + } + + protected ValidationContext getValidationContext() { + while (true) { + ValidationContext context = this.validationContext.get(); + if (context != null) { + return context; + } + + // Use a lock here because we want to prevent calls to getProperties() from happening while setProperties() is also happening. + final Map<PropertyDescriptor, String> properties; + lock.lock(); + try { + properties = getProperties(); + } finally { + lock.unlock(); + } + context = getValidationContextFactory().newValidationContext(properties, getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); + + final boolean updated = validationContext.compareAndSet(null, context); + if (updated) { + logger.debug("Updating validation context to {}", context); + return context; + } + } + } + + @Override + public ComponentVariableRegistry getVariableRegistry() { + return this.variableRegistry; + } + + protected ReloadComponent getReloadComponent() { + return this.reloadComponent; + } + + @Override + public void verifyCanUpdateBundle(final BundleCoordinate incomingCoordinate) throws IllegalArgumentException { + final BundleCoordinate existingCoordinate = getBundleCoordinate(); + + // determine if this update is changing the bundle for the processor + if (!existingCoordinate.equals(incomingCoordinate)) { + // if it is changing the bundle, only allow it to change to a different version within same group and id + if (!existingCoordinate.getGroup().equals(incomingCoordinate.getGroup()) + || !existingCoordinate.getId().equals(incomingCoordinate.getId())) { + throw new IllegalArgumentException(String.format( + "Unable to update component %s from %s to %s because bundle group and id must be the same.", + getIdentifier(), existingCoordinate.getCoordinate(), incomingCoordinate.getCoordinate())); + } + } + } + + protected void setAdditionalResourcesFingerprint(String additionalResourcesFingerprint) { + this.additionalResourcesFingerprint = additionalResourcesFingerprint; + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java deleted file mode 100644 index 82a79f0..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ /dev/null @@ -1,607 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.net.MalformedURLException; -import java.net.URL; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.attribute.expression.language.StandardPropertyValue; -import org.apache.nifi.bundle.Bundle; -import org.apache.nifi.bundle.BundleCoordinate; -import org.apache.nifi.components.ConfigurableComponent; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.service.ControllerServiceNode; -import org.apache.nifi.controller.service.ControllerServiceProvider; -import org.apache.nifi.nar.ExtensionManager; -import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.registry.ComponentVariableRegistry; -import org.apache.nifi.util.CharacterFilterUtils; -import org.apache.nifi.util.file.classloader.ClassLoaderUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent { - private static final Logger logger = LoggerFactory.getLogger(AbstractConfiguredComponent.class); - - private final String id; - private final ValidationContextFactory validationContextFactory; - private final ControllerServiceProvider serviceProvider; - private final AtomicReference<String> name; - private final AtomicReference<String> annotationData = new AtomicReference<>(); - private final AtomicReference<ValidationContext> validationContext = new AtomicReference<>(); - private final String componentType; - private final String componentCanonicalClass; - private final ComponentVariableRegistry variableRegistry; - private final ReloadComponent reloadComponent; - - private final AtomicBoolean isExtensionMissing; - - private final Lock lock = new ReentrantLock(); - private final ConcurrentMap<PropertyDescriptor, String> properties = new ConcurrentHashMap<>(); - private volatile String additionalResourcesFingerprint; - - public AbstractConfiguredComponent(final String id, - final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, - final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, - final ReloadComponent reloadComponent, final boolean isExtensionMissing) { - this.id = id; - this.validationContextFactory = validationContextFactory; - this.serviceProvider = serviceProvider; - this.name = new AtomicReference<>(componentType); - this.componentType = componentType; - this.componentCanonicalClass = componentCanonicalClass; - this.variableRegistry = variableRegistry; - this.isExtensionMissing = new AtomicBoolean(isExtensionMissing); - this.reloadComponent = reloadComponent; - } - - @Override - public String getIdentifier() { - return id; - } - - @Override - public void setExtensionMissing(boolean extensionMissing) { - this.isExtensionMissing.set(extensionMissing); - } - - @Override - public boolean isExtensionMissing() { - return isExtensionMissing.get(); - } - - @Override - public String getName() { - return name.get(); - } - - @Override - public void setName(final String name) { - this.name.set(CharacterFilterUtils.filterInvalidXmlCharacters(Objects.requireNonNull(name).intern())); - } - - @Override - public String getAnnotationData() { - return annotationData.get(); - } - - @Override - public void setAnnotationData(final String data) { - invalidateValidationContext(); - annotationData.set(CharacterFilterUtils.filterInvalidXmlCharacters(data)); - } - - @Override - public Set<URL> getAdditionalClasspathResources(final List<PropertyDescriptor> propertyDescriptors) { - final Set<String> modulePaths = new LinkedHashSet<>(); - for (final PropertyDescriptor descriptor : propertyDescriptors) { - if (descriptor.isDynamicClasspathModifier()) { - final String value = getProperty(descriptor); - if (!StringUtils.isEmpty(value)) { - final StandardPropertyValue propertyValue = new StandardPropertyValue(value, null, variableRegistry); - modulePaths.add(propertyValue.evaluateAttributeExpressions().getValue()); - } - } - } - - final Set<URL> additionalUrls = new LinkedHashSet<>(); - try { - final URL[] urls = ClassLoaderUtils.getURLsForClasspath(modulePaths, null, true); - if (urls != null) { - for (final URL url : urls) { - additionalUrls.add(url); - } - } - } catch (MalformedURLException mfe) { - getLogger().error("Error processing classpath resources for " + id + ": " + mfe.getMessage(), mfe); - } - return additionalUrls; - } - - @Override - public void setProperties(final Map<String, String> properties, final boolean allowRemovalOfRequiredProperties) { - if (properties == null) { - return; - } - - lock.lock(); - try { - verifyModifiable(); - - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), id)) { - boolean classpathChanged = false; - for (final Map.Entry<String, String> entry : properties.entrySet()) { - // determine if any of the property changes require resetting the InstanceClassLoader - final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(entry.getKey()); - if (descriptor.isDynamicClasspathModifier()) { - classpathChanged = true; - } - - if (entry.getKey() != null && entry.getValue() == null) { - removeProperty(entry.getKey(), allowRemovalOfRequiredProperties); - } else if (entry.getKey() != null) { - setProperty(entry.getKey(), CharacterFilterUtils.filterInvalidXmlCharacters(entry.getValue())); - } - } - - // if at least one property with dynamicallyModifiesClasspath(true) was set, then reload the component with the new urls - if (classpathChanged) { - logger.info("Updating classpath for " + this.componentType + " with the ID " + this.getIdentifier()); - - final Set<URL> additionalUrls = getAdditionalClasspathResources(getComponent().getPropertyDescriptors()); - try { - reload(additionalUrls); - } catch (Exception e) { - getLogger().error("Error reloading component with id " + id + ": " + e.getMessage(), e); - } - } - } - } finally { - lock.unlock(); - } - } - - // Keep setProperty/removeProperty private so that all calls go through setProperties - private void setProperty(final String name, final String value) { - if (null == name || null == value) { - throw new IllegalArgumentException("Name or Value can not be null"); - } - - final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(name); - - final String oldValue = properties.put(descriptor, value); - if (!value.equals(oldValue)) { - - if (descriptor.getControllerServiceDefinition() != null) { - if (oldValue != null) { - final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(oldValue); - if (oldNode != null) { - oldNode.removeReference(this); - } - } - - final ControllerServiceNode newNode = serviceProvider.getControllerServiceNode(value); - if (newNode != null) { - newNode.addReference(this); - } - } - - try { - onPropertyModified(descriptor, oldValue, value); - } catch (final Exception e) { - // nothing really to do here... - } - } - } - - /** - * Removes the property and value for the given property name if a - * descriptor and value exists for the given name. If the property is - * optional its value might be reset to default or will be removed entirely - * if was a dynamic property. - * - * @param name the property to remove - * @param allowRemovalOfRequiredProperties whether or not the property should be removed if it's required - * @return true if removed; false otherwise - * @throws java.lang.IllegalArgumentException if the name is null - */ - private boolean removeProperty(final String name, final boolean allowRemovalOfRequiredProperties) { - if (null == name) { - throw new IllegalArgumentException("Name can not be null"); - } - - final PropertyDescriptor descriptor = getComponent().getPropertyDescriptor(name); - String value = null; - - final boolean allowRemoval = allowRemovalOfRequiredProperties || !descriptor.isRequired(); - if (allowRemoval && (value = properties.remove(descriptor)) != null) { - - if (descriptor.getControllerServiceDefinition() != null) { - if (value != null) { - final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value); - if (oldNode != null) { - oldNode.removeReference(this); - } - } - } - - try { - onPropertyModified(descriptor, value, null); - } catch (final Exception e) { - getLogger().error(e.getMessage(), e); - } - - return true; - } - - return false; - } - - @Override - public Map<PropertyDescriptor, String> getProperties() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { - final List<PropertyDescriptor> supported = getComponent().getPropertyDescriptors(); - if (supported == null || supported.isEmpty()) { - return Collections.unmodifiableMap(properties); - } else { - final Map<PropertyDescriptor, String> props = new LinkedHashMap<>(); - for (final PropertyDescriptor descriptor : supported) { - props.put(descriptor, null); - } - props.putAll(properties); - return props; - } - } - } - - @Override - public String getProperty(final PropertyDescriptor property) { - return properties.get(property); - } - - @Override - public void refreshProperties() { - // use setProperty instead of setProperties so we can bypass the class loading logic - getProperties().entrySet().stream() - .filter(e -> e.getKey() != null && e.getValue() != null) - .forEach(e -> setProperty(e.getKey().getName(), e.getValue())); - } - - /** - * Generates fingerprint for the additional urls and compares it with the previous - * fingerprint value. If the fingerprint values don't match, the function calls the - * component's reload() to load the newly found resources. - */ - @Override - public synchronized void reloadAdditionalResourcesIfNecessary() { - // Components that don't have any PropertyDescriptors marked `dynamicallyModifiesClasspath` - // won't have the fingerprint i.e. will be null, in such cases do nothing - if (additionalResourcesFingerprint == null) { - return; - } - - final List<PropertyDescriptor> descriptors = new ArrayList<>(this.getProperties().keySet()); - final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors); - - final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls); - if(!StringUtils.equals(additionalResourcesFingerprint, newFingerprint)) { - setAdditionalResourcesFingerprint(newFingerprint); - try { - logger.info("Updating classpath for " + this.componentType + " with the ID " + this.getIdentifier()); - reload(additionalUrls); - } catch (Exception e) { - logger.error("Error reloading component with id " + id + ": " + e.getMessage(), e); - } - } - } - - @Override - public int hashCode() { - return 273171 * id.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if (obj == this) { - return true; - } - if (obj == null) { - return false; - } - - if (!(obj instanceof ConfiguredComponent)) { - return false; - } - - final ConfiguredComponent other = (ConfiguredComponent) obj; - return id.equals(other.getIdentifier()); - } - - @Override - public String toString() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { - return getComponent().toString(); - } - } - - @Override - public Collection<ValidationResult> validate(final ValidationContext context) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { - final Collection<ValidationResult> validationResults = getComponent().validate(context); - - // validate selected controller services implement the API required by the processor - - final List<PropertyDescriptor> supportedDescriptors = getComponent().getPropertyDescriptors(); - if (null != supportedDescriptors) { - for (final PropertyDescriptor descriptor : supportedDescriptors) { - if (descriptor.getControllerServiceDefinition() == null) { - // skip properties that aren't for a controller service - continue; - } - - final String controllerServiceId = context.getProperty(descriptor).getValue(); - if (controllerServiceId == null) { - // if the property value is null we should already have a validation error - continue; - } - - final ControllerServiceNode controllerServiceNode = getControllerServiceProvider().getControllerServiceNode(controllerServiceId); - if (controllerServiceNode == null) { - // if the node was null we should already have a validation error - continue; - } - - final Class<? extends ControllerService> controllerServiceApiClass = descriptor.getControllerServiceDefinition(); - final ClassLoader controllerServiceApiClassLoader = controllerServiceApiClass.getClassLoader(); - - final Consumer<String> addValidationError = explanation -> validationResults.add(new ValidationResult.Builder() - .input(controllerServiceId) - .subject(descriptor.getDisplayName()) - .valid(false) - .explanation(explanation) - .build()); - - final Bundle controllerServiceApiBundle = ExtensionManager.getBundle(controllerServiceApiClassLoader); - if (controllerServiceApiBundle == null) { - addValidationError.accept(String.format("Unable to find bundle for ControllerService API class %s.", controllerServiceApiClass.getCanonicalName())); - continue; - } - final BundleCoordinate controllerServiceApiCoordinate = controllerServiceApiBundle.getBundleDetails().getCoordinate(); - - final Bundle controllerServiceBundle = ExtensionManager.getBundle(controllerServiceNode.getBundleCoordinate()); - if (controllerServiceBundle == null) { - addValidationError.accept(String.format("Unable to find bundle for coordinate %s.", controllerServiceNode.getBundleCoordinate())); - continue; - } - final BundleCoordinate controllerServiceCoordinate = controllerServiceBundle.getBundleDetails().getCoordinate(); - - final boolean matchesApi = matchesApi(controllerServiceBundle, controllerServiceApiCoordinate); - - if (!matchesApi) { - final String controllerServiceType = controllerServiceNode.getComponentType(); - final String controllerServiceApiType = controllerServiceApiClass.getSimpleName(); - - final String explanation = new StringBuilder() - .append(controllerServiceType).append(" - ").append(controllerServiceCoordinate.getVersion()) - .append(" from ").append(controllerServiceCoordinate.getGroup()).append(" - ").append(controllerServiceCoordinate.getId()) - .append(" is not compatible with ").append(controllerServiceApiType).append(" - ").append(controllerServiceApiCoordinate.getVersion()) - .append(" from ").append(controllerServiceApiCoordinate.getGroup()).append(" - ").append(controllerServiceApiCoordinate.getId()) - .toString(); - - addValidationError.accept(explanation); - } - - } - } - - return validationResults; - } - } - - /** - * Determines if the given controller service node has the required API as an ancestor. - * - * @param controllerServiceImplBundle the bundle of a controller service being referenced by a processor - * @param requiredApiCoordinate the controller service API required by the processor - * @return true if the controller service node has the require API as an ancestor, false otherwise - */ - private boolean matchesApi(final Bundle controllerServiceImplBundle, final BundleCoordinate requiredApiCoordinate) { - // start with the coordinate of the controller service for cases where the API and service are in the same bundle - BundleCoordinate controllerServiceDependencyCoordinate = controllerServiceImplBundle.getBundleDetails().getCoordinate(); - - boolean foundApiDependency = false; - while (controllerServiceDependencyCoordinate != null) { - // determine if the dependency coordinate matches the required API - if (requiredApiCoordinate.equals(controllerServiceDependencyCoordinate)) { - foundApiDependency = true; - break; - } - - // move to the next dependency in the chain, or stop if null - final Bundle controllerServiceDependencyBundle = ExtensionManager.getBundle(controllerServiceDependencyCoordinate); - if (controllerServiceDependencyBundle == null) { - controllerServiceDependencyCoordinate = null; - } else { - controllerServiceDependencyCoordinate = controllerServiceDependencyBundle.getBundleDetails().getDependencyCoordinate(); - } - } - - return foundApiDependency; - } - - @Override - public PropertyDescriptor getPropertyDescriptor(final String name) { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { - return getComponent().getPropertyDescriptor(name); - } - } - - @Override - public final void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - invalidateValidationContext(); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { - getComponent().onPropertyModified(descriptor, oldValue, newValue); - } - } - - @Override - public List<PropertyDescriptor> getPropertyDescriptors() { - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { - return getComponent().getPropertyDescriptors(); - } - } - - @Override - public boolean isValid() { - final Collection<ValidationResult> validationResults = validate(getValidationContext()); - - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - return false; - } - } - - return true; - } - - @Override - public Collection<ValidationResult> getValidationErrors() { - return getValidationErrors(Collections.<String>emptySet()); - } - - public Collection<ValidationResult> getValidationErrors(final Set<String> serviceIdentifiersNotToValidate) { - final List<ValidationResult> results = new ArrayList<>(); - lock.lock(); - try { - final ValidationContext validationContext; - if (serviceIdentifiersNotToValidate == null || serviceIdentifiersNotToValidate.isEmpty()) { - validationContext = getValidationContext(); - } else { - validationContext = getValidationContextFactory().newValidationContext(serviceIdentifiersNotToValidate, - getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); - } - - final Collection<ValidationResult> validationResults; - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getComponent().getClass(), getComponent().getIdentifier())) { - validationResults = getComponent().validate(validationContext); - } - - for (final ValidationResult result : validationResults) { - if (!result.isValid()) { - results.add(result); - } - } - } catch (final Throwable t) { - logger.error("Failed to perform validation of " + this, t); - results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build()); - } finally { - lock.unlock(); - } - return results; - } - - public abstract void verifyModifiable() throws IllegalStateException; - - /** - * - */ - ControllerServiceProvider getControllerServiceProvider() { - return this.serviceProvider; - } - - @Override - public String getCanonicalClassName() { - return componentCanonicalClass; - } - - @Override - public String getComponentType() { - return componentType; - } - - protected ValidationContextFactory getValidationContextFactory() { - return this.validationContextFactory; - } - - protected void invalidateValidationContext() { - this.validationContext.set(null); - } - - protected ValidationContext getValidationContext() { - while (true) { - ValidationContext context = this.validationContext.get(); - if (context != null) { - return context; - } - - context = getValidationContextFactory().newValidationContext(getProperties(), getAnnotationData(), getProcessGroupIdentifier(), getIdentifier()); - final boolean updated = validationContext.compareAndSet(null, context); - if (updated) { - return context; - } - } - } - - @Override - public ComponentVariableRegistry getVariableRegistry() { - return this.variableRegistry; - } - - protected ReloadComponent getReloadComponent() { - return this.reloadComponent; - } - - @Override - public void verifyCanUpdateBundle(final BundleCoordinate incomingCoordinate) throws IllegalArgumentException { - final BundleCoordinate existingCoordinate = getBundleCoordinate(); - - // determine if this update is changing the bundle for the processor - if (!existingCoordinate.equals(incomingCoordinate)) { - // if it is changing the bundle, only allow it to change to a different version within same group and id - if (!existingCoordinate.getGroup().equals(incomingCoordinate.getGroup()) - || !existingCoordinate.getId().equals(incomingCoordinate.getId())) { - throw new IllegalArgumentException(String.format( - "Unable to update component %s from %s to %s because bundle group and id must be the same.", - getIdentifier(), existingCoordinate.getCoordinate(), incomingCoordinate.getCoordinate())); - } - } - } - - protected void setAdditionalResourcesFingerprint(String additionalResourcesFingerprint) { - this.additionalResourcesFingerprint = additionalResourcesFingerprint; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java new file mode 100644 index 0000000..d65e8d6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.net.URL; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.authorization.AccessDeniedException; +import org.apache.nifi.authorization.AuthorizationResult; +import org.apache.nifi.authorization.AuthorizationResult.Result; +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.RequestAction; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.authorization.resource.ComponentAuthorizable; +import org.apache.nifi.authorization.resource.RestrictedComponentsAuthorizableFactory; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.registry.ComponentVariableRegistry; + +public interface ComponentNode extends ComponentAuthorizable { + + @Override + public String getIdentifier(); + + public String getName(); + + public void setName(String name); + + public String getAnnotationData(); + + public void setAnnotationData(String data); + + public default void setProperties(Map<String, String> properties) { + setProperties(properties, false); + } + + public void setProperties(Map<String, String> properties, boolean allowRemovalOfRequiredProperties); + + public Map<PropertyDescriptor, String> getProperties(); + + public String getProperty(final PropertyDescriptor property); + + void reload(Set<URL> additionalUrls) throws Exception; + + void refreshProperties(); + + Set<URL> getAdditionalClasspathResources(List<PropertyDescriptor> propertyDescriptors); + + BundleCoordinate getBundleCoordinate(); + + ConfigurableComponent getComponent(); + + TerminationAwareLogger getLogger(); + + boolean isExtensionMissing(); + + void setExtensionMissing(boolean extensionMissing); + + void verifyCanUpdateBundle(BundleCoordinate bundleCoordinate) throws IllegalStateException; + + void reloadAdditionalResourcesIfNecessary(); + + /** + * @return the any validation errors for this connectable + */ + Collection<ValidationResult> getValidationErrors(); + + /** + * @return the type of the component. I.e., the class name of the implementation + */ + String getComponentType(); + + /** + * @return the class of the underlying + */ + Class<?> getComponentClass(); + + /** + * @return the Canonical Class Name of the component + */ + String getCanonicalClassName(); + + /** + * @return whether or not the underlying implementation has any restrictions + */ + boolean isRestricted(); + + /** + * @return whether or not the underlying implementation is deprecated + */ + boolean isDeprecated(); + + /** + * Indicates whether or not validation should be run on the component, based on its current state. + * + * @return <code>true</code> if the component needs validation, <code>false</code> otherwise + */ + boolean isValidationNecessary(); + + /** + * @return the variable registry for this component + */ + ComponentVariableRegistry getVariableRegistry(); + + /** + * Returns the processor's current Validation Status + * + * @return the processor's current Validation Status + */ + public abstract ValidationStatus getValidationStatus(); + + /** + * Returns the processor's Validation Status, waiting up to the given amount of time for the Validation to complete + * if it is currently in the process of validating. If the processor is currently in the process of validation and + * the validation logic does not complete in the given amount of time, or if the thread is interrupted, then a Validation Status + * of {@link ValidationStatus#VALIDATING VALIDATING} will be returned. + * + * @param timeout the max amount of time to wait + * @param unit the time unit + * @return the ValidationStatus + */ + public abstract ValidationStatus getValidationStatus(long timeout, TimeUnit unit); + + /** + * Asynchronously begins the validation process + */ + public abstract void performValidation(); + + /** + * Returns a {@link List} of all {@link PropertyDescriptor}s that this + * component supports. + * + * @return PropertyDescriptor objects this component currently supports + */ + List<PropertyDescriptor> getPropertyDescriptors(); + + /** + * @param name to lookup the descriptor + * @return the PropertyDescriptor with the given name, if it exists; + * otherwise, returns <code>null</code> + */ + PropertyDescriptor getPropertyDescriptor(String name); + + + @Override + default AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) { + // if this is a modification request and the reporting task is restricted ensure the user has elevated privileges. if this + // is not a modification request, we just want to use the normal rules + if (RequestAction.WRITE.equals(action) && isRestricted()) { + final Set<Authorizable> restrictedComponentsAuthorizables = RestrictedComponentsAuthorizableFactory.getRestrictedComponentsAuthorizable(getComponentClass()); + + for (final Authorizable restrictedComponentsAuthorizable : restrictedComponentsAuthorizables) { + final AuthorizationResult result = restrictedComponentsAuthorizable.checkAuthorization(authorizer, RequestAction.WRITE, user, resourceContext); + if (Result.Denied.equals(result.getResult())) { + return result; + } + } + } + + // defer to the base authorization check + return ComponentAuthorizable.super.checkAuthorization(authorizer, action, user, resourceContext); + } + + @Override + default void authorize(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) throws AccessDeniedException { + // if this is a modification request and the reporting task is restricted ensure the user has elevated privileges. if this + // is not a modification request, we just want to use the normal rules + if (RequestAction.WRITE.equals(action) && isRestricted()) { + final Set<Authorizable> restrictedComponentsAuthorizables = RestrictedComponentsAuthorizableFactory.getRestrictedComponentsAuthorizable(getComponentClass()); + + for (final Authorizable restrictedComponentsAuthorizable : restrictedComponentsAuthorizables) { + restrictedComponentsAuthorizable.authorize(authorizer, RequestAction.WRITE, user, resourceContext); + } + } + + // defer to the base authorization check + ComponentAuthorizable.super.authorize(authorizer, action, user, resourceContext); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java deleted file mode 100644 index 2a8f6a2..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -import java.net.URL; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.authorization.AccessDeniedException; -import org.apache.nifi.authorization.AuthorizationResult; -import org.apache.nifi.authorization.AuthorizationResult.Result; -import org.apache.nifi.authorization.Authorizer; -import org.apache.nifi.authorization.RequestAction; -import org.apache.nifi.authorization.resource.Authorizable; -import org.apache.nifi.authorization.resource.ComponentAuthorizable; -import org.apache.nifi.authorization.resource.RestrictedComponentsAuthorizableFactory; -import org.apache.nifi.authorization.user.NiFiUser; -import org.apache.nifi.bundle.BundleCoordinate; -import org.apache.nifi.components.ConfigurableComponent; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.registry.ComponentVariableRegistry; - -public interface ConfiguredComponent extends ComponentAuthorizable { - - @Override - public String getIdentifier(); - - public String getName(); - - public void setName(String name); - - public String getAnnotationData(); - - public void setAnnotationData(String data); - - public default void setProperties(Map<String, String> properties) { - setProperties(properties, false); - } - - public void setProperties(Map<String, String> properties, boolean allowRemovalOfRequiredProperties); - - public Map<PropertyDescriptor, String> getProperties(); - - public String getProperty(final PropertyDescriptor property); - - boolean isValid(); - - void reload(Set<URL> additionalUrls) throws Exception; - - void refreshProperties(); - - Set<URL> getAdditionalClasspathResources(List<PropertyDescriptor> propertyDescriptors); - - BundleCoordinate getBundleCoordinate(); - - ConfigurableComponent getComponent(); - - TerminationAwareLogger getLogger(); - - boolean isExtensionMissing(); - - void setExtensionMissing(boolean extensionMissing); - - void verifyCanUpdateBundle(BundleCoordinate bundleCoordinate) throws IllegalStateException; - - void reloadAdditionalResourcesIfNecessary(); - - /** - * @return the any validation errors for this connectable - */ - Collection<ValidationResult> getValidationErrors(); - - /** - * @return the type of the component. I.e., the class name of the implementation - */ - String getComponentType(); - - /** - * @return the class of the underlying - */ - Class<?> getComponentClass(); - - /** - * @return the Canonical Class Name of the component - */ - String getCanonicalClassName(); - - /** - * @return whether or not the underlying implementation has any restrictions - */ - boolean isRestricted(); - - /** - * @return whether or not the underlying implementation is deprecated - */ - boolean isDeprecated(); - - /** - * @return the variable registry for this component - */ - ComponentVariableRegistry getVariableRegistry(); - - @Override - default AuthorizationResult checkAuthorization(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) { - // if this is a modification request and the reporting task is restricted ensure the user has elevated privileges. if this - // is not a modification request, we just want to use the normal rules - if (RequestAction.WRITE.equals(action) && isRestricted()) { - final Set<Authorizable> restrictedComponentsAuthorizables = RestrictedComponentsAuthorizableFactory.getRestrictedComponentsAuthorizable(getComponentClass()); - - for (final Authorizable restrictedComponentsAuthorizable : restrictedComponentsAuthorizables) { - final AuthorizationResult result = restrictedComponentsAuthorizable.checkAuthorization(authorizer, RequestAction.WRITE, user, resourceContext); - if (Result.Denied.equals(result.getResult())) { - return result; - } - } - } - - // defer to the base authorization check - return ComponentAuthorizable.super.checkAuthorization(authorizer, action, user, resourceContext); - } - - @Override - default void authorize(Authorizer authorizer, RequestAction action, NiFiUser user, Map<String, String> resourceContext) throws AccessDeniedException { - // if this is a modification request and the reporting task is restricted ensure the user has elevated privileges. if this - // is not a modification request, we just want to use the normal rules - if (RequestAction.WRITE.equals(action) && isRestricted()) { - final Set<Authorizable> restrictedComponentsAuthorizables = RestrictedComponentsAuthorizableFactory.getRestrictedComponentsAuthorizable(getComponentClass()); - - for (final Authorizable restrictedComponentsAuthorizable : restrictedComponentsAuthorizables) { - restrictedComponentsAuthorizable.authorize(authorizer, RequestAction.WRITE, user, resourceContext); - } - } - - // defer to the base authorization check - ComponentAuthorizable.super.authorize(authorizer, action, user, resourceContext); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index de005e4..e680dcd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -83,15 +83,35 @@ public interface ProcessScheduler { */ void terminateProcessor(ProcessorNode procNode); - /* - * Notifies the schedule that the given processor is being removed so the scheduler may clean up any resources - * related to the given processor. + /** + * Notifies the scheduler that the given Processor has been removed from the flow * - * @param procNode the processor node being removed + * @param procNode the processor being removed */ void onProcessorRemoved(ProcessorNode procNode); /** + * Notifies the scheduler that the given port has been removed from the flow + * + * @param port the port being removed + */ + void onPortRemoved(Port port); + + /** + * Notifies the scheduler that the given funnel has been removed from the flow + * + * @param funnel the funnel being removed + */ + void onFunnelRemoved(Funnel funnel); + + /** + * Notifies the scheduler that the given reporting task has been removed from the flow + * + * @param reportingTask the reporting task being removed + */ + void onReportingTaskRemoved(ReportingTaskNode reportingTask); + + /** * Starts scheduling the given Port to run. If the Port is already scheduled * to run, does nothing. * http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index ebc195d..a5fe9b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.components.validation.ValidationTrigger; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.scheduling.LifecycleState; import org.apache.nifi.controller.scheduling.SchedulingAgent; @@ -40,7 +41,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class ProcessorNode extends AbstractConfiguredComponent implements Connectable { +public abstract class ProcessorNode extends AbstractComponentNode implements Connectable { private static final Logger logger = LoggerFactory.getLogger(ProcessorNode.class); @@ -49,8 +50,8 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen public ProcessorNode(final String id, final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider, final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, - final ReloadComponent reloadComponent, final boolean isExtensionMissing) { - super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, isExtensionMissing); + final ReloadComponent reloadComponent, final ValidationTrigger validationTrigger, final boolean isExtensionMissing) { + super(id, validationContextFactory, serviceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, validationTrigger, isExtensionMissing); this.scheduledState = new AtomicReference<>(ScheduledState.STOPPED); } @@ -82,9 +83,6 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen */ public abstract int getTerminatedThreadCount(); - @Override - public abstract boolean isValid(); - public abstract void setBulletinLevel(LogLevel bulletinLevel); public abstract LogLevel getBulletinLevel(); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java index 22cc3b5..09ae5fb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java @@ -24,7 +24,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy; import java.util.Set; import java.util.concurrent.TimeUnit; -public interface ReportingTaskNode extends ConfiguredComponent { +public interface ReportingTaskNode extends ComponentNode { void setSchedulingStrategy(SchedulingStrategy schedulingStrategy); http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java index f0771be..5f43aab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Template.java @@ -44,6 +44,27 @@ public class Template implements ComponentAuthorizable { return procGroup == null ? null : procGroup.getIdentifier(); } + @Override + public int hashCode() { + return 41 + 11 * getIdentifier().hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof Template)) { + return false; + } + + final Template other = (Template) obj; + return getIdentifier().equals(other.getIdentifier()); + } + /** * Returns a TemplateDTO object that describes the contents of this Template * http://git-wip-us.apache.org/repos/asf/nifi/blob/604656fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceDisabledException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceDisabledException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceDisabledException.java new file mode 100644 index 0000000..99dead2 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceDisabledException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.service; + +public class ControllerServiceDisabledException extends IllegalStateException { + private final String serviceId; + + public ControllerServiceDisabledException(final String serviceId, final String message) { + super(message); + this.serviceId = serviceId; + } + + public String getControllerServiceId() { + return serviceId; + } +}
