This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 19993783e1 NIFI-12203: Ensure that when inheriting cluster flow on
startup, we remove any Controller Services, Reporting Tasks, Parameter
Contexts, etc. that are not in the proposed flow. Also removed overly
aggressive timeout from RunOnceIT and performed minor code cleanup
19993783e1 is described below
commit 19993783e1bbc3a4590f103d24922b31da870b31
Author: Mark Payne <[email protected]>
AuthorDate: Thu Oct 19 13:43:58 2023 -0400
NIFI-12203: Ensure that when inheriting cluster flow on startup, we remove
any Controller Services, Reporting Tasks, Parameter Contexts, etc. that are not
in the proposed flow. Also removed overly aggressive timeout from RunOnceIT and
performed minor code cleanup
Resolves #7907
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/controller/flow/AbstractFlowManager.java | 4 +-
.../apache/nifi/controller/flow/FlowManager.java | 4 +-
.../nifi/controller/flow/StandardFlowManager.java | 33 +++----
.../inheritance/MissingComponentsCheck.java | 59 +++++++++++-
.../serialization/VersionedFlowSynchronizer.java | 105 +++++++++++++++++++--
.../apache/nifi/controller/TestFlowController.java | 59 ------------
.../nifi/web/dao/impl/StandardFlowRegistryDAO.java | 2 +-
.../stateless/engine/StatelessFlowManager.java | 2 +-
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 2 +-
.../system/clustering/FlowSynchronizationIT.java | 43 ---------
...ow.java => JoinClusterWithDifferentFlowIT.java} | 41 +++++---
.../nifi/tests/system/processor/RunOnceIT.java | 22 +----
.../resources/conf/clustered/node1/nifi.properties | 2 +-
.../resources/conf/clustered/node2/nifi.properties | 2 +-
.../resources/flows/mismatched-flows/flow1.json.gz | Bin 4273 -> 4273 bytes
.../resources/flows/mismatched-flows/flow2.json.gz | Bin 4252 -> 4251 bytes
16 files changed, 211 insertions(+), 169 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
index 176a0d01cf..0adf521042 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
@@ -25,8 +25,8 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
-import org.apache.nifi.controller.ParameterProviderNode;
import org.apache.nifi.controller.FlowAnalysisRuleNode;
+import org.apache.nifi.controller.ParameterProviderNode;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
@@ -304,7 +304,7 @@ public abstract class AbstractFlowManager implements
FlowManager {
getAllFlowAnalysisRules().forEach(this::removeFlowAnalysisRule);
getAllParameterProviders().forEach(this::removeParameterProvider);
-
getAllFlowRegistryClients().forEach(this::removeFlowRegistryClientNode);
+ getAllFlowRegistryClients().forEach(this::removeFlowRegistryClient);
for (final ParameterContext parameterContext :
parameterContextManager.getParameterContexts()) {
parameterContextManager.removeParameterContext(parameterContext.getIdentifier());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
index a7fb472b4a..c0543ef719 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
@@ -36,9 +36,9 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextManager;
-import org.apache.nifi.validation.RuleViolationsManager;
import org.apache.nifi.parameter.ParameterProviderConfiguration;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
+import org.apache.nifi.validation.RuleViolationsManager;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import java.net.URL;
@@ -331,7 +331,7 @@ public interface FlowManager extends
ParameterProviderLookup {
FlowRegistryClientNode getFlowRegistryClient(String id);
- void removeFlowRegistryClientNode(FlowRegistryClientNode clientNode);
+ void removeFlowRegistryClient(FlowRegistryClientNode clientNode);
Set<FlowRegistryClientNode> getAllFlowRegistryClients();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 69599dc0fd..9eb800b2b2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -16,21 +16,6 @@
*/
package org.apache.nifi.controller.flow;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -108,6 +93,22 @@ import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
import static java.util.Objects.requireNonNull;
public class StandardFlowManager extends AbstractFlowManager implements
FlowManager {
@@ -451,7 +452,7 @@ public class StandardFlowManager extends
AbstractFlowManager implements FlowMana
}
@Override
- public void removeFlowRegistryClientNode(final FlowRegistryClientNode
clientNode) {
+ public void removeFlowRegistryClient(final FlowRegistryClientNode
clientNode) {
final FlowRegistryClientNode existing =
getFlowRegistryClient(clientNode.getIdentifier());
if (existing == null || existing != clientNode) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/inheritance/MissingComponentsCheck.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/inheritance/MissingComponentsCheck.java
index fd7984b0d8..dba0a619d3 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/inheritance/MissingComponentsCheck.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/inheritance/MissingComponentsCheck.java
@@ -19,7 +19,12 @@ package org.apache.nifi.controller.inheritance;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -33,7 +38,13 @@ public class MissingComponentsCheck implements
FlowInheritabilityCheck {
final Set<String> existingMissingComponents = new
HashSet<>(existingFlow.getMissingComponents());
existingMissingComponents.removeAll(proposedFlow.getMissingComponents());
- if (existingMissingComponents.size() > 0) {
+ if (!existingMissingComponents.isEmpty()) {
+ // Do not consider any components that are not present in the
proposed flow.
+ final Set<String> versionedComponentIds =
getAllComponentIds(proposedFlow.getVersionedDataflow());
+ existingMissingComponents.retainAll(versionedComponentIds);
+ }
+
+ if (!existingMissingComponents.isEmpty()) {
final String missingIds =
StringUtils.join(existingMissingComponents, ",");
return FlowInheritability.notInheritable("Current flow has missing
components that are not considered missing in the proposed flow (" + missingIds
+ ")");
}
@@ -41,11 +52,55 @@ public class MissingComponentsCheck implements
FlowInheritabilityCheck {
final Set<String> proposedMissingComponents = new
HashSet<>(proposedFlow.getMissingComponents());
proposedMissingComponents.removeAll(existingFlow.getMissingComponents());
- if (proposedMissingComponents.size() > 0) {
+ if (!proposedMissingComponents.isEmpty()) {
+ // Do not consider any components that are not present in the
current flow.
+ final Set<String> versionedComponentIds =
getAllComponentIds(existingFlow.getVersionedDataflow());
+ proposedMissingComponents.retainAll(versionedComponentIds);
+ }
+ if (!proposedMissingComponents.isEmpty()) {
final String missingIds =
StringUtils.join(proposedMissingComponents, ",");
return FlowInheritability.notInheritable("Proposed flow has
missing components that are not considered missing in the current flow (" +
missingIds + ")");
}
return FlowInheritability.inheritable();
}
+
+ private Set<String> getAllComponentIds(final VersionedDataflow dataflow) {
+ if (dataflow == null) {
+ return Collections.emptySet();
+ }
+
+ final Set<String> ids = new HashSet<>();
+ findAllComponentIds(dataflow, ids);
+ return ids;
+ }
+
+ private void findAllComponentIds(final VersionedDataflow dataflow, final
Set<String> ids) {
+ addAllIds(dataflow.getControllerServices(), ids);
+ addAllIds(dataflow.getRegistries(), ids);
+ addAllIds(dataflow.getFlowAnalysisRules(), ids);
+ addAllIds(dataflow.getParameterContexts(), ids);
+ addAllIds(dataflow.getParameterProviders(), ids);
+ addAllIds(dataflow.getReportingTasks(), ids);
+
+ findAllComponentIds(dataflow.getRootGroup(), ids);
+ }
+
+ private void findAllComponentIds(final VersionedProcessGroup group, final
Set<String> ids) {
+ if (group == null) {
+ return;
+ }
+
+ addAllIds(group.getControllerServices(), ids);
+ addAllIds(group.getProcessors(), ids);
+
+ group.getProcessGroups().forEach(child -> findAllComponentIds(child,
ids));
+ }
+
+ private void addAllIds(final Collection<? extends VersionedComponent>
components, final Set<String> ids) {
+ if (components != null) {
+ components.forEach(component ->
ids.add(component.getInstanceIdentifier()));
+ }
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 2f5b382bcc..0aaaafc73c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -119,7 +119,6 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
@@ -419,7 +418,7 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
inheritParameterContexts(controller, versionedFlow);
inheritReportingTasks(controller, versionedFlow,
affectedComponentSet);
inheritFlowAnalysisRules(controller, versionedFlow,
affectedComponentSet);
- inheritRegistries(controller, versionedFlow,
affectedComponentSet);
+ inheritRegistryClients(controller, versionedFlow,
affectedComponentSet);
final ComponentIdGenerator componentIdGenerator = (proposedId,
instanceId, destinationGroupId) -> instanceId;
@@ -467,6 +466,8 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
inheritSnippets(controller, proposedFlow);
inheritAuthorizations(existingFlow, proposedFlow, controller);
+
+ removeMissingParameterContexts(controller, versionedFlow);
} catch (final Exception ex) {
throw new FlowSynchronizationException(ex);
}
@@ -548,11 +549,13 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
return affectedComponentSet;
}
- private void inheritRegistries(final FlowController controller, final
VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) {
- final FlowManager flowManger = controller.getFlowManager();
+ private void inheritRegistryClients(final FlowController controller, final
VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) {
+ final FlowManager flowManager = controller.getFlowManager();
+ final Set<String> versionedClientIds = new HashSet<>();
for (final VersionedFlowRegistryClient versionedFlowRegistryClient :
dataflow.getRegistries()) {
- final FlowRegistryClientNode existing =
flowManger.getFlowRegistryClient(versionedFlowRegistryClient.getIdentifier());
+
versionedClientIds.add(versionedFlowRegistryClient.getInstanceIdentifier());
+ final FlowRegistryClientNode existing =
flowManager.getFlowRegistryClient(versionedFlowRegistryClient.getIdentifier());
if (existing == null) {
addFlowRegistryClient(controller, versionedFlowRegistryClient);
@@ -560,6 +563,12 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
updateRegistry(existing, versionedFlowRegistryClient,
controller);
}
}
+
+ for (final FlowRegistryClientNode clientNode :
flowManager.getAllFlowRegistryClients()) {
+ if (!versionedClientIds.contains(clientNode.getIdentifier())) {
+ flowManager.removeFlowRegistryClient(clientNode);
+ }
+ }
}
private void addFlowRegistryClient(final FlowController flowController,
final VersionedFlowRegistryClient versionedFlowRegistryClient) {
@@ -581,7 +590,9 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
private void inheritReportingTasks(final FlowController controller, final
VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet)
throws ReportingTaskInstantiationException {
+ final Set<String> versionedTaskIds = new HashSet<>();
for (final VersionedReportingTask versionedReportingTask :
dataflow.getReportingTasks()) {
+
versionedTaskIds.add(versionedReportingTask.getInstanceIdentifier());
final ReportingTaskNode existing =
controller.getReportingTaskNode(versionedReportingTask.getInstanceIdentifier());
if (existing == null) {
addReportingTask(controller, versionedReportingTask);
@@ -589,6 +600,12 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
updateReportingTask(existing, versionedReportingTask,
controller);
}
}
+
+ for (final ReportingTaskNode reportingTask :
controller.getAllReportingTasks()) {
+ if (!versionedTaskIds.contains(reportingTask.getIdentifier())) {
+ controller.removeReportingTask(reportingTask);
+ }
+ }
}
private void addReportingTask(final FlowController controller, final
VersionedReportingTask reportingTask) throws
ReportingTaskInstantiationException {
@@ -647,7 +664,10 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
return;
}
+ final Set<String> versionedAnalysisRuleIds = new HashSet<>();
for (final VersionedFlowAnalysisRule versionedFlowAnalysisRule :
dataflow.getFlowAnalysisRules()) {
+
versionedAnalysisRuleIds.add(versionedFlowAnalysisRule.getInstanceIdentifier());
+
final FlowAnalysisRuleNode existing =
controller.getFlowAnalysisRuleNode(versionedFlowAnalysisRule.getInstanceIdentifier());
if (existing == null) {
addFlowAnalysisRule(controller, versionedFlowAnalysisRule);
@@ -655,6 +675,12 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
updateFlowAnalysisRule(existing, versionedFlowAnalysisRule,
controller);
}
}
+
+ for (final FlowAnalysisRuleNode ruleNode :
controller.getAllFlowAnalysisRules()) {
+ if (!versionedAnalysisRuleIds.contains(ruleNode.getIdentifier())) {
+ controller.removeFlowAnalysisRule(ruleNode);
+ }
+ }
}
private void addFlowAnalysisRule(final FlowController controller, final
VersionedFlowAnalysisRule flowAnalysisRule) throws
FlowAnalysisRuleInstantiationException {
@@ -692,8 +718,13 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
return;
}
+ final FlowManager flowManager = controller.getFlowManager();
+
+ final Set<String> versionedProviderIds = new HashSet<>();
for (final VersionedParameterProvider versionedParameterProvider :
dataflow.getParameterProviders()) {
- final ParameterProviderNode existing =
controller.getFlowManager().getParameterProvider(versionedParameterProvider.getInstanceIdentifier());
+
versionedProviderIds.add(versionedParameterProvider.getInstanceIdentifier());
+
+ final ParameterProviderNode existing =
flowManager.getParameterProvider(versionedParameterProvider.getInstanceIdentifier());
if (existing == null) {
addParameterProvider(controller, versionedParameterProvider,
controller.getEncryptor());
} else if
(affectedComponentSet.isParameterProviderAffected(existing.getIdentifier())) {
@@ -701,6 +732,11 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
}
+ for (final ParameterProviderNode parameterProvider :
flowManager.getAllParameterProviders()) {
+ if
(!versionedProviderIds.contains(parameterProvider.getIdentifier())) {
+ flowManager.removeParameterProvider(parameterProvider);
+ }
+ }
}
private void addParameterProvider(final FlowController controller, final
VersionedParameterProvider parameterProvider, final PropertyEncryptor
encryptor) {
@@ -721,15 +757,40 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
parameterProviderNode.setProperties(decryptedProperties);
}
+ private void removeMissingParameterContexts(final FlowController
controller, final VersionedDataflow dataflow) {
+ controller.getFlowManager().withParameterContextResolution(() -> {
+ final ParameterContextManager contextManager =
controller.getFlowManager().getParameterContextManager();
+ removeMissingParameterContexts(contextManager, dataflow);
+ });
+ }
+
+ private void removeMissingParameterContexts(final ParameterContextManager
contextManager, final VersionedDataflow dataflow) {
+ if (dataflow == null) {
+ return;
+ }
+
+ // Build mapping of name to context for resolution of inherited
contexts
+ final List<VersionedParameterContext> proposedParameterContexts =
dataflow.getParameterContexts();
+ final Set<String> proposedContextNames =
proposedParameterContexts.stream()
+ .map(VersionedParameterContext::getName)
+ .collect(Collectors.toSet());
+
+ final Map<String, ParameterContext> existingContextsByName =
contextManager.getParameterContextNameMapping();
+ for (final Map.Entry<String, ParameterContext> entry :
existingContextsByName.entrySet()) {
+ if (!proposedContextNames.contains(entry.getKey())) {
+
contextManager.removeParameterContext(entry.getValue().getIdentifier());
+ }
+ }
+ }
+
private void inheritParameterContexts(final FlowController controller,
final VersionedDataflow dataflow) {
controller.getFlowManager().withParameterContextResolution(() -> {
final List<VersionedParameterContext> parameterContexts =
dataflow.getParameterContexts();
// Build mapping of name to context for resolution of inherited
contexts
- final Map<String, VersionedParameterContext>
namedParameterContexts = parameterContexts.stream()
- .collect(
-
Collectors.toMap(VersionedParameterContext::getName, Function.identity())
- );
+ final Map<String, VersionedParameterContext>
namedParameterContexts = new HashMap<>();
+ parameterContexts.forEach(context ->
namedParameterContexts.put(context.getName(), context));
+
for (final VersionedParameterContext versionedParameterContext :
parameterContexts) {
inheritParameterContext(versionedParameterContext,
controller.getFlowManager(), namedParameterContexts, controller.getEncryptor());
}
@@ -937,6 +998,30 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
if (!toDisable.isEmpty()) {
controller.getControllerServiceProvider().disableControllerServicesAsync(toDisable);
}
+
+ removeMissingServices(controller, dataflow);
+ }
+
+ private void removeMissingServices(final FlowController controller, final
VersionedDataflow dataflow) {
+ if (dataflow == null) {
+ return;
+ }
+
+ final Set<String> retainedServiceIds =
dataflow.getControllerServices().stream()
+ .map(VersionedControllerService::getInstanceIdentifier)
+ .collect(Collectors.toSet());
+
+ final List<ControllerServiceNode> toRemove =
controller.getFlowManager().getRootControllerServices().stream()
+ .filter(service ->
!retainedServiceIds.contains(service.getIdentifier()))
+ .toList();
+
+ for (final ControllerServiceNode serviceToRemove : toRemove) {
+ try {
+
controller.getFlowManager().removeRootControllerService(serviceToRemove);
+ } catch (final Exception e) {
+ throw new IllegalStateException("Inherited Dataflow does not
have Controller-Level Controller Service %s but failed to remove it from
flow".formatted(serviceToRemove), e);
+ }
+ }
}
private ControllerServiceNode addRootControllerService(final
FlowController controller, final VersionedControllerService
versionedControllerService) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index f4b504e774..963f7fd9c6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -34,7 +34,6 @@ import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
import
org.apache.nifi.controller.parameter.ParameterProviderInstantiationException;
@@ -103,7 +102,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -611,63 +609,6 @@ public class TestFlowController {
assertEquals(authFingerprint, authorizer.getFingerprint());
}
- @Test
- public void testSynchronizeFlowWhenProposedMissingComponentsAreDifferent()
{
- final Set<String> missingComponents = new HashSet<>();
- missingComponents.add("1");
- missingComponents.add("2");
-
- final DataFlow proposedDataFlow = mock(DataFlow.class);
-
when(proposedDataFlow.getMissingComponents()).thenReturn(missingComponents);
-
- UninheritableFlowException uninheritableFlowException =
- assertThrows(UninheritableFlowException.class,
- () -> controller.synchronize(flowSynchronizer,
proposedDataFlow, mock(FlowService.class), BundleUpdateStrategy.IGNORE_BUNDLE));
- assertTrue(uninheritableFlowException.getMessage().contains("Proposed
flow has missing components " +
- "that are not considered missing in the current flow (1,2)"),
uninheritableFlowException.getMessage());
- }
-
- @Test
- public void testSynchronizeFlowWhenExistingMissingComponentsAreDifferent()
throws IOException {
- final ProcessorNode mockProcessorNode = mock(ProcessorNode.class);
- when(mockProcessorNode.getIdentifier()).thenReturn("1");
- when(mockProcessorNode.isExtensionMissing()).thenReturn(true);
-
- final ControllerServiceNode mockControllerServiceNode =
mock(ControllerServiceNode.class);
- when(mockControllerServiceNode.getIdentifier()).thenReturn("2");
- when(mockControllerServiceNode.isExtensionMissing()).thenReturn(true);
-
- final ReportingTaskNode mockReportingTaskNode =
mock(ReportingTaskNode.class);
- when(mockReportingTaskNode.getIdentifier()).thenReturn("3");
- when(mockReportingTaskNode.isExtensionMissing()).thenReturn(true);
-
- final ProcessGroup mockRootGroup = mock(ProcessGroup.class);
-
when(mockRootGroup.findAllProcessors()).thenReturn(Collections.singletonList(mockProcessorNode));
-
- final SnippetManager mockSnippetManager = mock(SnippetManager.class);
- when(mockSnippetManager.export()).thenReturn(new byte[0]);
-
- final FlowManager flowManager = mock(FlowManager.class);
-
- final FlowController mockFlowController = mock(FlowController.class);
- when(mockFlowController.getFlowManager()).thenReturn(flowManager);
-
- when(flowManager.getRootGroup()).thenReturn(mockRootGroup);
- when(flowManager.getAllControllerServices()).thenReturn(new
HashSet<>(Arrays.asList(mockControllerServiceNode)));
- when(flowManager.getAllReportingTasks()).thenReturn(new
HashSet<>(Arrays.asList(mockReportingTaskNode)));
- when(mockFlowController.getAuthorizer()).thenReturn(authorizer);
-
when(mockFlowController.getSnippetManager()).thenReturn(mockSnippetManager);
-
- final DataFlow proposedDataFlow = mock(DataFlow.class);
- when(proposedDataFlow.getMissingComponents()).thenReturn(new
HashSet<>());
- UninheritableFlowException uninheritableFlowException =
- assertThrows(UninheritableFlowException.class,
- () -> flowSynchronizer.sync(mockFlowController,
proposedDataFlow,
- mock(FlowService.class),
BundleUpdateStrategy.IGNORE_BUNDLE));
- assertTrue(uninheritableFlowException.getMessage().contains("Current
flow has missing components that are not" +
- " considered missing in the proposed flow (1,2,3)"),
uninheritableFlowException.getMessage());
- }
-
@Test
public void testSynchronizeFlowWhenBundlesAreSame() throws IOException {
final LogRepository logRepository =
LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
index 00671acd51..a9a24bffb1 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
@@ -177,7 +177,7 @@ public class StandardFlowRegistryDAO extends ComponentDAO
implements FlowRegistr
throw new IllegalArgumentException("The specified registry id is
unknown to this NiFi.");
}
-
flowController.getFlowManager().removeFlowRegistryClientNode(flowRegistry);
+ flowController.getFlowManager().removeFlowRegistryClient(flowRegistry);
return flowRegistry;
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
index a6d7bffa13..92e133cca5 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
@@ -352,7 +352,7 @@ public class StatelessFlowManager extends
AbstractFlowManager implements FlowMan
}
@Override
- public void removeFlowRegistryClientNode(FlowRegistryClientNode
clientNode) {
+ public void removeFlowRegistryClient(FlowRegistryClientNode clientNode) {
throw new UnsupportedOperationException("Removing Flow Registry Client
is not supported in Stateless NiFi");
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 227bcf9647..e816c2d000 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -255,7 +255,7 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
}
protected void waitForAllNodesConnected(final int expectedNumberOfNodes) {
- waitForAllNodesConnected(expectedNumberOfNodes, 1000L);
+ waitForAllNodesConnected(expectedNumberOfNodes, 500L);
}
protected void waitForAllNodesConnected(final int expectedNumberOfNodes,
final long sleepMillis) {
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index b7e93d355d..4d1b98ca83 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -24,11 +24,9 @@ import
org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.tests.system.NiFiInstance;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
-import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
@@ -418,47 +416,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
}
- @Test
- public void testRestartWithFlowXmlGzNoJson() throws NiFiClientException,
IOException {
- restartWithOnlySingleFlowPersistenceFile("flow.json.gz");
- }
-
- @Test
- public void testRestartWithFlowJsonGzNoXml() throws NiFiClientException,
IOException {
- restartWithOnlySingleFlowPersistenceFile("flow.json.gz");
- }
-
- private void restartWithOnlySingleFlowPersistenceFile(final String
filenameToDelete) throws NiFiClientException, IOException {
- final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
- final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
- final ConnectionEntity connection =
getClientUtil().createConnection(generate, terminate, "success");
-
- final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
- node2.stop();
-
- final File confDir = new File(node2.getInstanceDirectory(), "conf");
- assertEquals(1, confDir.listFiles(file ->
file.getName().equals("flow.json.gz")).length);
-
- final File jsonFile = new File(confDir, filenameToDelete);
- assertTrue(jsonFile.delete());
-
- node2.start(true);
- waitForAllNodesConnected();
-
- switchClientToNode(2);
-
- // Ensure it still has the components
- final ProcessorClient processorClient =
getNifiClient().getProcessorClient(DO_NOT_REPLICATE);
- final ProcessorEntity restartGenerate =
processorClient.getProcessor(generate.getId());
- assertNotNull(restartGenerate);
-
- final ProcessorEntity restartTerminate =
processorClient.getProcessor(terminate.getId());
- assertNotNull(restartTerminate);
-
- final ConnectionEntity restartConnection =
getNifiClient().getConnectionClient(DO_NOT_REPLICATE).getConnection(connection.getId());
- assertNotNull(restartConnection);
- }
-
@Test
public void testComponentsRecreatedOnRestart() throws NiFiClientException,
IOException, InterruptedException {
// Build dataflow with processors at root level and an inner group
that contains an input port, output port, and a processor, as well as a
Controller Service that the processor will use.
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlowIT.java
similarity index 84%
rename from
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
rename to
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlowIT.java
index 7f652712a8..02539a10c7 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlowIT.java
@@ -27,7 +27,9 @@ import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.dto.ParameterContextReferenceDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -37,7 +39,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.ParameterEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.junit.jupiter.api.Disabled;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
@@ -48,7 +50,6 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -58,9 +59,10 @@ import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
-@Disabled("https://issues.apache.org/jira/browse/NIFI-12203")
-public class JoinClusterWithDifferentFlow extends NiFiSystemIT {
+public class JoinClusterWithDifferentFlowIT extends NiFiSystemIT {
@Override
public NiFiInstanceFactory getInstanceFactory() {
final Map<String, String> propertyOverrides =
Collections.singletonMap("nifi.cluster.flow.serialization.format", "JSON");
@@ -83,6 +85,12 @@ public class JoinClusterWithDifferentFlow extends
NiFiSystemIT {
@Test
public void testStartupWithDifferentFlow() throws IOException,
NiFiClientException, InterruptedException {
+ // Ensure that the root-level controller service is enabled
+ waitFor(() -> {
+ final ControllerServiceDTO rootService =
getNifiClient().getControllerServicesClient().getControllerService("65b6f2b0-016e-1000-1bfa-6bc482d8cd2b").getComponent();
+ return "ENABLED".equals(rootService.getState());
+ });
+
// Once we've started up, we want to have node 2 startup with a
different flow. We cannot simply startup both nodes at the same time with
// different flows because then either flow could be elected the
"correct flow" and as a result, we don't know which node to look at to ensure
// that the proper flow resolution occurred.
@@ -108,7 +116,8 @@ public class JoinClusterWithDifferentFlow extends
NiFiSystemIT {
private List<File> getFlowJsonFiles(final File confDir) {
final File[] flowJsonFileArray = confDir.listFiles(file ->
file.getName().startsWith("flow") && file.getName().endsWith(".json.gz"));
- final List<File> flowJsonFiles = new
ArrayList<>(Arrays.asList(flowJsonFileArray));
+ assertNotNull(flowJsonFileArray);
+ final List<File> flowJsonFiles = Arrays.asList(flowJsonFileArray);
return flowJsonFiles;
}
@@ -118,7 +127,7 @@ public class JoinClusterWithDifferentFlow extends
NiFiSystemIT {
final List<File> flowJsonFiles = getFlowJsonFiles(confDir);
assertEquals(1, flowJsonFiles.size());
- return flowJsonFiles.iterator().next();
+ return flowJsonFiles.get(0);
}
private void verifyFlowContentsOnDisk(final File backupFile) throws
IOException {
@@ -157,7 +166,8 @@ public class JoinClusterWithDifferentFlow extends
NiFiSystemIT {
assertEquals("1", generateProperties.get("Batch Size"));
assertEquals("1 hour",
generateFlowFileEntity.getComponent().getConfig().getSchedulingPeriod());
- final ParameterContextDTO contextDto =
getNifiClient().getParamContextClient().getParamContext(paramContextReference.getId(),
false).getComponent();
+ assertEquals(1,
getNifiClient().getParamContextClient(DO_NOT_REPLICATE).getParamContexts().getParameterContexts().size());
+ final ParameterContextDTO contextDto =
getNifiClient().getParamContextClient(DO_NOT_REPLICATE).getParamContext(paramContextReference.getId(),
false).getComponent();
assertEquals(2, contextDto.getBoundProcessGroups().size());
assertEquals(1, contextDto.getParameters().size());
final ParameterEntity parameterEntity =
contextDto.getParameters().iterator().next();
@@ -176,12 +186,18 @@ public class JoinClusterWithDifferentFlow extends
NiFiSystemIT {
final ControllerServiceEntity firstService =
controllerLevelServices.getControllerServices().iterator().next();
assertFalse(firstService.getId().endsWith("00"));
+
+ final ReportingTasksClient reportingTasksClient =
getNifiClient().getReportingTasksClient(DO_NOT_REPLICATE);
+ final ReportingTaskEntity taskEntity =
reportingTasksClient.getReportingTask("65b75baf-016e-1000-13f9-cbcfa0a26576");
+ assertNotNull(taskEntity);
+
+ // Service with ID ending in 00 should no longer exist
+ assertThrows(NiFiClientException.class, () ->
reportingTasksClient.getReportingTask("65b75baf-016e-1000-13f9-cbcfa0a2657600"));
}
private String readFlow(final File file) throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- try (final InputStream fis = new FileInputStream(file);
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final InputStream fis = new FileInputStream(file);
final InputStream gzipIn = new GZIPInputStream(fis)) {
final byte[] buffer = new byte[4096];
@@ -189,9 +205,8 @@ public class JoinClusterWithDifferentFlow extends
NiFiSystemIT {
while ((len = gzipIn.read(buffer)) > 0) {
baos.write(buffer, 0, len);
}
- }
- final byte[] bytes = baos.toByteArray();
- return new String(bytes, StandardCharsets.UTF_8);
+ return baos.toString(StandardCharsets.UTF_8);
+ }
}
}
\ No newline at end of file
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
index bda3ac58ed..e3abbdca1e 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
@@ -21,7 +21,6 @@ import
org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
import java.io.IOException;
@@ -29,24 +28,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class RunOnceIT extends NiFiSystemIT {
- @Timeout(10)
@Test
public void testRunOnce() throws NiFiClientException, IOException,
InterruptedException {
- ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
- getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec");
-
- ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
-
- ConnectionEntity generateToTerminate =
getClientUtil().createConnection(generate, terminate, "success");
+ final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
+ final ProcessorEntity terminate =
getClientUtil().createProcessor("TerminateFlowFile");
+ final ConnectionEntity generateToTerminate =
getClientUtil().createConnection(generate, terminate, "success");
getNifiClient().getProcessorClient().runProcessorOnce(generate);
-
waitForQueueCount(generateToTerminate.getId(), 1);
- ProcessorEntity actualGenerate =
getNifiClient().getProcessorClient().getProcessor(generate.getId());
- String actualRunStatus = actualGenerate.getStatus().getRunStatus();
-
- assertEquals("Stopped", actualRunStatus);
+ getClientUtil().waitForStoppedProcessor(generate.getId());
assertEquals(1, getConnectionQueueSize(generateToTerminate.getId()));
// Test CRON_DRIVEN Strategy
@@ -54,12 +45,9 @@ public class RunOnceIT extends NiFiSystemIT {
getClientUtil().updateProcessorSchedulingPeriod(generate, "* * * * *
?");
getNifiClient().getProcessorClient().runProcessorOnce(generate);
-
waitForQueueCount(generateToTerminate.getId(), 2);
- actualRunStatus = actualGenerate.getStatus().getRunStatus();
-
- assertEquals("Stopped", actualRunStatus);
+ getClientUtil().waitForStoppedProcessor(generate.getId());
assertEquals(2, getConnectionQueueSize(generateToTerminate.getId()));
}
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
index 2ef362d5ef..ce4aa9faf1 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
@@ -203,7 +203,7 @@ nifi.security.user.knox.audiences=
# nifi.security.group.mapping.transform.anygroup=LOWER
# cluster common properties (all nodes must have same values) #
-nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.heartbeat.interval=2 sec
nifi.cluster.protocol.is.secure=false
# cluster node properties (only configure for cluster nodes) #
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
index e5e505b2af..d507b92b22 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
@@ -203,7 +203,7 @@ nifi.security.user.knox.audiences=
# nifi.security.group.mapping.transform.anygroup=LOWER
# cluster common properties (all nodes must have same values) #
-nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.heartbeat.interval=2 sec
nifi.cluster.protocol.is.secure=false
# cluster node properties (only configure for cluster nodes) #
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow1.json.gz
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow1.json.gz
index 334d720c69..7ab675905b 100644
Binary files
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow1.json.gz
and
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow1.json.gz
differ
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow2.json.gz
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow2.json.gz
index 6116ecdbcd..ecad5801bc 100644
Binary files
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow2.json.gz
and
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow2.json.gz
differ