This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 88d434f9ec NIFI-12735 Adding the option to execute flow analysis
before committing snapshot to registry
88d434f9ec is described below
commit 88d434f9eca41b939e2eb7213f45b8bb8ae00ed3
Author: Bence Simon <[email protected]>
AuthorDate: Thu Feb 8 15:41:27 2024 +0100
NIFI-12735 Adding the option to execute flow analysis before committing
snapshot to registry
This closes #8377.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../flow/FlowRegistryPreCommitException.java | 23 ++
.../java/org/apache/nifi/util/NiFiProperties.java | 20 +
.../controller/flowanalysis/FlowAnalysisUtil.java | 0
.../flow/FlowAnalyzingRegistryClientNode.java | 453 +++++++++++++++++++++
.../flow/FlowAnalyzingRegistryClientNodeTest.java | 139 +++++++
.../apache/nifi/controller/ExtensionBuilder.java | 23 +-
.../nifi/controller/flow/StandardFlowManager.java | 4 +-
.../nifi-framework/nifi-resources/pom.xml | 3 +
.../src/main/resources/conf/nifi.properties | 3 +
.../apache/nifi/web/StandardNiFiServiceFacade.java | 22 +-
10 files changed, 672 insertions(+), 18 deletions(-)
diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryPreCommitException.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryPreCommitException.java
new file mode 100644
index 0000000000..46e4f43f18
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryPreCommitException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+/**
+ * Flow registry exception raised when a snapshot is rejected before it is committed to the registry.
+ * Within this change set it is thrown by FlowAnalyzingRegistryClientNode when rule violations with the
+ * ENFORCE policy are still present on the process group being registered.
+ */
+public class FlowRegistryPreCommitException extends FlowRegistryException {
+    public FlowRegistryPreCommitException(final String message) {
+        super(message);
+    }
+}
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 2ddee1d959..2e3829fe36 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -300,6 +300,9 @@ public class NiFiProperties extends ApplicationProperties {
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME =
"nifi.analytics.connection.model.score.name";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD =
"nifi.analytics.connection.model.score.threshold";
+    // registry client properties
+    public static final String FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = "nifi.registry.check.for.rule.violations.before.commit";
+
// runtime monitoring properties
public static final String MONITOR_LONG_RUNNING_TASK_SCHEDULE =
"nifi.monitor.long.running.task.schedule";
public static final String MONITOR_LONG_RUNNING_TASK_THRESHOLD =
"nifi.monitor.long.running.task.threshold";
@@ -399,6 +402,7 @@ public class NiFiProperties extends ApplicationProperties {
private static final String DEFAULT_SECURITY_USER_JWS_KEY_ROTATION_PERIOD
= "PT1H";
public static final String DEFAULT_WEB_SHOULD_SEND_SERVER_VERSION = "true";
public static final int DEFAULT_LISTENER_BOOTSTRAP_PORT = 0;
+    public static final Boolean DEFAULT_FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = false;
// cluster common defaults
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL =
"5 sec";
@@ -1844,6 +1848,22 @@ public class NiFiProperties extends ApplicationProperties {
.collect(Collectors.toSet());
}
+    /**
+     * Indicates whether NiFi should execute flow analysis on a process group before committing it to a registry.
+     * The value is looked up under {@code nifi.registry.check.for.rule.violations.before.commit}, with the string
+     * form of {@link #DEFAULT_FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT} passed to the lookup as the default.
+     *
+     * @return true if flow analysis should run before the commit, false otherwise
+     * @throws RuntimeException if the resolved value is neither "true" nor "false" (compared case-insensitively)
+     */
+    public boolean flowRegistryCheckForRuleViolationsBeforeCommit() {
+        final String flowRegistryCheckForRuleViolationsBeforeCommit = getProperty(
+                FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT,
+                DEFAULT_FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT.toString());
+
+        // Boolean.parseBoolean would silently map any unexpected text to false, so reject it explicitly first.
+        if (!"true".equalsIgnoreCase(flowRegistryCheckForRuleViolationsBeforeCommit) && !"false".equalsIgnoreCase(flowRegistryCheckForRuleViolationsBeforeCommit)) {
+            throw new RuntimeException(String.format("%s was '%s', expected true or false", FLOW_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT, flowRegistryCheckForRuleViolationsBeforeCommit));
+        }
+
+        return Boolean.parseBoolean(flowRegistryCheckForRuleViolationsBeforeCommit);
+    }
+
/**
* Creates an instance of NiFiProperties. This should likely not be called
* by any classes outside of the NiFi framework but can be useful by the
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisUtil.java
similarity index 100%
rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisUtil.java
rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flowanalysis/FlowAnalysisUtil.java
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
new file mode 100644
index 0000000000..8ce3942778
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
+import org.apache.nifi.controller.LoggableComponent;
+import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.TerminationAwareLogger;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.flow.ExternalControllerServiceReference;
+import org.apache.nifi.flow.ParameterProviderReference;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.EnforcementPolicy;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterLookup;
+import org.apache.nifi.parameter.ParameterUpdate;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.validation.RuleViolation;
+import org.apache.nifi.validation.RuleViolationsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Decorator around a {@link FlowRegistryClientNode} that runs flow analysis on the process group behind a snapshot
+ * before the snapshot is registered. Registration is refused with a {@link FlowRegistryPreCommitException} when a
+ * rule violation with the {@link EnforcementPolicy#ENFORCE} policy is reported for the group. Every other operation
+ * is passed straight through to the wrapped node.
+ */
+public final class FlowAnalyzingRegistryClientNode implements FlowRegistryClientNode {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlowAnalyzingRegistryClientNode.class);
+
+    private final FlowRegistryClientNode node;
+    private final ControllerServiceProvider serviceProvider;
+    private final FlowAnalyzer flowAnalyzer;
+    private final RuleViolationsManager ruleViolationsManager;
+    private final FlowManager flowManager;
+    private final NiFiRegistryFlowMapper flowMapper;
+
+    /**
+     * @param node the wrapped client node that performs the actual registry operations
+     * @param serviceProvider handed to the flow mapper when the process group is mapped for analysis
+     * @param flowAnalyzer runs the analysis on the mapped process group
+     * @param ruleViolationsManager queried for the violations of the group after the analysis
+     * @param flowManager used to look up the process group by the snapshot's instance identifier
+     * @param flowMapper maps the process group into the form handed to the analyzer
+     * @throws NullPointerException if any argument is null
+     */
+    public FlowAnalyzingRegistryClientNode(
+            final FlowRegistryClientNode node,
+            final ControllerServiceProvider serviceProvider,
+            final FlowAnalyzer flowAnalyzer,
+            final RuleViolationsManager ruleViolationsManager,
+            final FlowManager flowManager,
+            final NiFiRegistryFlowMapper flowMapper
+    ) {
+        this.node = Objects.requireNonNull(node);
+        this.serviceProvider = Objects.requireNonNull(serviceProvider);
+        this.flowAnalyzer = Objects.requireNonNull(flowAnalyzer);
+        this.ruleViolationsManager = Objects.requireNonNull(ruleViolationsManager);
+        this.flowManager = Objects.requireNonNull(flowManager);
+        this.flowMapper = Objects.requireNonNull(flowMapper);
+    }
+
+    /**
+     * Analyzes the process group referenced by the snapshot and, if no enforced rule violation is found,
+     * delegates the registration to the wrapped node with the arguments unchanged.
+     *
+     * @return the result of the wrapped node's registerFlowSnapshot call
+     * @throws FlowRegistryPreCommitException if at least one enforced rule violation exists for the group
+     */
+    @Override
+    public RegisteredFlowSnapshot registerFlowSnapshot(
+            final FlowRegistryClientUserContext context,
+            final RegisteredFlow flow,
+            final VersionedProcessGroup snapshot,
+            final Map<String, ExternalControllerServiceReference> externalControllerServices,
+            final Map<String, VersionedParameterContext> parameterContexts,
+            final Map<String, ParameterProviderReference> parameterProviderReferences,
+            final String comments,
+            final int expectedVersion
+    ) throws FlowRegistryException, IOException {
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Snapshot for flow {} is checked for violations before commit", snapshot.getInstanceIdentifier());
+        }
+
+        if (analyzeProcessGroupToRegister(snapshot)) {
+            return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion);
+        } else {
+            throw new FlowRegistryPreCommitException("There are unresolved rule violations");
+        }
+    }
+
+    /**
+     * Looks up the process group by the snapshot's instance identifier, maps it with the flow mapper, runs the
+     * analyzer on the mapped group and then reads back the violations recorded for that group.
+     *
+     * @return true when no violation with the ENFORCE policy is reported for the group, false otherwise
+     */
+    private boolean analyzeProcessGroupToRegister(final VersionedProcessGroup snapshot) {
+        final InstantiatedVersionedProcessGroup nonVersionedProcessGroup = flowMapper.mapNonVersionedProcessGroup(flowManager.getGroup(snapshot.getInstanceIdentifier()), serviceProvider);
+
+        flowAnalyzer.analyzeProcessGroup(nonVersionedProcessGroup);
+        final List<RuleViolation> ruleViolations = ruleViolationsManager.getRuleViolationsForGroup(snapshot.getInstanceIdentifier()).stream()
+                .filter(ruleViolation -> EnforcementPolicy.ENFORCE.equals(ruleViolation.getEnforcementPolicy()))
+                .toList();
+
+        final boolean result = ruleViolations.isEmpty();
+
+        // Report only when something was found; with an empty list the message would announce violations and list none.
+        if (!result && LOGGER.isDebugEnabled()) {
+            final String violations = ruleViolations.stream().map(RuleViolation::getViolationMessage).collect(Collectors.joining(", "));
+            LOGGER.debug("Snapshot for {} has following violation(s): {}", snapshot.getInstanceIdentifier(), violations);
+        }
+
+        return result;
+    }
+
+    // Everything below delegates to the wrapped node without additional logic.
+
+    @Override
+    public Authorizable getParentAuthorizable() {
+        return node.getParentAuthorizable();
+    }
+
+    @Override
+    public Resource getResource() {
+        return node.getResource();
+    }
+
+    @Override
+    public String getProcessGroupIdentifier() {
+        return node.getProcessGroupIdentifier();
+    }
+
+    @Override
+    public String getIdentifier() {
+        return node.getIdentifier();
+    }
+
+    @Override
+    public String getName() {
+        return node.getName();
+    }
+
+    @Override
+    public void setName(final String name) {
+        node.setName(name);
+    }
+
+    @Override
+    public String getAnnotationData() {
+        return node.getAnnotationData();
+    }
+
+    @Override
+    public void setAnnotationData(final String data) {
+        node.setAnnotationData(data);
+    }
+
+    @Override
+    public void setProperties(final Map<String, String> properties, final boolean allowRemovalOfRequiresProperties, final Set<String> sensitiveDynamicPropertyNames) {
+        node.setProperties(properties, allowRemovalOfRequiresProperties, sensitiveDynamicPropertyNames);
+    }
+
+    @Override
+    public void verifyCanUpdateProperties(final Map<String, String> properties) {
+        node.verifyCanUpdateProperties(properties);
+    }
+
+    @Override
+    public Set<String> getReferencedParameterNames() {
+        return node.getReferencedParameterNames();
+    }
+
+    @Override
+    public boolean isReferencingParameter() {
+        return node.isReferencingParameter();
+    }
+
+    @Override
+    public boolean isReferencingParameter(final String parameterName) {
+        return node.isReferencingParameter(parameterName);
+    }
+
+    @Override
+    public void onParametersModified(final Map<String, ParameterUpdate> parameterUpdates) {
+        node.onParametersModified(parameterUpdates);
+    }
+
+    @Override
+    public Set<String> getReferencedAttributeNames() {
+        return node.getReferencedAttributeNames();
+    }
+
+    @Override
+    public void pauseValidationTrigger() {
+        node.pauseValidationTrigger();
+    }
+
+    @Override
+    public void resumeValidationTrigger() {
+        node.resumeValidationTrigger();
+    }
+
+    @Override
+    public Map<PropertyDescriptor, String> getRawPropertyValues() {
+        return node.getRawPropertyValues();
+    }
+
+    @Override
+    public Map<PropertyDescriptor, String> getEffectivePropertyValues() {
+        return node.getEffectivePropertyValues();
+    }
+
+    @Override
+    public PropertyConfiguration getProperty(final PropertyDescriptor property) {
+        return node.getProperty(property);
+    }
+
+    @Override
+    public String getEffectivePropertyValue(final PropertyDescriptor property) {
+        return node.getEffectivePropertyValue(property);
+    }
+
+    @Override
+    public String getRawPropertyValue(final PropertyDescriptor property) {
+        return node.getRawPropertyValue(property);
+    }
+
+    @Override
+    public Map<PropertyDescriptor, PropertyConfiguration> getProperties() {
+        return node.getProperties();
+    }
+
+    @Override
+    public void reload(final Set<URL> additionalUrls) throws Exception {
+        node.reload(additionalUrls);
+    }
+
+    @Override
+    public void refreshProperties() {
+        node.refreshProperties();
+    }
+
+    @Override
+    public Set<URL> getAdditionalClasspathResources(final List<PropertyDescriptor> propertyDescriptors) {
+        return node.getAdditionalClasspathResources(propertyDescriptors);
+    }
+
+    @Override
+    public BundleCoordinate getBundleCoordinate() {
+        return node.getBundleCoordinate();
+    }
+
+    @Override
+    public ConfigurableComponent getComponent() {
+        return node.getComponent();
+    }
+
+    @Override
+    public TerminationAwareLogger getLogger() {
+        return node.getLogger();
+    }
+
+    @Override
+    public boolean isExtensionMissing() {
+        return node.isExtensionMissing();
+    }
+
+    @Override
+    public void setExtensionMissing(final boolean extensionMissing) {
+        node.setExtensionMissing(extensionMissing);
+    }
+
+    @Override
+    public void verifyCanUpdateBundle(final BundleCoordinate bundleCoordinate) throws IllegalStateException {
+        node.verifyCanUpdateBundle(bundleCoordinate);
+    }
+
+    @Override
+    public boolean isReloadAdditionalResourcesNecessary() {
+        return node.isReloadAdditionalResourcesNecessary();
+    }
+
+    @Override
+    public void reloadAdditionalResourcesIfNecessary() {
+        node.reloadAdditionalResourcesIfNecessary();
+    }
+
+    @Override
+    public void resetValidationState() {
+        node.resetValidationState();
+    }
+
+    @Override
+    public Collection<ValidationResult> getValidationErrors() {
+        return node.getValidationErrors();
+    }
+
+    @Override
+    public String getComponentType() {
+        return node.getComponentType();
+    }
+
+    @Override
+    public Class<?> getComponentClass() {
+        return node.getComponentClass();
+    }
+
+    @Override
+    public String getCanonicalClassName() {
+        return node.getCanonicalClassName();
+    }
+
+    @Override
+    public boolean isRestricted() {
+        return node.isRestricted();
+    }
+
+    @Override
+    public boolean isDeprecated() {
+        return node.isDeprecated();
+    }
+
+    @Override
+    public boolean isValidationNecessary() {
+        return node.isValidationNecessary();
+    }
+
+    @Override
+    public ValidationStatus getValidationStatus() {
+        return node.getValidationStatus();
+    }
+
+    @Override
+    public ValidationStatus getValidationStatus(final long timeout, final TimeUnit unit) {
+        return node.getValidationStatus(timeout, unit);
+    }
+
+    @Override
+    public ValidationStatus performValidation() {
+        return node.performValidation();
+    }
+
+    @Override
+    public ValidationState performValidation(final ValidationContext validationContext) {
+        return node.performValidation(validationContext);
+    }
+
+    @Override
+    public ValidationState performValidation(final Map<PropertyDescriptor, PropertyConfiguration> properties, final String annotationData, final ParameterContext parameterContext) {
+        return node.performValidation(properties, annotationData, parameterContext);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return node.getPropertyDescriptors();
+    }
+
+    @Override
+    public PropertyDescriptor getPropertyDescriptor(final String name) {
+        return node.getPropertyDescriptor(name);
+    }
+
+    @Override
+    public boolean isSensitiveDynamicProperty(final String name) {
+        return node.isSensitiveDynamicProperty(name);
+    }
+
+    @Override
+    public Optional<ProcessGroup> getParentProcessGroup() {
+        return node.getParentProcessGroup();
+    }
+
+    @Override
+    public ParameterLookup getParameterLookup() {
+        return node.getParameterLookup();
+    }
+
+    @Override
+    public String getDescription() {
+        return node.getDescription();
+    }
+
+    @Override
+    public void setDescription(final String description) {
+        node.setDescription(description);
+    }
+
+    @Override
+    public boolean isStorageLocationApplicable(final String location) {
+        return node.isStorageLocationApplicable(location);
+    }
+
+    @Override
+    public Set<FlowRegistryBucket> getBuckets(final FlowRegistryClientUserContext context) throws FlowRegistryException, IOException {
+        return node.getBuckets(context);
+    }
+
+    @Override
+    public FlowRegistryBucket getBucket(final FlowRegistryClientUserContext context, final String bucketId) throws FlowRegistryException, IOException {
+        return node.getBucket(context, bucketId);
+    }
+
+    @Override
+    public RegisteredFlow registerFlow(final FlowRegistryClientUserContext context, final RegisteredFlow flow) throws FlowRegistryException, IOException {
+        return node.registerFlow(context, flow);
+    }
+
+    @Override
+    public RegisteredFlow deregisterFlow(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
+        return node.deregisterFlow(context, bucketId, flowId);
+    }
+
+    @Override
+    public RegisteredFlow getFlow(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
+        return node.getFlow(context, bucketId, flowId);
+    }
+
+    @Override
+    public Set<RegisteredFlow> getFlows(final FlowRegistryClientUserContext context, final String bucketId) throws FlowRegistryException, IOException {
+        return node.getFlows(context, bucketId);
+    }
+
+    @Override
+    public FlowSnapshotContainer getFlowContents(
+            final FlowRegistryClientUserContext context, final String bucketId, final String flowId, final int version, final boolean fetchRemoteFlows
+    ) throws FlowRegistryException, IOException {
+        return node.getFlowContents(context, bucketId, flowId, version, fetchRemoteFlows);
+    }
+
+    @Override
+    public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
+        return node.getFlowVersions(context, bucketId, flowId);
+    }
+
+    @Override
+    public int getLatestVersion(final FlowRegistryClientUserContext context, final String bucketId, final String flowId) throws FlowRegistryException, IOException {
+        return node.getLatestVersion(context, bucketId, flowId);
+    }
+
+    @Override
+    public void setComponent(final LoggableComponent<FlowRegistryClient> component) {
+        node.setComponent(component);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java
new file mode 100644
index 0000000000..9ff0991dea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flowanalysis.EnforcementPolicy;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.validation.RuleViolation;
+import org.apache.nifi.validation.RuleViolationsManager;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+
+
+/**
+ * Verifies that FlowAnalyzingRegistryClientNode forwards snapshot registration to the wrapped node when no
+ * ENFORCE violation is reported, and rejects it with FlowRegistryPreCommitException otherwise.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class FlowAnalyzingRegistryClientNodeTest {
+    private final static String INSTANCE_IDENTIFIER = UUID.randomUUID().toString();
+    private final static String COMMENT_TEXT = "comment";
+    private final static int EXPECTED_VERSION = 3;
+
+    @Mock
+    FlowRegistryClientNode node;
+
+    @Mock
+    ControllerServiceProvider serviceProvider;
+
+    @Mock
+    FlowAnalyzer flowAnalyzer;
+
+    @Mock
+    RuleViolationsManager ruleViolationsManager;
+
+    @Mock
+    FlowManager flowManager;
+
+    @Mock
+    NiFiRegistryFlowMapper flowMapper;
+
+    @Mock
+    InstantiatedVersionedProcessGroup nonVersionedProcessGroup;
+
+    @Mock
+    ProcessGroup processGroup;
+
+    @Mock
+    VersionedProcessGroup versionedProcessGroup;
+
+    @Mock
+    RuleViolation ruleViolation1;
+
+    @Mock
+    RuleViolation ruleViolation2;
+
+    @Mock
+    RuleViolation ruleViolation3;
+
+    private final FlowRegistryClientUserContext context = new StandardFlowRegistryClientUserContext();
+    private final RegisteredFlow flow = new RegisteredFlow();
+
+    @BeforeEach
+    public void setUp() {
+        Mockito.when(versionedProcessGroup.getInstanceIdentifier()).thenReturn(INSTANCE_IDENTIFIER);
+        Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(processGroup);
+        Mockito.when(flowMapper.mapNonVersionedProcessGroup(Mockito.same(processGroup), Mockito.same(serviceProvider))).thenReturn(nonVersionedProcessGroup);
+        // Violations 1 and 2 are enforced, violation 3 is only a warning.
+        Mockito.when(ruleViolation1.getEnforcementPolicy()).thenReturn(EnforcementPolicy.ENFORCE);
+        Mockito.when(ruleViolation2.getEnforcementPolicy()).thenReturn(EnforcementPolicy.ENFORCE);
+        Mockito.when(ruleViolation3.getEnforcementPolicy()).thenReturn(EnforcementPolicy.WARN);
+    }
+
+    @Test
+    public void allowFlowRegistrationWhenNoEnforcingViolationFound() throws IOException, FlowRegistryException {
+        Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.emptyList());
+        final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper);
+
+        testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION);
+
+        Mockito
+                .verify(node, Mockito.only())
+                .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION);
+    }
+
+    @Test
+    public void allowFlowRegistrationWhenWarningViolationFound() throws IOException, FlowRegistryException {
+        Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.singletonList(ruleViolation3));
+        final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper);
+
+        testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION);
+
+        Mockito
+                .verify(node, Mockito.only())
+                .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION);
+    }
+
+    @Test
+    public void preventFlowRegistrationWhenEnforcingViolationFound() throws IOException, FlowRegistryException {
+        Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Arrays.asList(ruleViolation1, ruleViolation2));
+        final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapper);
+
+        Assertions.assertThrows(
+                FlowRegistryPreCommitException.class,
+                () -> testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION)
+        );
+
+        Mockito
+                .verify(node, Mockito.never())
+                .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION);
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index c50f968683..e5969a0bf8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -30,6 +30,7 @@ import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import
org.apache.nifi.controller.flowanalysis.FlowAnalysisRuleInstantiationException;
+import org.apache.nifi.controller.flowanalysis.FlowAnalysisUtil;
import org.apache.nifi.controller.flowanalysis.FlowAnalyzer;
import
org.apache.nifi.controller.flowanalysis.StandardFlowAnalysisInitializationContext;
import org.apache.nifi.controller.flowanalysis.StandardFlowAnalysisRuleNode;
@@ -67,6 +68,7 @@ import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessorInitializationContext;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.python.PythonBridge;
+import org.apache.nifi.registry.flow.FlowAnalyzingRegistryClientNode;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
@@ -111,6 +113,7 @@ public class ExtensionBuilder {
private StateManagerProvider stateManagerProvider;
private RuleViolationsManager ruleViolationsManager;
private FlowAnalyzer flowAnalyzer;
+ private boolean flowAnalysisAtRegistryCommit;
private String classloaderIsolationKey;
private SSLContext systemSslContext;
private PythonBridge pythonBridge;
@@ -178,6 +181,11 @@ public class ExtensionBuilder {
return this;
}
+    /**
+     * Sets whether flow registry client nodes built by this builder are wrapped in a
+     * FlowAnalyzingRegistryClientNode, which checks for rule violations before a snapshot is registered.
+     *
+     * @param flowAnalysisAtRegistryCommit true to wrap the client node, false to return it unwrapped
+     * @return this builder
+     */
+    public ExtensionBuilder flowAnalysisAtRegistryCommit(final boolean flowAnalysisAtRegistryCommit) {
+        this.flowAnalysisAtRegistryCommit = flowAnalysisAtRegistryCommit;
+        return this;
+    }
+
public ExtensionBuilder stateManagerProvider(final StateManagerProvider
stateManagerProvider) {
this.stateManagerProvider = stateManagerProvider;
return this;
@@ -534,7 +542,7 @@ public class ExtensionBuilder {
private FlowRegistryClientNode createFlowRegistryClientNode(final
LoggableComponent<FlowRegistryClient> client, final boolean creationSuccessful)
{
final ValidationContextFactory validationContextFactory = new
StandardValidationContextFactory(serviceProvider, ruleViolationsManager,
flowAnalyzer);
- final FlowRegistryClientNode clientNode;
+ final StandardFlowRegistryClientNode clientNode;
if (creationSuccessful) {
clientNode = new StandardFlowRegistryClientNode(
@@ -570,7 +578,18 @@ public class ExtensionBuilder {
true);
}
- return clientNode;
+ if (flowAnalysisAtRegistryCommit) {
+ return new FlowAnalyzingRegistryClientNode(
+ clientNode,
+ serviceProvider,
+ flowAnalyzer,
+ ruleViolationsManager,
+ flowController.getFlowManager(),
+ FlowAnalysisUtil.createMapper(extensionManager)
+ );
+ } else {
+ return clientNode;
+ }
}
private void applyDefaultSettings(final ProcessorNode processorNode) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 9eb800b2b2..d23414fc78 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -415,10 +415,12 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
.reloadComponent(flowController.getReloadComponent())
.addClasspathUrls(additionalUrls)
.kerberosConfig(flowController.createKerberosConfig(nifiProperties))
- .flowController(flowController)
.systemSslContext(systemSslContext)
.extensionManager(extensionManager)
.classloaderIsolationKey(classloaderIsolationKey)
+
.flowAnalysisAtRegistryCommit(nifiProperties.flowRegistryCheckForRuleViolationsBeforeCommit())
+ .flowAnalyzer(getFlowAnalyzer().orElse(null))
+ .ruleViolationsManager(getRuleViolationsManager().orElse(null))
.buildFlowRegistryClient();
LogRepositoryFactory.getRepository(clientNode.getIdentifier()).setLogger(clientNode.getLogger());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 49fc414acb..db4502c77d 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -253,6 +253,9 @@
<nifi.analytics.connection.model.score.name>rSquared</nifi.analytics.connection.model.score.name>
<nifi.analytics.connection.model.score.threshold>.90</nifi.analytics.connection.model.score.threshold>
+ <!-- nifi.properties: flow analysis properties -->
+ <nifi.registry.check.for.rule.violations.before.commit />
+
<!-- nifi.properties: runtime monitoring properties -->
<nifi.monitor.long.running.task.schedule>1
min</nifi.monitor.long.running.task.schedule>
<nifi.monitor.long.running.task.threshold>5
mins</nifi.monitor.long.running.task.threshold>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index f021c71a2f..72de9d3f22 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -337,6 +337,9 @@
nifi.analytics.connection.model.score.threshold=${nifi.analytics.connection.mode
# kubernetes #
nifi.cluster.leader.election.kubernetes.lease.prefix=${nifi.cluster.leader.election.kubernetes.lease.prefix}
+# flow analysis properties
+nifi.registry.check.for.rule.violations.before.commit=${nifi.registry.check.for.rule.violations.before.commit}
+
# runtime monitoring properties
nifi.monitor.long.running.task.schedule=
nifi.monitor.long.running.task.threshold=
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index d2c0f1cd26..6d62c62208 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -5026,21 +5026,13 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
// If the flow was newly registered but adding the snapshot failed,
// deregister the newly created flow from the registry before
rethrowing the original failure.
if (registerNewFlow) {
- logger.error("The flow has been created, but failed to add a
snapshot. Returning the created flow information.", e);
- final VersionControlInformationDTO vci = new
VersionControlInformationDTO();
- vci.setBucketId(registeredFlow.getBucketIdentifier());
- vci.setBucketName(registeredFlow.getBucketName());
- vci.setFlowId(registeredFlow.getIdentifier());
- vci.setFlowName(registeredFlow.getName());
- vci.setFlowDescription(registeredFlow.getDescription());
- vci.setGroupId(groupId);
- vci.setRegistryId(registryId);
- vci.setRegistryName(getFlowRegistryName(registryId));
- vci.setVersion(0);
- vci.setState(VersionedFlowState.SYNC_FAILURE.name());
- vci.setStateExplanation(e.getLocalizedMessage());
-
- return createVersionControlComponentMappingEntity(groupId,
versionedProcessGroup, vci);
+ try {
+ flowRegistryDAO
+ .getFlowRegistryClient(registryId)
+
.deregisterFlow(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
versionedFlowDto.getBucketId(), flowId);
+ } catch (final IOException | FlowRegistryException e2) {
+ throw new NiFiCoreException("Failed to remove flow from
Flow Registry due to " + e2.getMessage(), e2);
+ }
}
throw e;