exceptionfactory commented on code in PR #7191: URL: https://github.com/apache/nifi/pull/7191#discussion_r1184169425
########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/ComponentAnalysisResult.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowanalysis; + +/** + * Holds information about a component violating a {@link FlowAnalysisRule} + */ +public class ComponentAnalysisResult extends AbstractAnalysisResult { + private ComponentAnalysisResult(String issueId, String message, String explanation) { + super(issueId, message, explanation); + } + + /** + * @param issueId A rule-defined id that corresponds to a unique type of issue recognized by the rule. + * Newer analysis runs may produce a result with the same issueId in which case the old one will + * be overwritten (or recreated if it is the same in other aspects as well). + * However, if the previous result was disabled the new one will be disabled as well. + * @param message A violation message + * @return a new result instance + */ + public static ComponentAnalysisResult newResult(String issueId, String message) { Review Comment: The static methods do not follow the general pattern of `nifi-api` model classes. Recommend creating an inner `Builder` class instead to follow the general pattern. 
########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/GroupAnalysisResult.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowanalysis; + +import org.apache.nifi.flow.VersionedComponent; + +import java.util.Optional; +import java.util.StringJoiner; + +/** + * Holds information about a {@link FlowAnalysisRule} violation after analyzing (a part of) the flow, represented by a process group. + * One such analysis can result in multiple instances of this class. 
+ */ +public class GroupAnalysisResult extends AbstractAnalysisResult { + private final Optional<VersionedComponent> component; + + private GroupAnalysisResult(String issueId, String message, String explanation) { + this(issueId, message, explanation, Optional.empty()); + } + + private GroupAnalysisResult(VersionedComponent component, String issueId, String message, String explanation) { + this(issueId, message, explanation, Optional.of(component)); + } + + private GroupAnalysisResult(String issueId, String message, String explanation, Optional<VersionedComponent> component) { + super(issueId, message, explanation); + this.component = component; + } + + /** + * Create a new analysis result tied to the currently analyzed process group + * + * @param issueId A rule-defined id that corresponds to a unique type of issue recognized by the rule. + * Newer analysis runs may produce a result with the same issueId in which case the old one will + * be overwritten (or recreated if it is the same in other aspects as well). + * However, if the previous result was disabled the new one will be disabled as well. + * @param message A violation message + * @return a new analysis result instance tied to the currently analyzed process group + */ + public static GroupAnalysisResult newResultForGroup(String issueId, String message) { Review Comment: As noted on the ComponentAnalysisResult, recommend creating a static inner `Builder` class to support different construction options. ########## nifi-framework-api/src/main/java/org/apache/nifi/web/UiExtensionType.java: ########## @@ -29,6 +29,7 @@ public enum UiExtensionType { ProcessorConfiguration, ControllerServiceConfiguration, ReportingTaskConfiguration, + FlowAnalysisRuleConfiguration, Review Comment: Is it necessary to support custom UI extensions for Flow Analysis Rules? 
########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flowanalysis/AbstractFlowAnalysisRuleNode.java: ########## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.flowanalysis; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.AbstractComponentNode; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.FlowAnalysisRuleNode; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.controller.ReloadComponent; +import org.apache.nifi.controller.TerminationAwareLogger; +import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.StandardConfigurationContext; +import org.apache.nifi.flowanalysis.FlowAnalysisRule; +import org.apache.nifi.flowanalysis.FlowAnalysisRuleState; +import org.apache.nifi.flowanalysis.EnforcementPolicy; +import org.apache.nifi.flowanalysis.VerifiableFlowAnalysisRule; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.InstanceClassLoader; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.parameter.ParameterLookup; +import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.util.CharacterFilterUtils; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.ReflectionUtils; +import 
org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.apache.nifi.validation.RuleViolationsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractFlowAnalysisRuleNode extends AbstractComponentNode implements FlowAnalysisRuleNode { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFlowAnalysisRuleNode.class); + + private final AtomicReference<FlowAnalysisRuleDetails> flowAnalysisRuleRef; + private final ControllerServiceLookup serviceLookup; + private final RuleViolationsManager ruleViolationsManager; + + private volatile String comment; + private EnforcementPolicy enforcementPolicy; + + private volatile FlowAnalysisRuleState state = FlowAnalysisRuleState.DISABLED; + + public AbstractFlowAnalysisRuleNode(final LoggableComponent<FlowAnalysisRule> flowAnalysisRule, final String id, + final ControllerServiceProvider controllerServiceProvider, + final ValidationContextFactory validationContextFactory, + final RuleViolationsManager ruleViolationsManager, + final ComponentVariableRegistry variableRegistry, + final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) { + + this(flowAnalysisRule, id, controllerServiceProvider, validationContextFactory, ruleViolationsManager, + flowAnalysisRule.getComponent().getClass().getSimpleName(), flowAnalysisRule.getComponent().getClass().getCanonicalName(), + variableRegistry, reloadComponent, extensionManager, validationTrigger, false); + } + + + public AbstractFlowAnalysisRuleNode(final LoggableComponent<FlowAnalysisRule> flowAnalysisRule, final String id, final ControllerServiceProvider controllerServiceProvider, + 
final ValidationContextFactory validationContextFactory, final RuleViolationsManager ruleViolationsManager, + final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, + final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger, + final boolean isExtensionMissing) { + + super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, + extensionManager, validationTrigger, isExtensionMissing); + this.flowAnalysisRuleRef = new AtomicReference<>(new FlowAnalysisRuleDetails(flowAnalysisRule)); + this.serviceLookup = controllerServiceProvider; + this.ruleViolationsManager = ruleViolationsManager; + this.enforcementPolicy = EnforcementPolicy.WARN; + } + + @Override + public EnforcementPolicy getEnforcementPolicy() { + return enforcementPolicy; + } + + @Override + public void setEnforcementPolicy(EnforcementPolicy enforcementPolicy) { + this.enforcementPolicy = enforcementPolicy; + } + + @Override + public ConfigurableComponent getComponent() { + return flowAnalysisRuleRef.get().getFlowAnalysisRule(); + } + + @Override + public BundleCoordinate getBundleCoordinate() { + return flowAnalysisRuleRef.get().getBundleCoordinate(); + } + + @Override + public TerminationAwareLogger getLogger() { + return flowAnalysisRuleRef.get().getComponentLog(); + } + + @Override + public FlowAnalysisRule getFlowAnalysisRule() { + return flowAnalysisRuleRef.get().getFlowAnalysisRule(); + } + + @Override + public void setFlowAnalysisRule(final LoggableComponent<FlowAnalysisRule> flowAnalysisRule) { + if (isEnabled()) { + throw new IllegalStateException("Cannot modify Flow Analysis Rule configuration while it is enabled"); + } + this.flowAnalysisRuleRef.set(new FlowAnalysisRuleDetails(flowAnalysisRule)); + } + + @Override + public void reload(final Set<URL> additionalUrls) throws 
FlowAnalysisRuleInstantiationException { + if (isEnabled()) { + throw new IllegalStateException("Cannot reload Flow Analysis Rule while it is enabled"); + } + String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey()); + setAdditionalResourcesFingerprint(additionalResourcesFingerprint); + getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls); + } + + @Override + public boolean isEnabled() { + return FlowAnalysisRuleState.ENABLED.equals(state); + } + + @Override + public boolean isValidationNecessary() { + return !isEnabled() || getValidationStatus() != ValidationStatus.VALID; + } + + @Override + public ConfigurationContext getConfigurationContext() { + return new StandardConfigurationContext(this, serviceLookup, null, getVariableRegistry()); + } + + @Override + public void verifyModifiable() throws IllegalStateException { + if (isEnabled()) { + throw new IllegalStateException("Cannot modify Flow Analysis Rule while it is enabled"); + } + } + + @Override + public FlowAnalysisRuleState getState() { + return state; + } + + @Override + public String getComments() { + return comment; + } + + @Override + public void setComments(final String comment) { + this.comment = CharacterFilterUtils.filterInvalidXmlCharacters(comment); + } + + @Override + public void verifyCanDelete() { + if (isEnabled()) { + throw new IllegalStateException("Cannot delete " + getFlowAnalysisRule().getIdentifier() + " because it is enabled"); + } + } + + @Override + public void verifyCanDisable() { + if (!isEnabled()) { + throw new IllegalStateException("Cannot disable " + getFlowAnalysisRule().getIdentifier() + " because it is already disabled"); + } + } + + @Override + public void verifyCanEnable() { + if (getValidationStatus() == ValidationStatus.INVALID) { + throw new IllegalStateException("Cannot enable " + getFlowAnalysisRule().getIdentifier() + " because it is in 
INVALID status"); + } + + if (isEnabled()) { + throw new IllegalStateException("Cannot enable " + getFlowAnalysisRule().getIdentifier() + " because it is not disabled"); + } + } + + @Override + public void verifyCanEnable(final Set<ControllerServiceNode> ignoredServices) { + if (isEnabled()) { + throw new IllegalStateException("Cannot enable " + getFlowAnalysisRule().getIdentifier() + " because it is not disabled"); + } + + final Collection<ValidationResult> validationResults = getValidationErrors(ignoredServices); + if (!validationResults.isEmpty()) { + throw new IllegalStateException(this + " cannot be enabled because it is not currently valid"); + } + } + + @Override + public void verifyCanUpdate() { + if (isEnabled()) { + throw new IllegalStateException("Cannot update " + getFlowAnalysisRule().getIdentifier() + " because it is currently enabled"); + } + } + + @Override + public void verifyCanClearState() { + verifyCanUpdate(); + } + + @Override + public String getProcessGroupIdentifier() { + return null; + } + + @Override + public ParameterLookup getParameterLookup() { + return ParameterLookup.EMPTY; + } + + @Override + public String toString() { + FlowAnalysisRule flowAnalysisRule = flowAnalysisRuleRef.get().getFlowAnalysisRule(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), flowAnalysisRule.getClass(), flowAnalysisRule.getIdentifier())) { + return getFlowAnalysisRule().toString(); + } + } + + @Override + public void enable() { + verifyCanEnable(); + setState(FlowAnalysisRuleState.ENABLED, OnEnabled.class); + } + + @Override + public void disable() { + verifyCanDisable(); + setState(FlowAnalysisRuleState.DISABLED, OnDisabled.class); + + ruleViolationsManager.removeRuleViolationsForRule(getIdentifier()); + ruleViolationsManager.cleanUp(); + } + + private void setState(FlowAnalysisRuleState newState, Class<? 
extends Annotation> annotation) { + final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceLookup, null, getVariableRegistry()); + + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getFlowAnalysisRule().getClass(), getIdentifier())) { + ReflectionUtils.invokeMethodsWithAnnotation(annotation, getFlowAnalysisRule(), configContext); + + this.state = newState; + + LOG.debug("Successfully {} {}", newState.toString().toLowerCase(), this); Review Comment: The `toLowerCase()` seems unnecessary: ```suggestion LOG.debug("Successfully {} {}", newState, this); ``` ########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/FlowAnalysisRuleContext.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowanalysis; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.VersionedControllerServiceLookup; + +import java.util.Map; + +/** + * This interface provides a bridge between the NiFi Framework and a + * {@link FlowAnalysisRule}. 
This context allows a FlowAnalysisRule to access + * configuration supplied by the user. + */ +public interface FlowAnalysisRuleContext extends PropertyContext { + /** + * @return the name of the rule that is being triggered + */ + String getRuleName(); + + /** + * @return a Map of all known {@link PropertyDescriptor}s to their + * configured properties. This Map will contain a <code>null</code> for any + * Property that has not been configured by the user, even if the + * PropertyDescriptor has a default value + */ + Map<PropertyDescriptor, String> getProperties(); + + /** + * @return the {@link VersionedControllerServiceLookup} which can be used to obtain + * Versioned Controller Services during flow analysis + */ + VersionedControllerServiceLookup getVersionedControllerServiceLookup(); + + /** + * @return the StateManager that can be used to store and retrieve state for this component + */ + StateManager getStateManager(); + + /** + * @return <code>true</code> if this instance of NiFi is configured to be part of a cluster, <code>false</code> + * if this instance of NiFi is a standalone instance + */ + boolean isClustered(); + + /** + * @return the currently configured maximum number of threads that can be + * used for executing processors at any given time. + */ + int getMaxTimerDrivenThreadCount(); Review Comment: Is it necessary to provide this method? It has a very narrow use case that does not seem generally applicable. ########## nifi-api/src/main/java/org/apache/nifi/flow/VersionedFlowAnalysisRule.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.flow; + +import io.swagger.annotations.ApiModelProperty; +import org.apache.nifi.flowanalysis.EnforcementPolicy; + +public class VersionedFlowAnalysisRule extends VersionedConfigurableExtension { + + private String annotationData; Review Comment: Is this `annotationData` property necessary? The ApiModelProperty comments mention a custom UI, but that does not seem necessary for Flow Analysis. ########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/AbstractFlowAnalysisRule.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.flowanalysis; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; + +public abstract class AbstractFlowAnalysisRule extends AbstractConfigurableComponent implements FlowAnalysisRule { + private String identifier; + private String description; + + private ComponentLog logger; + + @Override + public void initialize(FlowAnalysisRuleInitializationContext config) throws InitializationException { Review Comment: ```suggestion public void initialize(FlowAnalysisRuleInitializationContext context) throws InitializationException { ``` ########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/FlowAnalysisRule.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.flowanalysis; + +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.reporting.InitializationException; + +import java.util.Collection; +import java.util.Collections; + +/** + * A single rule that can analyze components or a flow (represented by a process group) + */ +public interface FlowAnalysisRule extends ConfigurableComponent { + /** + * Provides the Flow Analysis Rule with access to objects that may be of use + * throughout its lifecycle + * + * @param config see {@link FlowAnalysisRuleInitializationContext} + * @throws org.apache.nifi.reporting.InitializationException if unable to initialize + */ + void initialize(FlowAnalysisRuleInitializationContext config) throws InitializationException; Review Comment: ```suggestion void initialize(FlowAnalysisRuleInitializationContext context) throws InitializationException; ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/analyzeflow/ruleimpl/DisallowComponentType.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.analyzeflow.ruleimpl; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedExtensionComponent; +import org.apache.nifi.flowanalysis.AbstractFlowAnalysisRule; +import org.apache.nifi.flowanalysis.ComponentAnalysisResult; +import org.apache.nifi.flowanalysis.FlowAnalysisRuleContext; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +@Tags({"component", "processor", "controller service", "type"}) +@CapabilityDescription("Produces rule violations for each component (i.e. processors or controller services) of a given type.") +public class DisallowComponentType extends AbstractFlowAnalysisRule { + public static final PropertyDescriptor COMPONENT_TYPE = new PropertyDescriptor.Builder() + .name("component-type") + .displayName("Component Type") + .description("Components of the given type will produce a rule violation (i.e. 
they shouldn't exist).") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue(null) + .build(); + + private final static List<PropertyDescriptor> propertyDescriptors; + + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(COMPONENT_TYPE); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public Collection<ComponentAnalysisResult> analyzeComponent(VersionedComponent component, FlowAnalysisRuleContext context) { + Collection<ComponentAnalysisResult> results = new HashSet<>(); + + String componentType = context.getProperty(COMPONENT_TYPE).getValue(); + + if (component instanceof VersionedExtensionComponent) { + VersionedExtensionComponent versionedExtensionComponent = (VersionedExtensionComponent) component; + + String encounteredComponentType = versionedExtensionComponent.getType(); + String encounteredSimpleComponentType = encounteredComponentType.substring(encounteredComponentType.lastIndexOf(".") + 1); + + if (encounteredComponentType.equals(componentType) || encounteredSimpleComponentType.equals(componentType)) { + ComponentAnalysisResult result = ComponentAnalysisResult.newResult( + "default", + "'" + componentType + "' is not allowed!" Review Comment: Exclamation marks should not be used in messages. ```suggestion "'" + componentType + "' is not allowed" ``` ########## nifi-docs/src/main/asciidoc/images/configure-flow-analysis-rule-settings.png: ########## Review Comment: This screenshot for `Rule Type` needs to be updated since `Recommendation` is not a valid option. ########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/AbstractAnalysisResult.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowanalysis; Review Comment: Instead of the compound word package name `flowanalysis`, I recommend either `analysis` or `flow.analysis`. ########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/GroupAnalysisResult.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.flowanalysis; + +import org.apache.nifi.flow.VersionedComponent; + +import java.util.Optional; +import java.util.StringJoiner; + +/** + * Holds information about a {@link FlowAnalysisRule} violation after analyzing (a part of) the flow, represented by a process group. + * One such analysis can result in multiple instances of this class. + */ +public class GroupAnalysisResult extends AbstractAnalysisResult { + private final Optional<VersionedComponent> component; + + private GroupAnalysisResult(String issueId, String message, String explanation) { Review Comment: In general, the `final` keyword should be used for all constructor arguments: ```suggestion private GroupAnalysisResult(final String issueId, final String message, final String explanation) { ``` ########## nifi-framework-api/src/main/java/org/apache/nifi/flowanalysis/AnalyzeFlowState.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.flowanalysis; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Represents the state of an AnalyzeFlowRequest + */ +public enum AnalyzeFlowState { + FAILURE("Failed", true), + CANCELED("Canceled by user", true), + ANALYZING("Analyzing flow", false), + WAITING("Waiting for flow analyzer to become available", false), + COMPLETE("Completed successfully", true); Review Comment: ```suggestion COMPLETED("Completed successfully", true); ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/endpoints/AnalyzeFlowRequestEndpointMergerTest.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.flowanalysis.AnalyzeFlowState; +import org.apache.nifi.util.EqualsWrapper; +import org.apache.nifi.web.api.dto.AnalyzeFlowRequestDTO; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; Review Comment: These references should be updated to JUnit 5 ########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/FlowAnalysisRuleContext.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowanalysis; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.VersionedControllerServiceLookup; + +import java.util.Map; + +/** + * This interface provides a bridge between the NiFi Framework and a + * {@link FlowAnalysisRule}. This context allows a FlowAnalysisRule to access + * configuration supplied by the user. 
+ */ +public interface FlowAnalysisRuleContext extends PropertyContext { + /** + * @return the name of the rule that is being triggered + */ + String getRuleName(); + + /** + * @return a Map of all known {@link PropertyDescriptor}s to their + * configured properties. This Map will contain a <code>null</code> for any + * Property that has not been configured by the user, even if the + * PropertyDescriptor has a default value + */ + Map<PropertyDescriptor, String> getProperties(); + + /** + * @return the {@link VersionedControllerServiceLookup} which can be used to obtain + * Versioned Controller Services during flow analysis + */ + VersionedControllerServiceLookup getVersionedControllerServiceLookup(); + + /** + * @return the StateManager that can be used to store and retrieve state for this component + */ + StateManager getStateManager(); + + /** + * @return <code>true</code> if this instance of NiFi is configured to be part of a cluster, <code>false</code> + * if this instance of NiFi is a standalone instance + */ + boolean isClustered(); + + /** + * @return the currently configured maximum number of threads that can be + * used for executing processors at any given time. + */ + int getMaxTimerDrivenThreadCount(); + + /** + * @return the ID of this node in the cluster, or <code>null</code> if either this node is not clustered or the Node Identifier + * has not yet been established + */ + String getClusterNodeIdentifier(); Review Comment: Recommend changing this interface to return `Optional<String>` instead of relying on documentation to indicate that it could be null. ```suggestion Optional<String> getClusterNodeIdentifier(); ``` ########## nifi-framework-api/src/main/java/org/apache/nifi/flowanalysis/AnalyzeFlowState.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowanalysis; Review Comment: This package should be different from the `nifi-api` module package to differentiate it from the core API. ```suggestion package org.apache.nifi.framework.flow.analysis; ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/analyzeflow/AbstractFlowAnalysisIT.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.analyzeflow; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.FlowAnalysisRuleNode; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flowanalysis.FlowAnalysisRule; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.integration.FrameworkIntegrationTest; +import org.apache.nifi.integration.flowanalysis.DelegateFlowAnalysisRule; +import org.apache.nifi.integration.processors.NopProcessor; +import org.apache.nifi.nar.SystemBundle; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.junit.jupiter.api.BeforeEach; + +import java.util.Collection; +import java.util.Collections; +import java.util.UUID; + +public abstract class AbstractFlowAnalysisIT extends FrameworkIntegrationTest { Review Comment: Instead of introducing these framework integration tests, recommend creating tests in the `nifi-system-test-suite` module for automated execution. These integration tests are not run automatically and should be removed. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java: ########## @@ -519,6 +524,74 @@ public ReportingTaskNode createReportingTask(final String type, final String id, return taskNode; } + @Override + public FlowAnalysisRuleNode createFlowAnalysisRule( + String type, + String id, + BundleCoordinate bundleCoordinate, + Set<URL> additionalUrls, + boolean firstTimeAdded, + boolean register, + String classloaderIsolationKey + ) { + if (type == null || id == null || bundleCoordinate == null) { + throw new NullPointerException(); Review Comment: An empty `NullPointerException` usually connotes an error that should be corrected with better code handling. 
Recommend using `IllegalArgumentException` with a message, or `Objects.requireNonNull()` with a message. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java: ########## @@ -4597,6 +4611,246 @@ public Response deleteReplaceProcessGroupRequest( return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue()); } + // ------------- + // flow-analysis + // ------------- + + /** + * Submits a request to run a flow analysis. + * + * @param processGroupId The id of the process group representing (a part of) the flow to be analyzed + * @return An AnalyzeFlowRequestEntity + */ + @POST + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("flow-analysis/{processGroupId}") + @ApiOperation( + value = "Executes a flow analysis for components within a given process group", + response = AnalyzeFlowRequestEntity.class, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") + }) + public Response submitAnalyzeFlowRequest( + @ApiParam( + value = "The id of the process group representing (a part of) the flow to be analyzed.", + required = true + ) + @PathParam("processGroupId") + final String processGroupId + ) { + if (isReplicateRequest()) { + return replicate(HttpMethod.POST); + } + + NiFiUser user = NiFiUserUtils.getNiFiUser(); + + ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity(); + requestProcessGroupEntity.setId(processGroupId); + + return withWriteLock( + serviceFacade, + requestProcessGroupEntity, + lookup -> { + final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId); + processGroup.getAuthorizable().authorize(authorizer, RequestAction.READ, user); + }, + null, + (processGroupEntity) -> { + String analyzedGroupId = processGroupEntity.getId(); + + final String requestId = generateUuid(); + final AsynchronousWebRequest<String, Void> analyzeFlowAsyncWebRequest = new StandardAsynchronousWebRequest<>( + requestId, + analyzedGroupId, + analyzedGroupId, + user, + Collections.singletonList(new StandardUpdateStep("Analyze Process Group")) + ); + + // Submit the request to be performed in the background + final Consumer<AsynchronousWebRequest<String, Void>> analyzeFlowTask = asyncRequest -> { + try { + serviceFacade.analyzeProcessGroup(analyzedGroupId); + asyncRequest.markStepComplete(); + } catch (final Exception e) { + logger.error("Failed to run flow analysis on process group " + processGroupId, e); + asyncRequest.fail("Failed to run flow analysis on process group " + processGroupId + " due to " + e); + } + }; + flowAnalysisAsyncRequestManager.submitRequest( + FLOW_ANALYSIS_REQUEST_TYPE, + requestId, + analyzeFlowAsyncWebRequest, + analyzeFlowTask + ); + + return generateOkResponse(createAnalyzeFlowRequestEntity(analyzeFlowAsyncWebRequest, requestId)).build(); + } + ); + } + + /** + * Checks the status of an outstanding request for 
a flow analysis. + * + * @param requestId The id of flow analysis request + * @return An analyzeFlowRequestEntity + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("flow-analysis/{requestId}") + @ApiOperation( + value = "Gets the current status of a flow analysis request.", + response = AnalyzeFlowRequestEntity.class, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getAnalyzeFlowRequest( + @ApiParam( + value = "The id of the process group representing (a part of) the flow to be analyzed.", + required = true + ) + @PathParam("requestId") + final String requestId + ) { + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // request manager will ensure that the current is the user that submitted this request + final AsynchronousWebRequest<String, Void> asyncRequest = + flowAnalysisAsyncRequestManager.getRequest(FLOW_ANALYSIS_REQUEST_TYPE, requestId, user); + + return generateOkResponse(createAnalyzeFlowRequestEntity(asyncRequest, requestId)).build(); + } + + /** + * Cancels the specified flow analysis request. 
+ * + * @param httpServletRequest request + * @param requestId The id of the flow analysis request + * @return An analyzeFlowRequestEntity + */ + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("flow-analysis/{requestId}") Review Comment: ```suggestion @Path("{id}/flow-analysis-requests/{requestId}") ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowAnalysisRuleDTO.java: ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +@XmlType(name = "flowAnalysisRule") +public class FlowAnalysisRuleDTO extends ComponentDTO { + public static final String VALID = "VALID"; + public static final String INVALID = "INVALID"; + public static final String VALIDATING = "VALIDATING"; Review Comment: Recommend defining a separate `enum` for those values instead of public static members. 
########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/FlowAnalysisRuleDTO.java: ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlType; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +@XmlType(name = "flowAnalysisRule") +public class FlowAnalysisRuleDTO extends ComponentDTO { + public static final String VALID = "VALID"; + public static final String INVALID = "INVALID"; + public static final String VALIDATING = "VALIDATING"; + + private String name; + private String type; + private BundleDTO bundle; + private String state; + private String comments; + private Boolean persistsState; + private Boolean restricted; + private Boolean deprecated; + private Boolean isExtensionMissing; + private Boolean multipleVersionsAvailable; + private Boolean supportsSensitiveDynamicProperties; + + private String enforcementPolicy; + + private Map<String, String> properties; + private Map<String, PropertyDescriptorDTO> descriptors; + private Set<String> sensitiveDynamicPropertyNames; + 
+ private String customUiUrl; + private String annotationData; + + private Collection<String> validationErrors; + private String validationStatus; + + /** + * @return user-defined name of the flow analysis rule + */ + @ApiModelProperty( + value = "The name of the flow analysis rule." + ) + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + /** + * @return user-defined comments for the flow analysis rule + */ + @ApiModelProperty( + value = "The comments of the flow analysis rule." + ) + public String getComments() { + return comments; + } + + public void setComments(String comments) { + this.comments = comments; + } + + /** + * @return type of flow analysis rule + */ + @ApiModelProperty( + value = "The fully qualified type of the flow analysis rule." + ) + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + /** + * The details of the artifact that bundled this flow analysis rule type. + * + * @return The bundle details + */ + @ApiModelProperty( + value = "The details of the artifact that bundled this flow analysis rule type." + ) + public BundleDTO getBundle() { + return bundle; + } + + public void setBundle(BundleDTO bundle) { + this.bundle = bundle; + } + /** + * @return whether this flow analysis rule persists state + */ + @ApiModelProperty( + value = "Whether the flow analysis rule persists state." + ) + public Boolean getPersistsState() { + return persistsState; + } + + public void setPersistsState(Boolean persistsState) { + this.persistsState = persistsState; + } + + /** + * @return whether this flow analysis rule requires elevated privileges + */ + @ApiModelProperty( + value = "Whether the flow analysis rule requires elevated privileges." 
+ ) + public Boolean getRestricted() { + return restricted; + } + + public void setRestricted(Boolean restricted) { + this.restricted = restricted; + } + + /** + * @return Whether the flow analysis rule has been deprecated. + */ + @ApiModelProperty( + value = "Whether the flow analysis rule has been deprecated." + ) + public Boolean getDeprecated() { + return deprecated; + } + + public void setDeprecated(Boolean deprecated) { + this.deprecated = deprecated; + } + + /** + * @return whether the underlying extension is missing + */ + @ApiModelProperty( + value = "Whether the underlying extension is missing." + ) + public Boolean getExtensionMissing() { + return isExtensionMissing; + } + + public void setExtensionMissing(Boolean extensionMissing) { + isExtensionMissing = extensionMissing; + } + + /** + * @return whether this flow analysis rule has multiple versions available + */ + @ApiModelProperty( + value = "Whether the flow analysis rule has multiple versions available." + ) + public Boolean getMultipleVersionsAvailable() { + return multipleVersionsAvailable; + } + + public void setMultipleVersionsAvailable(Boolean multipleVersionsAvailable) { + this.multipleVersionsAvailable = multipleVersionsAvailable; + } + + /** + * @return whether this flow analysis rule supports sensitive dynamic properties + */ + @ApiModelProperty( + value = "Whether the flow analysis rule supports sensitive dynamic properties." 
+ ) + public Boolean getSupportsSensitiveDynamicProperties() { + return supportsSensitiveDynamicProperties; + } + + public void setSupportsSensitiveDynamicProperties(final Boolean supportsSensitiveDynamicProperties) { + this.supportsSensitiveDynamicProperties = supportsSensitiveDynamicProperties; + } + + /** + * @return current scheduling state of the flow analysis rule + */ + @ApiModelProperty( + value = "The state of the reporting task.", + allowableValues = "RUNNING, STOPPED, DISABLED" Review Comment: This appears to be copied and pasted from Reporting Task definitions. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java: ########## @@ -4597,6 +4611,246 @@ public Response deleteReplaceProcessGroupRequest( return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue()); } + // ------------- + // flow-analysis + // ------------- + + /** + * Submits a request to run a flow analysis. + * + * @param processGroupId The id of the process group representing (a part of) the flow to be analyzed + * @return An AnalyzeFlowRequestEntity + */ + @POST + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("flow-analysis/{processGroupId}") + @ApiOperation( + value = "Executes a flow analysis for components within a given process group", + response = AnalyzeFlowRequestEntity.class, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. 
The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + }) + public Response submitAnalyzeFlowRequest( + @ApiParam( + value = "The id of the process group representing (a part of) the flow to be analyzed.", + required = true + ) + @PathParam("processGroupId") + final String processGroupId + ) { + if (isReplicateRequest()) { + return replicate(HttpMethod.POST); + } + + NiFiUser user = NiFiUserUtils.getNiFiUser(); + + ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity(); + requestProcessGroupEntity.setId(processGroupId); + + return withWriteLock( + serviceFacade, + requestProcessGroupEntity, + lookup -> { + final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId); + processGroup.getAuthorizable().authorize(authorizer, RequestAction.READ, user); + }, + null, + (processGroupEntity) -> { + String analyzedGroupId = processGroupEntity.getId(); + + final String requestId = generateUuid(); + final AsynchronousWebRequest<String, Void> analyzeFlowAsyncWebRequest = new StandardAsynchronousWebRequest<>( + requestId, + analyzedGroupId, + analyzedGroupId, + user, + Collections.singletonList(new StandardUpdateStep("Analyze Process Group")) + ); + + // Submit the request to be performed in the background + final Consumer<AsynchronousWebRequest<String, Void>> analyzeFlowTask = asyncRequest -> { + try { + serviceFacade.analyzeProcessGroup(analyzedGroupId); + asyncRequest.markStepComplete(); + } catch (final Exception e) { + logger.error("Failed to run flow analysis on process group " + processGroupId, e); Review Comment: 
```suggestion logger.error("Failed to run flow analysis on process group {}", processGroupId, e); ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flowanalysis/AbstractFlowAnalysisRuleNode.java: ########## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.flowanalysis; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.AbstractComponentNode; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.FlowAnalysisRuleNode; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.controller.ReloadComponent; +import org.apache.nifi.controller.TerminationAwareLogger; +import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.StandardConfigurationContext; +import org.apache.nifi.flowanalysis.FlowAnalysisRule; +import org.apache.nifi.flowanalysis.FlowAnalysisRuleState; +import org.apache.nifi.flowanalysis.EnforcementPolicy; +import org.apache.nifi.flowanalysis.VerifiableFlowAnalysisRule; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.InstanceClassLoader; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.parameter.ParameterLookup; +import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.util.CharacterFilterUtils; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.ReflectionUtils; +import 
org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.apache.nifi.validation.RuleViolationsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractFlowAnalysisRuleNode extends AbstractComponentNode implements FlowAnalysisRuleNode { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFlowAnalysisRuleNode.class); Review Comment: This static `LOG` should be replaced with a non-static reference that calls `getLogger(getClass())` so that the Logger instance references the concrete implementation class instead of the abstract class. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java: ########## @@ -4597,6 +4611,246 @@ public Response deleteReplaceProcessGroupRequest( return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue()); } + // ------------- + // flow-analysis + // ------------- + + /** + * Submits a request to run a flow analysis. 
+ * + * @param processGroupId The id of the process group representing (a part of) the flow to be analyzed + * @return An AnalyzeFlowRequestEntity + */ + @POST + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("flow-analysis/{processGroupId}") Review Comment: In the context of this Process Group Resource, the path should include the ID first, followed by `flow-analysis-requests`: ```suggestion @Path("{id}/flow-analysis-requests") ``` ########## nifi-api/src/main/java/org/apache/nifi/flowanalysis/EnforcementPolicy.java: ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.flowanalysis; + +public enum EnforcementPolicy { Review Comment: Can this be moved to the `nifi-framework-api`? It is more of a configuration option as opposed to a developer-facing property. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flowanalysis/AbstractFlowAnalysisRuleNode.java: ########## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.flowanalysis; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.AbstractComponentNode; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.FlowAnalysisRuleNode; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.controller.ReloadComponent; +import org.apache.nifi.controller.TerminationAwareLogger; +import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.StandardConfigurationContext; +import org.apache.nifi.flowanalysis.FlowAnalysisRule; +import 
org.apache.nifi.flowanalysis.FlowAnalysisRuleState; +import org.apache.nifi.flowanalysis.EnforcementPolicy; +import org.apache.nifi.flowanalysis.VerifiableFlowAnalysisRule; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.InstanceClassLoader; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.parameter.ParameterLookup; +import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.util.CharacterFilterUtils; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.ReflectionUtils; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.apache.nifi.validation.RuleViolationsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractFlowAnalysisRuleNode extends AbstractComponentNode implements FlowAnalysisRuleNode { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFlowAnalysisRuleNode.class); Review Comment: ```suggestion private final Logger log = LoggerFactory.getLogger(getClass()); ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java: ########## @@ -776,6 +779,23 @@ protected Collection<ValidationResult> computeValidationErrors(final ValidationC final Collection<ValidationResult> referencedServiceValidationResults = validateReferencedControllerServices(validationContext); validationResults.addAll(referencedServiceValidationResults); + analyze(); + + Optional.ofNullable(getValidationContextFactory().getRuleViolationsManager()) + 
.map(ruleViolationsManager -> ruleViolationsManager.getRuleViolationsForSubject(getIdentifier())) + .map(Collection::stream) + .ifPresent(ruleViolationStream -> ruleViolationStream + .filter(ruleViolation -> ruleViolation.getEnforcementPolicy() == EnforcementPolicy.ENFORCE) + .filter(RuleViolation::isEnabled) + .forEach(ruleViolation -> validationResults.add( + new ValidationResult.Builder() + .subject(getComponent().getClass().getSimpleName()) + .valid(false) + .explanation(ruleViolation.getViolationMessage()) + .build() + ) + )); Review Comment: It seems like Flow Analysis results should be separate from property validation failures. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisRuleInstantiationException.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.flowanalysis; + +public class FlowAnalysisRuleInstantiationException extends Exception { + + private static final long serialVersionUID = 189234789237L; Review Comment: Recommend using `1L` as a starting value instead of this random value. 
```suggestion private static final long serialVersionUID = 1L; ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flowanalysis/AbstractFlowAnalysisRuleNode.java: ########## @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.flowanalysis; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.AbstractComponentNode; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.controller.FlowAnalysisRuleNode; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.controller.ReloadComponent; +import org.apache.nifi.controller.TerminationAwareLogger; +import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.StandardConfigurationContext; +import org.apache.nifi.flowanalysis.FlowAnalysisRule; +import org.apache.nifi.flowanalysis.FlowAnalysisRuleState; +import org.apache.nifi.flowanalysis.EnforcementPolicy; +import org.apache.nifi.flowanalysis.VerifiableFlowAnalysisRule; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.nar.InstanceClassLoader; +import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.parameter.ParameterLookup; +import org.apache.nifi.processor.SimpleProcessLogger; +import org.apache.nifi.registry.ComponentVariableRegistry; +import org.apache.nifi.util.CharacterFilterUtils; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.ReflectionUtils; +import 
org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.apache.nifi.validation.RuleViolationsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractFlowAnalysisRuleNode extends AbstractComponentNode implements FlowAnalysisRuleNode { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFlowAnalysisRuleNode.class); + + private final AtomicReference<FlowAnalysisRuleDetails> flowAnalysisRuleRef; + private final ControllerServiceLookup serviceLookup; + private final RuleViolationsManager ruleViolationsManager; + + private volatile String comment; + private EnforcementPolicy enforcementPolicy; + + private volatile FlowAnalysisRuleState state = FlowAnalysisRuleState.DISABLED; + + public AbstractFlowAnalysisRuleNode(final LoggableComponent<FlowAnalysisRule> flowAnalysisRule, final String id, + final ControllerServiceProvider controllerServiceProvider, + final ValidationContextFactory validationContextFactory, + final RuleViolationsManager ruleViolationsManager, + final ComponentVariableRegistry variableRegistry, + final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger) { + + this(flowAnalysisRule, id, controllerServiceProvider, validationContextFactory, ruleViolationsManager, + flowAnalysisRule.getComponent().getClass().getSimpleName(), flowAnalysisRule.getComponent().getClass().getCanonicalName(), + variableRegistry, reloadComponent, extensionManager, validationTrigger, false); + } + + + public AbstractFlowAnalysisRuleNode(final LoggableComponent<FlowAnalysisRule> flowAnalysisRule, final String id, final ControllerServiceProvider controllerServiceProvider, + 
final ValidationContextFactory validationContextFactory, final RuleViolationsManager ruleViolationsManager, + final String componentType, final String componentCanonicalClass, final ComponentVariableRegistry variableRegistry, + final ReloadComponent reloadComponent, final ExtensionManager extensionManager, final ValidationTrigger validationTrigger, + final boolean isExtensionMissing) { + + super(id, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, reloadComponent, + extensionManager, validationTrigger, isExtensionMissing); + this.flowAnalysisRuleRef = new AtomicReference<>(new FlowAnalysisRuleDetails(flowAnalysisRule)); + this.serviceLookup = controllerServiceProvider; + this.ruleViolationsManager = ruleViolationsManager; + this.enforcementPolicy = EnforcementPolicy.WARN; + } + + @Override + public EnforcementPolicy getEnforcementPolicy() { + return enforcementPolicy; + } + + @Override + public void setEnforcementPolicy(EnforcementPolicy enforcementPolicy) { + this.enforcementPolicy = enforcementPolicy; + } + + @Override + public ConfigurableComponent getComponent() { + return flowAnalysisRuleRef.get().getFlowAnalysisRule(); + } + + @Override + public BundleCoordinate getBundleCoordinate() { + return flowAnalysisRuleRef.get().getBundleCoordinate(); + } + + @Override + public TerminationAwareLogger getLogger() { + return flowAnalysisRuleRef.get().getComponentLog(); + } + + @Override + public FlowAnalysisRule getFlowAnalysisRule() { + return flowAnalysisRuleRef.get().getFlowAnalysisRule(); + } + + @Override + public void setFlowAnalysisRule(final LoggableComponent<FlowAnalysisRule> flowAnalysisRule) { + if (isEnabled()) { + throw new IllegalStateException("Cannot modify Flow Analysis Rule configuration while it is enabled"); + } + this.flowAnalysisRuleRef.set(new FlowAnalysisRuleDetails(flowAnalysisRule)); + } + + @Override + public void reload(final Set<URL> additionalUrls) throws 
FlowAnalysisRuleInstantiationException { + if (isEnabled()) { + throw new IllegalStateException("Cannot reload Flow Analysis Rule while it is enabled"); + } + String additionalResourcesFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey()); + setAdditionalResourcesFingerprint(additionalResourcesFingerprint); + getReloadComponent().reload(this, getCanonicalClassName(), getBundleCoordinate(), additionalUrls); + } + + @Override + public boolean isEnabled() { + return FlowAnalysisRuleState.ENABLED.equals(state); + } + + @Override + public boolean isValidationNecessary() { + return !isEnabled() || getValidationStatus() != ValidationStatus.VALID; + } + + @Override + public ConfigurationContext getConfigurationContext() { + return new StandardConfigurationContext(this, serviceLookup, null, getVariableRegistry()); + } + + @Override + public void verifyModifiable() throws IllegalStateException { + if (isEnabled()) { + throw new IllegalStateException("Cannot modify Flow Analysis Rule while it is enabled"); + } + } + + @Override + public FlowAnalysisRuleState getState() { + return state; + } + + @Override + public String getComments() { + return comment; + } + + @Override + public void setComments(final String comment) { + this.comment = CharacterFilterUtils.filterInvalidXmlCharacters(comment); + } + + @Override + public void verifyCanDelete() { + if (isEnabled()) { + throw new IllegalStateException("Cannot delete " + getFlowAnalysisRule().getIdentifier() + " because it is enabled"); + } + } + + @Override + public void verifyCanDisable() { + if (!isEnabled()) { + throw new IllegalStateException("Cannot disable " + getFlowAnalysisRule().getIdentifier() + " because it is already disabled"); + } + } + + @Override + public void verifyCanEnable() { + if (getValidationStatus() == ValidationStatus.INVALID) { + throw new IllegalStateException("Cannot enable " + getFlowAnalysisRule().getIdentifier() + " because it is in 
INVALID status"); + } + + if (isEnabled()) { + throw new IllegalStateException("Cannot enable " + getFlowAnalysisRule().getIdentifier() + " because it is not disabled"); + } + } + + @Override + public void verifyCanEnable(final Set<ControllerServiceNode> ignoredServices) { + if (isEnabled()) { + throw new IllegalStateException("Cannot enable " + getFlowAnalysisRule().getIdentifier() + " because it is not disabled"); + } + + final Collection<ValidationResult> validationResults = getValidationErrors(ignoredServices); + if (!validationResults.isEmpty()) { + throw new IllegalStateException(this + " cannot be enabled because it is not currently valid"); + } + } + + @Override + public void verifyCanUpdate() { + if (isEnabled()) { + throw new IllegalStateException("Cannot update " + getFlowAnalysisRule().getIdentifier() + " because it is currently enabled"); + } + } + + @Override + public void verifyCanClearState() { + verifyCanUpdate(); + } + + @Override + public String getProcessGroupIdentifier() { + return null; + } + + @Override + public ParameterLookup getParameterLookup() { + return ParameterLookup.EMPTY; + } + + @Override + public String toString() { + FlowAnalysisRule flowAnalysisRule = flowAnalysisRuleRef.get().getFlowAnalysisRule(); + try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(getExtensionManager(), flowAnalysisRule.getClass(), flowAnalysisRule.getIdentifier())) { + return getFlowAnalysisRule().toString(); + } + } + + @Override + public void enable() { + verifyCanEnable(); + setState(FlowAnalysisRuleState.ENABLED, OnEnabled.class); + } + + @Override + public void disable() { + verifyCanDisable(); + setState(FlowAnalysisRuleState.DISABLED, OnDisabled.class); + + ruleViolationsManager.removeRuleViolationsForRule(getIdentifier()); + ruleViolationsManager.cleanUp(); + } + + private void setState(FlowAnalysisRuleState newState, Class<? 
extends Annotation> annotation) { + final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceLookup, null, getVariableRegistry()); + + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getFlowAnalysisRule().getClass(), getIdentifier())) { + ReflectionUtils.invokeMethodsWithAnnotation(annotation, getFlowAnalysisRule(), configContext); + + this.state = newState; + + LOG.debug("Successfully {} {}", newState.toString().toLowerCase(), this); + } catch (Exception e) { + final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; + + final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), getFlowAnalysisRule()); + + componentLog.error("Failed to invoke {} method due to {}", annotation.getSimpleName(), cause); Review Comment: The `due to {}` placeholder portion should be removed since exceptions will be logged separately with the stack trace. ```suggestion componentLog.error("Failed to invoke method [{}]", annotation.getSimpleName(), cause); ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java: ########## @@ -1156,11 +1214,14 @@ public void trigger(final ComponentNode component) { } }; + new TriggerFlowAnalysisTask(flowAnalyzer, rootProcessGroupSupplier).run(); new TriggerValidationTask(flowManager, triggerIfValidating).run(); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); LOG.info("Performed initial validation of all components in {} milliseconds", millis); + // Trigger flow analysis to occur every 5 minutes. + flowAnalysisThreadPool.scheduleWithFixedDelay(new TriggerFlowAnalysisTask(flowManager.getFlowAnalyzer(), rootProcessGroupSupplier), 5, 5, TimeUnit.MINUTES); Review Comment: With the potential impact on performance, this interval should be promoted to a configurable value in nifi.properties. 
########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flowanalysis/AnalyzeFlowRequest.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.flowanalysis; + +import org.apache.nifi.flowanalysis.AnalyzeFlowState; +import org.apache.nifi.flowanalysis.AnalyzeFlowStatus; + +public class AnalyzeFlowRequest implements AnalyzeFlowStatus { + private final String processGroupId; + + private final long submissionTime = System.currentTimeMillis(); Review Comment: Recommend aligning the property name with the accessor name: ```suggestion private final long requestSubmissionTime = System.currentTimeMillis(); ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/validation/RuleViolation.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.validation; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.nifi.flowanalysis.EnforcementPolicy; + +import java.util.StringJoiner; + +/** + * A result of a rule violation produced during a flow analysis. + * Violations produced by previous analysis runs may be overridden by new ones. + * A violation is identified by the scope, subjectId, ruleId and issueId properties. + */ +public class RuleViolation { + private final EnforcementPolicy enforcementPolicy; + private final String scope; + private final String subjectId; + private final String subjectDisplayName; + private final String groupId; + private final String ruleId; + private final String issueId; + private final String violationMessage; + private final String violationExplanation; + + private boolean enabled; Review Comment: The `enabled` property seems unclear in this context. If a Flow Analysis Rule is disabled, it should not generate Rule Violations, correct? 
########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java: ########## @@ -403,10 +406,50 @@ ParameterContext createParameterContext(String id, String name, Map<String, Para * Controller Services * Templates * Reporting Tasks + * Flow Analysis Rules * Parameter Contexts * Flow Registries * * @throws IllegalStateException if any of the components is not in a state that it can be deleted. */ void purge(); + + // Flow Analysis + FlowAnalysisRuleNode createFlowAnalysisRule( + final String type, + final BundleCoordinate bundleCoordinate + ); + + FlowAnalysisRuleNode createFlowAnalysisRule( + final String type, + final BundleCoordinate bundleCoordinate, + final boolean firstTimeAdded + ); + + FlowAnalysisRuleNode createFlowAnalysisRule( + final String type, + final String id, + final BundleCoordinate bundleCoordinate, + final boolean firstTimeAdded + ); + + FlowAnalysisRuleNode createFlowAnalysisRule( + final String type, + final String id, + final BundleCoordinate bundleCoordinate, + final Set<URL> additionalUrls, + final boolean firstTimeAdded, + final boolean register, + final String classloaderIsolationKey + ); Review Comment: Is it necessary to define all of these overloaded methods as opposed to just one or two? 
########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/processor/StandardValidationContextFactory.java: ########## @@ -20,25 +20,53 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.PropertyConfiguration; import org.apache.nifi.controller.ValidationContextFactory; +import org.apache.nifi.controller.flowanalysis.FlowAnalyzer; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.validation.RuleViolationsManager; import java.util.Map; public class StandardValidationContextFactory implements ValidationContextFactory { private final ControllerServiceProvider serviceProvider; private final VariableRegistry variableRegistry; + private final RuleViolationsManager ruleViolationsManager; + private final FlowAnalyzer flowAnalyzer; - public StandardValidationContextFactory(final ControllerServiceProvider serviceProvider, final VariableRegistry variableRegistry) { + public StandardValidationContextFactory( + final ControllerServiceProvider serviceProvider, + final VariableRegistry variableRegistry + ) { + this(serviceProvider, variableRegistry, null, null); + } + + public StandardValidationContextFactory( + final ControllerServiceProvider serviceProvider, + final VariableRegistry variableRegistry, + final RuleViolationsManager ruleViolationsManager, + final FlowAnalyzer flowAnalyzer Review Comment: Should these properties be added to the ValidationContextFactory? Property Validation and Flow Analysis are different operations, so combining them does not seem like the right approach. 
########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml: ########## @@ -323,6 +323,18 @@ </excludes> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> Review Comment: Is this addition required? ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java: ########## @@ -432,10 +452,57 @@ public ControllerServiceNode buildControllerService() { } } + public FlowAnalysisRuleNode buildFlowAnalysisRuleNode() { + if (identifier == null) { + throw new IllegalStateException("FlowAnalysisRule ID must be specified"); + } + if (type == null) { + throw new IllegalStateException("FlowAnalysisRule Type must be specified"); + } + if (bundleCoordinate == null) { + throw new IllegalStateException("Bundle Coordinate must be specified"); + } + if (extensionManager == null) { + throw new IllegalStateException("Extension Manager must be specified"); + } + if (serviceProvider == null) { + throw new IllegalStateException("Controller Service Provider must be specified"); + } + if (nodeTypeProvider == null) { + throw new IllegalStateException("Node Type Provider must be specified"); + } + if (variableRegistry == null) { + throw new IllegalStateException("Variable Registry must be specified"); + } + if (reloadComponent == null) { + throw new IllegalStateException("Reload Component must be specified"); + } + if (flowController == null) { + throw new IllegalStateException("FlowController must be specified"); + } + + boolean creationSuccessful = true; + LoggableComponent<FlowAnalysisRule> loggableComponent; + try { + loggableComponent = createLoggableFlowAnalysisRule(); + } catch (final FlowAnalysisRuleInstantiationException rtie) { + logger.error("Could not create 
FlowAnalysisRule of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", rtie); Review Comment: Placeholders should be used instead of string concatenation. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/analyzeflow/ruleimpl/DisallowComponentType.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.analyzeflow.ruleimpl; Review Comment: Package names should not include `impl`. As an extension component, this rule class should be moved to a separate NAR. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java: ########## @@ -4597,6 +4611,246 @@ public Response deleteReplaceProcessGroupRequest( return deleteFlowUpdateRequest("replace-requests", replaceRequestId, disconnectedNodeAcknowledged.booleanValue()); } + // ------------- + // flow-analysis + // ------------- + + /** + * Submits a request to run a flow analysis. 
+ * + * @param processGroupId The id of the process group representing (a part of) the flow to be analyzed + * @return An AnalyzeFlowRequestEntity + */ + @POST + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("flow-analysis/{processGroupId}") + @ApiOperation( + value = "Executes a flow analysis for components within a given process group", + response = AnalyzeFlowRequestEntity.class, + authorizations = { + @Authorization(value = "Read - /process-groups/{uuid} - For this and all encapsulated process groups") + }) + @ApiResponses(value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") + }) + public Response submitAnalyzeFlowRequest( + @ApiParam( + value = "The id of the process group representing (a part of) the flow to be analyzed.", + required = true + ) + @PathParam("processGroupId") + final String processGroupId + ) { + if (isReplicateRequest()) { + return replicate(HttpMethod.POST); + } + + NiFiUser user = NiFiUserUtils.getNiFiUser(); + + ProcessGroupEntity requestProcessGroupEntity = new ProcessGroupEntity(); + requestProcessGroupEntity.setId(processGroupId); + + return withWriteLock( + serviceFacade, + requestProcessGroupEntity, + lookup -> { + final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId); + processGroup.getAuthorizable().authorize(authorizer, RequestAction.READ, user); + }, + null, + (processGroupEntity) -> { + String analyzedGroupId = processGroupEntity.getId(); + + final String requestId = generateUuid(); + final AsynchronousWebRequest<String, Void> analyzeFlowAsyncWebRequest = new StandardAsynchronousWebRequest<>( + requestId, + analyzedGroupId, + analyzedGroupId, + user, + Collections.singletonList(new StandardUpdateStep("Analyze Process Group")) + ); + + // Submit the request to be performed in the background + final Consumer<AsynchronousWebRequest<String, Void>> analyzeFlowTask = asyncRequest -> { + try { + serviceFacade.analyzeProcessGroup(analyzedGroupId); + asyncRequest.markStepComplete(); + } catch (final Exception e) { + logger.error("Failed to run flow analysis on process group " + processGroupId, e); + asyncRequest.fail("Failed to run flow analysis on process group " + processGroupId + " due to " + e); + } + }; + flowAnalysisAsyncRequestManager.submitRequest( + FLOW_ANALYSIS_REQUEST_TYPE, + requestId, + analyzeFlowAsyncWebRequest, + analyzeFlowTask + ); + + return generateOkResponse(createAnalyzeFlowRequestEntity(analyzeFlowAsyncWebRequest, requestId)).build(); + } + ); + } + + /** + * Checks the status of an outstanding request for 
a flow analysis. + * + * @param requestId The id of flow analysis request + * @return An analyzeFlowRequestEntity + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("flow-analysis/{requestId}") Review Comment: This path overlaps with the `POST` request for Process Groups; perhaps changing the path to `flow-analysis-requests/{requestId}` would be better. ```suggestion @Path("{id}/flow-analysis-requests/{requestId}") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
