This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 88d434f9ec NIFI-12735 Adding the option to execute flow analysis before committing snapshot to registry
88d434f9ec is described below

commit 88d434f9eca41b939e2eb7213f45b8bb8ae00ed3
Author: Bence Simon <[email protected]>
AuthorDate: Thu Feb 8 15:41:27 2024 +0100

    NIFI-12735 Adding the option to execute flow analysis before committing snapshot to registry
    
    This closes #8377.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../flow/FlowRegistryPreCommitException.java       |  23 ++
 .../java/org/apache/nifi/util/NiFiProperties.java  |  20 +
 .../controller/flowanalysis/FlowAnalysisUtil.java  |   0
 .../flow/FlowAnalyzingRegistryClientNode.java      | 453 +++++++++++++++++++++
 .../flow/FlowAnalyzingRegistryClientNodeTest.java  | 139 +++++++
 .../apache/nifi/controller/ExtensionBuilder.java   |  23 +-
 .../nifi/controller/flow/StandardFlowManager.java  |   4 +-
 .../nifi-framework/nifi-resources/pom.xml          |   3 +
 .../src/main/resources/conf/nifi.properties        |   3 +
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  22 +-
 10 files changed, 672 insertions(+), 18 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryPreCommitException.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryPreCommitException.java
new file mode 100644
index 0000000000..46e4f43f18
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryPreCommitException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+public class FlowRegistryPreCommitException extends FlowRegistryException {
+    public FlowRegistryPreCommitException(final String message) {
+        super(message);
+    }
+}
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 2ddee1d959..2e3829fe36 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -300,6 +300,9 @@ public class NiFiProperties extends ApplicationProperties {
     public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME = 
"nifi.analytics.connection.model.score.name";
     public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = 
"nifi.analytics.connection.model.score.threshold";
 
+    // registry client properties
+    public static final String 
FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = 
"nifi.registry.check.for.rule.violations.before.commit";
+
     // runtime monitoring properties
     public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE = 
"nifi.monitor.long.running.task.schedule";
     public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD = 
"nifi.monitor.long.running.task.threshold";
@@ -399,6 +402,7 @@ public class NiFiProperties extends ApplicationProperties {
     private static final String DEFAULT_SECURITY_USER_JWS_KEY_ROTATION_PERIOD 
= "PT1H";
     public static final String DEFAULT_WEB_SHOULD_SEND_SERVER_VERSION = "true";
     public static final int DEFAULT_LISTENER_BOOTSTRAP_PORT = 0;
+    public static final Boolean 
DEFAULT_FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = false;
 
     // cluster common defaults
     public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = 
"5 sec";
@@ -1844,6 +1848,22 @@ public class NiFiProperties extends ApplicationProperties {
                 .collect(Collectors.toSet());
     }
 
+    /**
+     * @return Returns true if NiFi should execute flow analysis on the 
process group before committing it to a registry. Returns
+     * false otherwise.
+     */
+    public boolean flowRegistryCheckForRuleViolationsBeforeCommit() {
+        final String flowRegistryCheckForRuleViolationsBeforeCommit = 
getProperty(
+                FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT,
+                
DEFAULT_FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT.toString());
+
+        if 
(!"true".equalsIgnoreCase(flowRegistryCheckForRuleViolationsBeforeCommit) && 
!"false".equalsIgnoreCase(flowRegistryCheckForRuleViolationsBeforeCommit)) {
+            throw new RuntimeException(String.format("%s was '%s', expected 
true or false", FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT, 
flowRegistryCheckForRuleViolationsBeforeCommit));
+        }
+
+        return 
Boolean.parseBoolean(flowRegistryCheckForRuleViolationsBeforeCommit);
+    }
+
     /**
      * Creates an instance of NiFiProperties. This should likely not be called
      * by any classes outside of the NiFi framework but can be useful by the
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisUtil.java
similarity index 100%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisUtil.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisUtil.java
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
new file mode 100644
index 0000000000..8ce3942778
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.TerminationAwareLogger;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ParameterProviderReference;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.EnforcementPolicy;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterLookup;
+import org.apache.nifi.parameter.ParameterUpdate;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.validation.RuleViolation;
+import org.apache.nifi.validation.RuleViolationsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public final class FlowAnalyzingRegistryClientNode implements 
FlowRegistryClientNode {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FlowAnalyzingRegistryClientNode.class);
+
+    private final FlowRegistryClientNode node;
+    private final ControllerServiceProvider serviceProvider;
+    private final FlowAnalyzer flowAnalyzer;
+    private final RuleViolationsManager ruleViolationsManager;
+    private final FlowManager flowManager;
+    private final NiFiRegistryFlowMapper flowMapper;
+
+    public FlowAnalyzingRegistryClientNode(
+            final FlowRegistryClientNode node,
+            final ControllerServiceProvider serviceProvider,
+            final FlowAnalyzer flowAnalyzer,
+            final RuleViolationsManager ruleViolationsManager,
+            final FlowManager flowManager,
+            final NiFiRegistryFlowMapper flowMapper
+    ) {
+        this.node = Objects.requireNonNull(node);
+        this.serviceProvider = Objects.requireNonNull(serviceProvider);
+        this.flowAnalyzer = Objects.requireNonNull(flowAnalyzer);
+        this.ruleViolationsManager = 
Objects.requireNonNull(ruleViolationsManager);
+        this.flowManager = Objects.requireNonNull(flowManager);
+        this.flowMapper = Objects.requireNonNull(flowMapper);
+    }
+
+    @Override
+    public RegisteredFlowSnapshot registerFlowSnapshot(
+            final FlowRegistryClientUserContext context,
+            final RegisteredFlow flow,
+            final VersionedProcessGroup snapshot,
+            final Map<String, ExternalControllerServiceReference> 
externalControllerServices,
+            final Map<String, VersionedParameterContext> parameterContexts,
+            final Map<String, ParameterProviderReference> 
parameterProviderReferences,
+            final String comments,
+            final int expectedVersion
+    ) throws FlowRegistryException, IOException {
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Snapshot for flow {} is checked for violations 
before commit", snapshot.getInstanceIdentifier());
+        }
+
+        if (analyzeProcessGroupToRegister(snapshot)) {
+            return node.registerFlowSnapshot(context, flow, snapshot, 
externalControllerServices, parameterContexts, parameterProviderReferences, 
comments, expectedVersion);
+        } else {
+            throw new FlowRegistryPreCommitException("There are unresolved 
rule violations");
+        }
+    }
+
+    private boolean analyzeProcessGroupToRegister(final VersionedProcessGroup 
snapshot) {
+        final InstantiatedVersionedProcessGroup nonVersionedProcessGroup = 
flowMapper.mapNonVersionedProcessGroup(flowManager.getGroup(snapshot.getInstanceIdentifier()),
 serviceProvider);
+
+        flowAnalyzer.analyzeProcessGroup(nonVersionedProcessGroup);
+        final List<RuleViolation> ruleViolations = 
ruleViolationsManager.getRuleViolationsForGroup(snapshot.getInstanceIdentifier()).stream()
+                .filter(ruleViolation -> 
EnforcementPolicy.ENFORCE.equals(ruleViolation.getEnforcementPolicy()))
+                .toList();
+
+        final boolean result = ruleViolations.isEmpty();
+
+        if (LOGGER.isDebugEnabled()) {
+            final String violations = 
ruleViolations.stream().map(RuleViolation::getViolationMessage).collect(Collectors.joining(",
 "));
+            LOGGER.debug("Snapshot for {} has following violation(s): {}", 
snapshot.getInstanceIdentifier(), violations);
+        }
+
+        return result;
+    }
+
+    @Override
+    public Authorizable getParentAuthorizable() {
+        return node.getParentAuthorizable();
+    }
+
+    @Override
+    public Resource getResource() {
+        return node.getResource();
+    }
+
+    @Override
+    public String getProcessGroupIdentifier() {
+        return node.getProcessGroupIdentifier();
+    }
+
+    @Override
+    public String getIdentifier() {
+        return node.getIdentifier();
+    }
+
+    @Override
+    public String getName() {
+        return node.getName();
+    }
+
+    @Override
+    public void setName(final String name) {
+        node.setName(name);
+    }
+
+    @Override
+    public String getAnnotationData() {
+        return node.getAnnotationData();
+    }
+
+    @Override
+    public void setAnnotationData(final String data) {
+        node.setAnnotationData(data);
+    }
+
+    @Override
+    public void setProperties(final Map<String, String> properties, final 
boolean allowRemovalOfRequiresProperties, final Set<String> 
sensitiveDynamicPropertyNames) {
+        node.setProperties(properties, allowRemovalOfRequiresProperties, 
sensitiveDynamicPropertyNames);
+    }
+
+    @Override
+    public void verifyCanUpdateProperties(final Map<String, String> 
properties) {
+        node.verifyCanUpdateProperties(properties);
+    }
+
+    @Override
+    public Set<String> getReferencedParameterNames() {
+        return node.getReferencedParameterNames();
+    }
+
+    @Override
+    public boolean isReferencingParameter() {
+        return node.isReferencingParameter();
+    }
+
+    @Override
+    public boolean isReferencingParameter(final String parameterName) {
+        return node.isReferencingParameter(parameterName);
+    }
+
+    @Override
+    public void onParametersModified(final Map<String, ParameterUpdate> 
parameterUpdates) {
+        node.onParametersModified(parameterUpdates);
+    }
+
+    @Override
+    public Set<String> getReferencedAttributeNames() {
+        return node.getReferencedAttributeNames();
+    }
+
+    @Override
+    public void pauseValidationTrigger() {
+        node.pauseValidationTrigger();
+    }
+
+    @Override
+    public void resumeValidationTrigger() {
+        node.resumeValidationTrigger();
+    }
+
+    @Override
+    public Map<PropertyDescriptor, String> getRawPropertyValues() {
+        return node.getRawPropertyValues();
+    }
+
+    @Override
+    public Map<PropertyDescriptor, String> getEffectivePropertyValues() {
+        return node.getEffectivePropertyValues();
+    }
+
+    @Override
+    public PropertyConfiguration getProperty(final PropertyDescriptor 
property) {
+        return node.getProperty(property);
+    }
+
+    @Override
+    public String getEffectivePropertyValue(final PropertyDescriptor property) 
{
+        return node.getEffectivePropertyValue(property);
+    }
+
+    @Override
+    public String getRawPropertyValue(final PropertyDescriptor property) {
+        return node.getRawPropertyValue(property);
+    }
+
+    @Override
+    public Map<PropertyDescriptor, PropertyConfiguration> getProperties() {
+        return node.getProperties();
+    }
+
+    @Override
+    public void reload(final Set<URL> additionalUrls) throws Exception {
+        node.reload(additionalUrls);
+    }
+
+    @Override
+    public void refreshProperties() {
+        node.refreshProperties();
+    }
+
+    @Override
+    public Set<URL> getAdditionalClasspathResources(final 
List<PropertyDescriptor> propertyDescriptors) {
+        return node.getAdditionalClasspathResources(propertyDescriptors);
+    }
+
+    @Override
+    public BundleCoordinate getBundleCoordinate() {
+        return node.getBundleCoordinate();
+    }
+
+    @Override
+    public ConfigurableComponent getComponent() {
+        return node.getComponent();
+    }
+
+    @Override
+    public TerminationAwareLogger getLogger() {
+        return node.getLogger();
+    }
+
+    @Override
+    public boolean isExtensionMissing() {
+        return node.isExtensionMissing();
+    }
+
+    @Override
+    public void setExtensionMissing(final boolean extensionMissing) {
+        node.setExtensionMissing(extensionMissing);
+    }
+
+    @Override
+    public void verifyCanUpdateBundle(final BundleCoordinate bundleCoordinate) 
throws IllegalStateException {
+        node.verifyCanUpdateBundle(bundleCoordinate);
+    }
+
+    @Override
+    public boolean isReloadAdditionalResourcesNecessary() {
+        return node.isReloadAdditionalResourcesNecessary();
+    }
+
+    @Override
+    public void reloadAdditionalResourcesIfNecessary() {
+        node.reloadAdditionalResourcesIfNecessary();
+    }
+
+    @Override
+    public void resetValidationState() {
+        node.resetValidationState();
+    }
+
+    @Override
+    public Collection<ValidationResult> getValidationErrors() {
+        return node.getValidationErrors();
+    }
+
+    @Override
+    public String getComponentType() {
+        return node.getComponentType();
+    }
+
+    @Override
+    public Class<?> getComponentClass() {
+        return node.getComponentClass();
+    }
+
+    @Override
+    public String getCanonicalClassName() {
+        return node.getCanonicalClassName();
+    }
+
+    @Override
+    public boolean isRestricted() {
+        return node.isRestricted();
+    }
+
+    @Override
+    public boolean isDeprecated() {
+        return node.isDeprecated();
+    }
+
+    @Override
+    public boolean isValidationNecessary() {
+        return node.isValidationNecessary();
+    }
+
+    @Override
+    public ValidationStatus getValidationStatus() {
+        return node.getValidationStatus();
+    }
+
+    @Override
+    public ValidationStatus getValidationStatus(final long timeout, final 
TimeUnit unit) {
+        return node.getValidationStatus(timeout, unit);
+    }
+
+    @Override
+    public ValidationStatus performValidation() {
+        return node.performValidation();
+    }
+
+    @Override
+    public ValidationState performValidation(final ValidationContext 
validationContext) {
+        return node.performValidation(validationContext);
+    }
+
+    @Override
+    public ValidationState performValidation(final Map<PropertyDescriptor, 
PropertyConfiguration> properties, final String annotationData, final 
ParameterContext parameterContext) {
+        return node.performValidation(properties, annotationData, 
parameterContext);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return node.getPropertyDescriptors();
+    }
+
+    @Override
+    public PropertyDescriptor getPropertyDescriptor(final String name) {
+        return node.getPropertyDescriptor(name);
+    }
+
+    @Override
+    public boolean isSensitiveDynamicProperty(final String name) {
+        return node.isSensitiveDynamicProperty(name);
+    }
+
+    @Override
+    public Optional<ProcessGroup> getParentProcessGroup() {
+        return node.getParentProcessGroup();
+    }
+
+    @Override
+    public ParameterLookup getParameterLookup() {
+        return node.getParameterLookup();
+    }
+
+    @Override
+    public String getDescription() {
+        return node.getDescription();
+    }
+
+    @Override
+    public void setDescription(final String description) {
+        node.setDescription(description);
+    }
+
+    @Override
+    public boolean isStorageLocationApplicable(final String location) {
+        return node.isStorageLocationApplicable(location);
+    }
+
+    @Override
+    public Set<FlowRegistryBucket> getBuckets(final 
FlowRegistryClientUserContext context) throws FlowRegistryException, 
IOException {
+        return node.getBuckets(context);
+    }
+
+    @Override
+    public FlowRegistryBucket getBucket(final FlowRegistryClientUserContext 
context, final String bucketId) throws FlowRegistryException, IOException {
+        return node.getBucket(context, bucketId);
+    }
+
+    @Override
+    public RegisteredFlow registerFlow(final FlowRegistryClientUserContext 
context, final RegisteredFlow flow) throws FlowRegistryException, IOException {
+        return node.registerFlow(context, flow);
+    }
+
+    @Override
+    public RegisteredFlow deregisterFlow(final FlowRegistryClientUserContext 
context, final String bucketId, final String flowId) throws 
FlowRegistryException, IOException {
+        return node.deregisterFlow(context, bucketId, flowId);
+    }
+
+    @Override
+    public RegisteredFlow getFlow(final FlowRegistryClientUserContext context, 
final String bucketId, final String flowId) throws FlowRegistryException, 
IOException {
+        return node.getFlow(context, bucketId, flowId);
+    }
+
+    @Override
+    public Set<RegisteredFlow> getFlows(final FlowRegistryClientUserContext 
context, final String bucketId) throws FlowRegistryException, IOException {
+        return node.getFlows(context, bucketId);
+    }
+
+    @Override
+    public FlowSnapshotContainer getFlowContents(
+            final FlowRegistryClientUserContext context, final String 
bucketId, final String flowId, final int version, final boolean fetchRemoteFlows
+    ) throws FlowRegistryException, IOException {
+        return node.getFlowContents(context, bucketId, flowId, version, 
fetchRemoteFlows);
+    }
+
+    @Override
+    public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final 
FlowRegistryClientUserContext context, final String bucketId, final String 
flowId) throws FlowRegistryException, IOException {
+        return node.getFlowVersions(context, bucketId, flowId);
+    }
+
+    @Override
+    public int getLatestVersion(final FlowRegistryClientUserContext context, 
final String bucketId, final String flowId) throws FlowRegistryException, 
IOException {
+        return node.getLatestVersion(context, bucketId, flowId);
+    }
+
+    @Override
+    public void setComponent(final LoggableComponent<FlowRegistryClient> 
component) {
+        node.setComponent(component);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java
new file mode 100644
index 0000000000..9ff0991dea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.EnforcementPolicy;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.validation.RuleViolation;
+import org.apache.nifi.validation.RuleViolationsManager;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class FlowAnalyzingRegistryClientNodeTest {
+    private final static String INSTANCE_IDENTIFIER = 
UUID.randomUUID().toString();
+    private final static String COMMENT_TEXT = "comment";
+    private final static int EXPECTED_VERSION = 3;
+
+    @Mock
+    FlowRegistryClientNode node;
+
+    @Mock
+    ControllerServiceProvider serviceProvider;
+
+    @Mock
+    FlowAnalyzer flowAnalyzer;
+
+    @Mock
+    RuleViolationsManager ruleViolationsManager;
+
+    @Mock
+    FlowManager flowManager;
+
+    @Mock
+    NiFiRegistryFlowMapper flowMapper;
+
+    @Mock
+    InstantiatedVersionedProcessGroup nonVersionedProcessGroup;
+
+    @Mock
+    ProcessGroup processGroup;
+
+    @Mock
+    VersionedProcessGroup versionedProcessGroup;
+
+    @Mock
+    RuleViolation ruleViolation1;
+
+    @Mock
+    RuleViolation ruleViolation2;
+
+    @Mock
+    RuleViolation ruleViolation3;
+
+    private final FlowRegistryClientUserContext context = new 
StandardFlowRegistryClientUserContext();
+    private final RegisteredFlow flow = new RegisteredFlow();
+
+    @BeforeEach
+    public void setUp() {
+        
Mockito.when(versionedProcessGroup.getInstanceIdentifier()).thenReturn(INSTANCE_IDENTIFIER);
+        
Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(processGroup);
+        
Mockito.when(flowMapper.mapNonVersionedProcessGroup(Mockito.same(processGroup), 
Mockito.same(serviceProvider))).thenReturn(nonVersionedProcessGroup);
+        
Mockito.when(ruleViolation1.getEnforcementPolicy()).thenReturn(EnforcementPolicy.ENFORCE);
+        
Mockito.when(ruleViolation2.getEnforcementPolicy()).thenReturn(EnforcementPolicy.ENFORCE);
+        
Mockito.when(ruleViolation3.getEnforcementPolicy()).thenReturn(EnforcementPolicy.WARN);
+    }
+
+    @Test
+    public void allowFlowRegistrationWhenNoEnforcingViolationFound() throws 
IOException, FlowRegistryException {
+        
Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.emptyList());
+        final FlowAnalyzingRegistryClientNode testSubject = new 
FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, 
ruleViolationsManager, flowManager, flowMapper);
+
+        testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, 
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 
COMMENT_TEXT, EXPECTED_VERSION);
+
+        Mockito
+            .verify(node, Mockito.only())
+            .registerFlowSnapshot(context, flow, versionedProcessGroup, 
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 
COMMENT_TEXT, EXPECTED_VERSION);
+    }
+
+    @Test
+    public void allowFlowRegistrationWhenWarningViolationFound() throws 
IOException, FlowRegistryException {
+        
Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.singletonList(ruleViolation3));
+        final FlowAnalyzingRegistryClientNode testSubject = new 
FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, 
ruleViolationsManager, flowManager, flowMapper);
+
+        testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, 
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 
COMMENT_TEXT, EXPECTED_VERSION);
+
+        Mockito
+            .verify(node, Mockito.only())
+            .registerFlowSnapshot(context, flow, versionedProcessGroup, 
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 
COMMENT_TEXT, EXPECTED_VERSION);
+    }
+
+    @Test
+    public void preventFlowRegistrationWhenEnforcingViolationFound() throws 
IOException, FlowRegistryException {
+        
Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Arrays.asList(ruleViolation1,
 ruleViolation2));
+        final FlowAnalyzingRegistryClientNode testSubject = new 
FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, 
ruleViolationsManager, flowManager, flowMapper);
+
+        Assertions.assertThrows(
+            FlowRegistryPreCommitException.class,
+            () -> testSubject.registerFlowSnapshot(context, flow, 
versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION)
+        );
+
+        Mockito
+            .verify(node, Mockito.never())
+            .registerFlowSnapshot(context, flow, versionedProcessGroup, 
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 
COMMENT_TEXT, EXPECTED_VERSION);
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index c50f968683..e5969a0bf8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -30,6 +30,7 @@ import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import 
org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
 import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
 import 
org.apache.nifi.controller.flowanalysis.StandardFlowAnalysisInitializationContext;
 import org.apache.nifi.controller.flowanalysis.StandardFlowAnalysisRuleNode;
@@ -67,6 +68,7 @@ import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardProcessorInitializationContext;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.python.PythonBridge;
+import org.apache.nifi.registry.flow.FlowAnalyzingRegistryClientNode;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
 import org.apache.nifi.registry.flow.FlowRegistryClientNode;
@@ -111,6 +113,7 @@ public class ExtensionBuilder {
    private StateManagerProvider stateManagerProvider;
    private RuleViolationsManager ruleViolationsManager;
    private FlowAnalyzer flowAnalyzer;
+   private boolean flowAnalysisAtRegistryCommit;
    private String classloaderIsolationKey;
    private SSLContext systemSslContext;
    private PythonBridge pythonBridge;
@@ -178,6 +181,11 @@ public class ExtensionBuilder {
        return this;
    }
 
+   public ExtensionBuilder flowAnalysisAtRegistryCommit(final boolean 
flowAnalysisAtRegistryCommit) {
+       this.flowAnalysisAtRegistryCommit = flowAnalysisAtRegistryCommit;
+       return this;
+   }
+
    public ExtensionBuilder stateManagerProvider(final StateManagerProvider 
stateManagerProvider) {
        this.stateManagerProvider = stateManagerProvider;
        return this;
@@ -534,7 +542,7 @@ public class ExtensionBuilder {
 
    private FlowRegistryClientNode createFlowRegistryClientNode(final 
LoggableComponent<FlowRegistryClient> client, final boolean creationSuccessful) 
{
        final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(serviceProvider, ruleViolationsManager, 
flowAnalyzer);
-       final FlowRegistryClientNode clientNode;
+       final StandardFlowRegistryClientNode clientNode;
 
        if (creationSuccessful) {
            clientNode = new StandardFlowRegistryClientNode(
@@ -570,7 +578,18 @@ public class ExtensionBuilder {
                    true);
        }
 
-       return clientNode;
+       if (flowAnalysisAtRegistryCommit) {
+           return new FlowAnalyzingRegistryClientNode(
+                   clientNode,
+                   serviceProvider,
+                   flowAnalyzer,
+                   ruleViolationsManager,
+                   flowController.getFlowManager(),
+                   FlowAnalysisUtil.createMapper(extensionManager)
+           );
+       } else {
+           return clientNode;
+       }
    }
 
    private void applyDefaultSettings(final ProcessorNode processorNode) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 9eb800b2b2..d23414fc78 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -415,10 +415,12 @@ public class StandardFlowManager extends 
AbstractFlowManager implements FlowMana
                 .reloadComponent(flowController.getReloadComponent())
                 .addClasspathUrls(additionalUrls)
                 
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
-                .flowController(flowController)
                 .systemSslContext(systemSslContext)
                 .extensionManager(extensionManager)
                 .classloaderIsolationKey(classloaderIsolationKey)
+                
.flowAnalysisAtRegistryCommit(nifiProperties.flowRegistryCheckForRuleViolationsBeforeCommit())
+                .flowAnalyzer(getFlowAnalyzer().orElse(null))
+                .ruleViolationsManager(getRuleViolationsManager().orElse(null))
                 .buildFlowRegistryClient();
 
         
LogRepositoryFactory.getRepository(clientNode.getIdentifier()).setLogger(clientNode.getLogger());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 49fc414acb..db4502c77d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -253,6 +253,9 @@
         
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
         
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
 
+        <!-- nifi.properties: flow analysis properties -->
+        <nifi.registry.check.for.rule.violations.before.commit />
+
         <!-- nifi.properties: runtime monitoring properties -->
         <nifi.monitor.long.running.task.schedule>1 
min</nifi.monitor.long.running.task.schedule>
         <nifi.monitor.long.running.task.threshold>5 
mins</nifi.monitor.long.running.task.threshold>
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index f021c71a2f..72de9d3f22 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -337,6 +337,9 @@ 
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.mode
 # kubernetes #
 
nifi.cluster.leader.election.kubernetes.lease.prefix=${nifi.cluster.leader.election.kubernetes.lease.prefix}
 
+# flow analysis properties
+nifi.registry.check.for.rule.violations.before.commit=${nifi.registry.check.for.rule.violations.before.commit}
+
 # runtime monitoring properties
 nifi.monitor.long.running.task.schedule=
 nifi.monitor.long.running.task.threshold=
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index d2c0f1cd26..6d62c62208 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5026,21 +5026,13 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
             // If the flow has been created, but failed to add a snapshot,
             // then we need to capture the created versioned flow information 
as a partial successful result.
             if (registerNewFlow) {
-                logger.error("The flow has been created, but failed to add a 
snapshot. Returning the created flow information.", e);
-                final VersionControlInformationDTO vci = new 
VersionControlInformationDTO();
-                vci.setBucketId(registeredFlow.getBucketIdentifier());
-                vci.setBucketName(registeredFlow.getBucketName());
-                vci.setFlowId(registeredFlow.getIdentifier());
-                vci.setFlowName(registeredFlow.getName());
-                vci.setFlowDescription(registeredFlow.getDescription());
-                vci.setGroupId(groupId);
-                vci.setRegistryId(registryId);
-                vci.setRegistryName(getFlowRegistryName(registryId));
-                vci.setVersion(0);
-                vci.setState(VersionedFlowState.SYNC_FAILURE.name());
-                vci.setStateExplanation(e.getLocalizedMessage());
-
-                return createVersionControlComponentMappingEntity(groupId, 
versionedProcessGroup, vci);
+                try {
+                    flowRegistryDAO
+                        .getFlowRegistryClient(registryId)
+                        
.deregisterFlow(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
 versionedFlowDto.getBucketId(), flowId);
+                } catch (final IOException | FlowRegistryException e2) {
+                    throw new NiFiCoreException("Failed to remove flow from 
Flow Registry due to " + e2.getMessage(), e2);
+                }
             }
 
             throw e;


Reply via email to