tpalfy commented on code in PR #8377: URL: https://github.com/apache/nifi/pull/8377#discussion_r1525306917
########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java: ########## @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.flow; + +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationState; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.controller.PropertyConfiguration; +import org.apache.nifi.controller.TerminationAwareLogger; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.flowanalysis.FlowAnalyzer; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ParameterProviderReference; +import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterLookup; +import org.apache.nifi.parameter.ParameterUpdate; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.apache.nifi.validation.RuleViolation; +import org.apache.nifi.validation.RuleViolationsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public final class 
FlowAnalyzingRegistryClientNode implements FlowRegistryClientNode { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowAnalyzingRegistryClientNode.class); + + private final FlowRegistryClientNode node; + private final ControllerServiceProvider serviceProvider; + private final FlowAnalyzer flowAnalyzer; + private final RuleViolationsManager ruleViolationsManager; + private final FlowManager flowManager; + private final Supplier<NiFiRegistryFlowMapper> flowMapperSupplier; + + public FlowAnalyzingRegistryClientNode( + final FlowRegistryClientNode node, + final ControllerServiceProvider serviceProvider, + final FlowAnalyzer flowAnalyzer, + final RuleViolationsManager ruleViolationsManager, + final FlowManager flowManager, + final Supplier<NiFiRegistryFlowMapper> flowMapperSupplier Review Comment: I don't see a reason for this to be a Supplier. The mapper that the flow analysis requires is an implementation that doesn't rely on any configuration that can change at runtime, so we can just create it and pass it on. ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java: ########## @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.flow; + +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationState; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.controller.PropertyConfiguration; +import org.apache.nifi.controller.TerminationAwareLogger; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.flowanalysis.FlowAnalyzer; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ParameterProviderReference; +import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterLookup; +import org.apache.nifi.parameter.ParameterUpdate; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.apache.nifi.validation.RuleViolation; +import org.apache.nifi.validation.RuleViolationsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import 
java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public final class FlowAnalyzingRegistryClientNode implements FlowRegistryClientNode { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowAnalyzingRegistryClientNode.class); + + private final FlowRegistryClientNode node; + private final ControllerServiceProvider serviceProvider; + private final FlowAnalyzer flowAnalyzer; + private final RuleViolationsManager ruleViolationsManager; + private final FlowManager flowManager; + private final Supplier<NiFiRegistryFlowMapper> flowMapperSupplier; + + public FlowAnalyzingRegistryClientNode( + final FlowRegistryClientNode node, + final ControllerServiceProvider serviceProvider, + final FlowAnalyzer flowAnalyzer, + final RuleViolationsManager ruleViolationsManager, + final FlowManager flowManager, + final Supplier<NiFiRegistryFlowMapper> flowMapperSupplier + ) { + this.node = Objects.requireNonNull(node); + this.serviceProvider = Objects.requireNonNull(serviceProvider); + this.flowAnalyzer = Objects.requireNonNull(flowAnalyzer); + this.ruleViolationsManager = Objects.requireNonNull(ruleViolationsManager); + this.flowManager = Objects.requireNonNull(flowManager); + this.flowMapperSupplier = Objects.requireNonNull(flowMapperSupplier); + } + + @Override + public RegisteredFlowSnapshot registerFlowSnapshot( + final FlowRegistryClientUserContext context, + final RegisteredFlow flow, + final VersionedProcessGroup snapshot, + final Map<String, ExternalControllerServiceReference> externalControllerServices, + final Map<String, VersionedParameterContext> parameterContexts, + final Map<String, ParameterProviderReference> parameterProviderReferences, + final String comments, + final int expectedVersion + ) throws FlowRegistryException, IOException { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Snapshot for flow {} is checked for violations before commit", snapshot.getInstanceIdentifier()); + } + + if 
(analyzeProcessGroupToRegister(snapshot)) { + return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, parameterContexts, parameterProviderReferences, comments, expectedVersion); + } else { + throw new ViolatingFlowException("Cannot store flow version to registry due to rules violations"); Review Comment: Currently on the UI we get this error message: ``` Failed to register flow with Flow Registry due to Cannot store flow version to registry due to rules violations ``` which is a bit convoluted. Something like this sounds more natural: ``` Failed to register flow with Flow Registry due to There are unresolved rule violations ``` ```suggestion throw new InvalidFlowException("There are unresolved rule violations"); ``` ########## nifi-api/src/main/java/org/apache/nifi/registry/flow/ViolatingFlowException.java: ########## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.flow; + +public class ViolatingFlowException extends FlowRegistryException { + public ViolatingFlowException(final String message) { Review Comment: 'Violating' is not really a word we normally use as an adjective, so its use like this feels unnatural. 
I propose the name `InvalidFlowException` that has the added benefit of being more general, in case we later add other methods to validate flows. ```suggestion public InvalidFlowException(final String message) { ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java: ########## @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.flow; + +import org.apache.nifi.authorization.Resource; +import org.apache.nifi.authorization.resource.Authorizable; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.validation.ValidationState; +import org.apache.nifi.components.validation.ValidationStatus; +import org.apache.nifi.controller.LoggableComponent; +import org.apache.nifi.controller.PropertyConfiguration; +import org.apache.nifi.controller.TerminationAwareLogger; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.flowanalysis.FlowAnalyzer; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.flow.ExternalControllerServiceReference; +import org.apache.nifi.flow.ParameterProviderReference; +import org.apache.nifi.flow.VersionedParameterContext; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.parameter.ParameterLookup; +import org.apache.nifi.parameter.ParameterUpdate; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.apache.nifi.validation.RuleViolation; +import org.apache.nifi.validation.RuleViolationsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public final class 
FlowAnalyzingRegistryClientNode implements FlowRegistryClientNode { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowAnalyzingRegistryClientNode.class); + + private final FlowRegistryClientNode node; + private final ControllerServiceProvider serviceProvider; + private final FlowAnalyzer flowAnalyzer; + private final RuleViolationsManager ruleViolationsManager; + private final FlowManager flowManager; + private final Supplier<NiFiRegistryFlowMapper> flowMapperSupplier; + + public FlowAnalyzingRegistryClientNode( + final FlowRegistryClientNode node, + final ControllerServiceProvider serviceProvider, + final FlowAnalyzer flowAnalyzer, + final RuleViolationsManager ruleViolationsManager, + final FlowManager flowManager, + final Supplier<NiFiRegistryFlowMapper> flowMapperSupplier + ) { + this.node = Objects.requireNonNull(node); + this.serviceProvider = Objects.requireNonNull(serviceProvider); + this.flowAnalyzer = Objects.requireNonNull(flowAnalyzer); + this.ruleViolationsManager = Objects.requireNonNull(ruleViolationsManager); + this.flowManager = Objects.requireNonNull(flowManager); + this.flowMapperSupplier = Objects.requireNonNull(flowMapperSupplier); + } + + @Override + public RegisteredFlowSnapshot registerFlowSnapshot( + final FlowRegistryClientUserContext context, + final RegisteredFlow flow, + final VersionedProcessGroup snapshot, + final Map<String, ExternalControllerServiceReference> externalControllerServices, + final Map<String, VersionedParameterContext> parameterContexts, + final Map<String, ParameterProviderReference> parameterProviderReferences, + final String comments, + final int expectedVersion + ) throws FlowRegistryException, IOException { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Snapshot for flow {} is checked for violations before commit", snapshot.getInstanceIdentifier()); + } + + if (analyzeProcessGroupToRegister(snapshot)) { + return node.registerFlowSnapshot(context, flow, snapshot, externalControllerServices, 
parameterContexts, parameterProviderReferences, comments, expectedVersion); + } else { + throw new ViolatingFlowException("Cannot store flow version to registry due to rules violations"); + } + } + + private boolean analyzeProcessGroupToRegister(final VersionedProcessGroup snapshot) { + final NiFiRegistryFlowMapper mapper = flowMapperSupplier.get(); + final InstantiatedVersionedProcessGroup nonVersionedProcessGroup = mapper.mapNonVersionedProcessGroup(flowManager.getGroup(snapshot.getInstanceIdentifier()), serviceProvider); + + flowAnalyzer.analyzeProcessGroup(nonVersionedProcessGroup); + final Collection<RuleViolation> ruleViolations = ruleViolationsManager.getRuleViolationsForGroup(snapshot.getInstanceIdentifier()); Review Comment: We probably want only violations with ENFORCE policy to block committing to Registry. ```suggestion final List<RuleViolation> enforcingRuleViolations = ruleViolationsManager.getRuleViolationsForGroup(snapshot.getInstanceIdentifier()).stream() .filter(ruleViolation -> EnforcementPolicy.ENFORCE.equals(ruleViolation.getEnforcementPolicy())) .collect(Collectors.toList()); ``` ########## nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java: ########## @@ -1826,6 +1830,17 @@ public Set<String> getDirectSubsequentTokens(final String prefix) { .collect(Collectors.toSet()); } + /** + * @return Returns true if NiFi should execute flow analysis check on the process group before committing it to a registry. Returns + * false otherwise. 
+ */ + public boolean shouldCheckForRulesViolationWhenRegistryCommit() { + final String shouldCheckForRulesViolationWhenRegistryCommit = getProperty( + CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT, + DEFAULT_CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT.toString()); + return Boolean.parseBoolean(shouldCheckForRulesViolationWhenRegistryCommit); + } Review Comment: ```suggestion public boolean nifiRegistryCheckForRuleViolationsBeforeCommit() { final String nifiRegistryCheckForRuleViolationsBeforeCommit = getProperty( NIFI_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT, DEFAULT_NIFI_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT.toString()); if (!"true".equalsIgnoreCase(nifiRegistryCheckForRuleViolationsBeforeCommit) && !"false".equalsIgnoreCase(nifiRegistryCheckForRuleViolationsBeforeCommit)) { throw new RuntimeException(String.format("%s was '%s', expected true or false", NiFiProperties.ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER, nifiRegistryCheckForRuleViolationsBeforeCommit)); } return Boolean.parseBoolean(nifiRegistryCheckForRuleViolationsBeforeCommit); } ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.flow; + +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.flowanalysis.FlowAnalyzer; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.apache.nifi.validation.RuleViolation; +import org.apache.nifi.validation.RuleViolationsManager; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.UUID; +import java.util.function.Supplier; + + +@ExtendWith(MockitoExtension.class) +class FlowAnalyzingRegistryClientNodeTest { + private final static String INSTANCE_IDENTIFIER = UUID.randomUUID().toString(); + private final static String COMMENT_TEXT = "comment"; + private final static int EXPECTED_VERSION = 3; + + @Mock + FlowRegistryClientNode node; + + @Mock + ControllerServiceProvider serviceProvider; + + @Mock + FlowAnalyzer flowAnalyzer; + + @Mock + RuleViolationsManager ruleViolationsManager; + + @Mock + FlowManager flowManager; + + @Mock + Supplier<NiFiRegistryFlowMapper> flowMapperSupplier; + + @Mock + NiFiRegistryFlowMapper flowMapper; + + @Mock + InstantiatedVersionedProcessGroup nonVersionedProcessGroup; + + @Mock + ProcessGroup processGroup; + + @Mock + VersionedProcessGroup versionedProcessGroup; + + private final FlowRegistryClientUserContext context = new 
StandardFlowRegistryClientUserContext(); + private final RegisteredFlow flow = new RegisteredFlow(); + + @BeforeEach + public void setUp() { + Mockito.when(versionedProcessGroup.getInstanceIdentifier()).thenReturn(INSTANCE_IDENTIFIER); + Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(processGroup); + Mockito.when(flowMapperSupplier.get()).thenReturn(flowMapper); + Mockito.when(flowMapper.mapNonVersionedProcessGroup(Mockito.same(processGroup), Mockito.same(serviceProvider))).thenReturn(nonVersionedProcessGroup); + } + + @Test + public void testWhenNoViolationFound() throws IOException, FlowRegistryException { Review Comment: Test reports can help a lot in understanding the various use cases if the method names describe the cases properly. The 'test' prefix, on the other hand, serves no meaningful purpose; although it doesn't hurt much, it's better to omit it if the method name would otherwise become too long. ```suggestion public void allowFlowRegistrationWhenNoEnforcingViolationFound() throws IOException, FlowRegistryException { ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java: ########## @@ -415,11 +415,21 @@ public FlowRegistryClientNode createFlowRegistryClient( .reloadComponent(flowController.getReloadComponent()) .addClasspathUrls(additionalUrls) .kerberosConfig(flowController.createKerberosConfig(nifiProperties)) - .flowController(flowController) .systemSslContext(systemSslContext) .extensionManager(extensionManager) .classloaderIsolationKey(classloaderIsolationKey) - .buildFlowRegistryClient(); + .flowAnalysisAtRegistryCommit(nifiProperties.shouldCheckForRulesViolationWhenRegistryCommit()); + + if (getFlowAnalyzer().isPresent()) { + extensionBuilder.flowAnalyzer(getFlowAnalyzer().get()); + } + + if (getRuleViolationsManager().isPresent()) { + extensionBuilder.ruleViolationsManager(getRuleViolationsManager().get()); + } + + final 
FlowRegistryClientNode clientNode = + extensionBuilder.buildFlowRegistryClient(); Review Comment: ```suggestion .flowAnalyzer(getFlowAnalyzer().orElse(null)) .ruleViolationsManager(getRuleViolationsManager().orElse(null)) ``` ########## nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java: ########## @@ -1826,6 +1830,17 @@ public Set<String> getDirectSubsequentTokens(final String prefix) { .collect(Collectors.toSet()); } + /** + * @return Returns true if NiFi should execute flow analysis check on the process group before committing it to a registry. Returns Review Comment: ```suggestion * @return Returns true if NiFi should execute flow analysis on the process group before committing it to a registry. Returns ``` ########## nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java: ########## @@ -300,6 +300,9 @@ public class NiFiProperties extends ApplicationProperties { // flow analysis properties public static final String BACKGROUND_FLOW_ANALYSIS_SCHEDULE = "nifi.flow.analysis.background.task.schedule"; + // registry client properties + public static final String CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT = "nifi.registry.check.for.rules.violation"; Review Comment: This doesn't seem to grasp the concept properly. It indicates only the fact that violations can be important but doesn't tell why or what the consequences are. ```suggestion public static final String NIFI_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = "nifi.registry.check.for.rule.violations.before.commit"; ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNodeTest.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.flow; + +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.flowanalysis.FlowAnalyzer; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup; +import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper; +import org.apache.nifi.validation.RuleViolation; +import org.apache.nifi.validation.RuleViolationsManager; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.UUID; +import java.util.function.Supplier; + + +@ExtendWith(MockitoExtension.class) +class FlowAnalyzingRegistryClientNodeTest { + private final static String INSTANCE_IDENTIFIER = UUID.randomUUID().toString(); + private final static String COMMENT_TEXT = "comment"; + private final static int EXPECTED_VERSION = 3; + + @Mock + FlowRegistryClientNode node; + + @Mock + ControllerServiceProvider serviceProvider; + + @Mock + FlowAnalyzer 
flowAnalyzer; + + @Mock + RuleViolationsManager ruleViolationsManager; + + @Mock + FlowManager flowManager; + + @Mock + Supplier<NiFiRegistryFlowMapper> flowMapperSupplier; + + @Mock + NiFiRegistryFlowMapper flowMapper; + + @Mock + InstantiatedVersionedProcessGroup nonVersionedProcessGroup; + + @Mock + ProcessGroup processGroup; + + @Mock + VersionedProcessGroup versionedProcessGroup; + + private final FlowRegistryClientUserContext context = new StandardFlowRegistryClientUserContext(); + private final RegisteredFlow flow = new RegisteredFlow(); + + @BeforeEach + public void setUp() { + Mockito.when(versionedProcessGroup.getInstanceIdentifier()).thenReturn(INSTANCE_IDENTIFIER); + Mockito.when(flowManager.getGroup(Mockito.anyString())).thenReturn(processGroup); + Mockito.when(flowMapperSupplier.get()).thenReturn(flowMapper); + Mockito.when(flowMapper.mapNonVersionedProcessGroup(Mockito.same(processGroup), Mockito.same(serviceProvider))).thenReturn(nonVersionedProcessGroup); + } + + @Test + public void testWhenNoViolationFound() throws IOException, FlowRegistryException { + Mockito.when(ruleViolationsManager.getRuleViolationsForGroup(Mockito.anyString())).thenReturn(Collections.emptyList()); + final FlowAnalyzingRegistryClientNode testSubject = new FlowAnalyzingRegistryClientNode(node, serviceProvider, flowAnalyzer, ruleViolationsManager, flowManager, flowMapperSupplier); + + testSubject.registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); + + Mockito + .verify(node, Mockito.only()) + .registerFlowSnapshot(context, flow, versionedProcessGroup, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), COMMENT_TEXT, EXPECTED_VERSION); + } + + @Test + public void testWhenViolationFound() throws IOException, FlowRegistryException { Review Comment: ```suggestion public void preventFlowRegistrationWhenEnforcingViolationFound() throws IOException, 
FlowRegistryException { ``` ########## nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java: ########## @@ -401,6 +404,7 @@ public class NiFiProperties extends ApplicationProperties { private static final String DEFAULT_SECURITY_USER_JWS_KEY_ROTATION_PERIOD = "PT1H"; public static final String DEFAULT_WEB_SHOULD_SEND_SERVER_VERSION = "true"; public static final int DEFAULT_LISTENER_BOOTSTRAP_PORT = 0; + public static final Boolean DEFAULT_CHECK_FOL_RULES_VIOLATION_WHEN_REGISTRY_COMMIT = false; Review Comment: ```suggestion public static final Boolean DEFAULT_NIFI_REGISTRY_CHECK_FOR_RULE_VIOLATIONS_BEFORE_COMMIT = false; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
