This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 19993783e1 NIFI-12203: Ensure that when inheriting cluster flow on 
startup, we remove any Controller Services, Reporting Tasks, Parameter 
Contexts, etc. that are not in the proposed flow. Also removed overly 
aggressive timeout from RunOnceIT and performed minor code cleanup
19993783e1 is described below

commit 19993783e1bbc3a4590f103d24922b31da870b31
Author: Mark Payne <[email protected]>
AuthorDate: Thu Oct 19 13:43:58 2023 -0400

    NIFI-12203: Ensure that when inheriting cluster flow on startup, we remove 
any Controller Services, Reporting Tasks, Parameter Contexts, etc. that are not 
in the proposed flow. Also removed overly aggressive timeout from RunOnceIT and 
performed minor code cleanup
    
    Resolves #7907
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/controller/flow/AbstractFlowManager.java  |   4 +-
 .../apache/nifi/controller/flow/FlowManager.java   |   4 +-
 .../nifi/controller/flow/StandardFlowManager.java  |  33 +++----
 .../inheritance/MissingComponentsCheck.java        |  59 +++++++++++-
 .../serialization/VersionedFlowSynchronizer.java   | 105 +++++++++++++++++++--
 .../apache/nifi/controller/TestFlowController.java |  59 ------------
 .../nifi/web/dao/impl/StandardFlowRegistryDAO.java |   2 +-
 .../stateless/engine/StatelessFlowManager.java     |   2 +-
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |   2 +-
 .../system/clustering/FlowSynchronizationIT.java   |  43 ---------
 ...ow.java => JoinClusterWithDifferentFlowIT.java} |  41 +++++---
 .../nifi/tests/system/processor/RunOnceIT.java     |  22 +----
 .../resources/conf/clustered/node1/nifi.properties |   2 +-
 .../resources/conf/clustered/node2/nifi.properties |   2 +-
 .../resources/flows/mismatched-flows/flow1.json.gz | Bin 4273 -> 4273 bytes
 .../resources/flows/mismatched-flows/flow2.json.gz | Bin 4252 -> 4251 bytes
 16 files changed, 211 insertions(+), 169 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
index 176a0d01cf..0adf521042 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
@@ -25,8 +25,8 @@ import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
-import org.apache.nifi.controller.ParameterProviderNode;
 import org.apache.nifi.controller.FlowAnalysisRuleNode;
+import org.apache.nifi.controller.ParameterProviderNode;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
@@ -304,7 +304,7 @@ public abstract class AbstractFlowManager implements 
FlowManager {
         getAllFlowAnalysisRules().forEach(this::removeFlowAnalysisRule);
         getAllParameterProviders().forEach(this::removeParameterProvider);
 
-        
getAllFlowRegistryClients().forEach(this::removeFlowRegistryClientNode);
+        getAllFlowRegistryClients().forEach(this::removeFlowRegistryClient);
 
         for (final ParameterContext parameterContext : 
parameterContextManager.getParameterContexts()) {
             
parameterContextManager.removeParameterContext(parameterContext.getIdentifier());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
index a7fb472b4a..c0543ef719 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
@@ -36,9 +36,9 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.parameter.Parameter;
 import org.apache.nifi.parameter.ParameterContext;
 import org.apache.nifi.parameter.ParameterContextManager;
-import org.apache.nifi.validation.RuleViolationsManager;
 import org.apache.nifi.parameter.ParameterProviderConfiguration;
 import org.apache.nifi.registry.flow.FlowRegistryClientNode;
+import org.apache.nifi.validation.RuleViolationsManager;
 import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 
 import java.net.URL;
@@ -331,7 +331,7 @@ public interface FlowManager extends 
ParameterProviderLookup {
 
     FlowRegistryClientNode getFlowRegistryClient(String id);
 
-    void removeFlowRegistryClientNode(FlowRegistryClientNode clientNode);
+    void removeFlowRegistryClient(FlowRegistryClientNode clientNode);
 
     Set<FlowRegistryClientNode> getAllFlowRegistryClients();
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 69599dc0fd..9eb800b2b2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -16,21 +16,6 @@
  */
 package org.apache.nifi.controller.flow;
 
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import javax.net.ssl.SSLContext;
 import org.apache.nifi.annotation.documentation.DeprecationNotice;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -108,6 +93,22 @@ import org.apache.nifi.web.api.dto.FlowSnippetDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.SSLContext;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
 import static java.util.Objects.requireNonNull;
 
 public class StandardFlowManager extends AbstractFlowManager implements 
FlowManager {
@@ -451,7 +452,7 @@ public class StandardFlowManager extends 
AbstractFlowManager implements FlowMana
     }
 
     @Override
-    public void removeFlowRegistryClientNode(final FlowRegistryClientNode 
clientNode) {
+    public void removeFlowRegistryClient(final FlowRegistryClientNode 
clientNode) {
         final FlowRegistryClientNode existing = 
getFlowRegistryClient(clientNode.getIdentifier());
 
         if (existing == null || existing != clientNode) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/inheritance/MissingComponentsCheck.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/inheritance/MissingComponentsCheck.java
index fd7984b0d8..dba0a619d3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/inheritance/MissingComponentsCheck.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/inheritance/MissingComponentsCheck.java
@@ -19,7 +19,12 @@ package org.apache.nifi.controller.inheritance;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedProcessGroup;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -33,7 +38,13 @@ public class MissingComponentsCheck implements 
FlowInheritabilityCheck {
         final Set<String> existingMissingComponents = new 
HashSet<>(existingFlow.getMissingComponents());
         
existingMissingComponents.removeAll(proposedFlow.getMissingComponents());
 
-        if (existingMissingComponents.size() > 0) {
+        if (!existingMissingComponents.isEmpty()) {
+            // Do not consider any components that are not present in the 
proposed flow.
+            final Set<String> versionedComponentIds = 
getAllComponentIds(proposedFlow.getVersionedDataflow());
+            existingMissingComponents.retainAll(versionedComponentIds);
+        }
+
+        if (!existingMissingComponents.isEmpty()) {
             final String missingIds = 
StringUtils.join(existingMissingComponents, ",");
             return FlowInheritability.notInheritable("Current flow has missing 
components that are not considered missing in the proposed flow (" + missingIds 
+ ")");
         }
@@ -41,11 +52,55 @@ public class MissingComponentsCheck implements 
FlowInheritabilityCheck {
         final Set<String> proposedMissingComponents = new 
HashSet<>(proposedFlow.getMissingComponents());
         
proposedMissingComponents.removeAll(existingFlow.getMissingComponents());
 
-        if (proposedMissingComponents.size() > 0) {
+        if (!proposedMissingComponents.isEmpty()) {
+            // Do not consider any components that are not present in the 
current flow.
+            final Set<String> versionedComponentIds = 
getAllComponentIds(existingFlow.getVersionedDataflow());
+            proposedMissingComponents.retainAll(versionedComponentIds);
+        }
+        if (!proposedMissingComponents.isEmpty()) {
             final String missingIds = 
StringUtils.join(proposedMissingComponents, ",");
             return FlowInheritability.notInheritable("Proposed flow has 
missing components that are not considered missing in the current flow (" + 
missingIds + ")");
         }
 
         return FlowInheritability.inheritable();
     }
+
+    private Set<String> getAllComponentIds(final VersionedDataflow dataflow) {
+        if (dataflow == null) {
+            return Collections.emptySet();
+        }
+
+        final Set<String> ids = new HashSet<>();
+        findAllComponentIds(dataflow, ids);
+        return ids;
+    }
+
+    private void findAllComponentIds(final VersionedDataflow dataflow, final 
Set<String> ids) {
+        addAllIds(dataflow.getControllerServices(), ids);
+        addAllIds(dataflow.getRegistries(), ids);
+        addAllIds(dataflow.getFlowAnalysisRules(), ids);
+        addAllIds(dataflow.getParameterContexts(), ids);
+        addAllIds(dataflow.getParameterProviders(), ids);
+        addAllIds(dataflow.getReportingTasks(), ids);
+
+        findAllComponentIds(dataflow.getRootGroup(), ids);
+    }
+
+    private void findAllComponentIds(final VersionedProcessGroup group, final 
Set<String> ids) {
+        if (group == null) {
+            return;
+        }
+
+        addAllIds(group.getControllerServices(), ids);
+        addAllIds(group.getProcessors(), ids);
+
+        group.getProcessGroups().forEach(child -> findAllComponentIds(child, 
ids));
+    }
+
+    private void addAllIds(final Collection<? extends VersionedComponent> 
components, final Set<String> ids) {
+        if (components != null) {
+            components.forEach(component -> 
ids.add(component.getInstanceIdentifier()));
+        }
+    }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 2f5b382bcc..0aaaafc73c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -119,7 +119,6 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 
@@ -419,7 +418,7 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
                 inheritParameterContexts(controller, versionedFlow);
                 inheritReportingTasks(controller, versionedFlow, 
affectedComponentSet);
                 inheritFlowAnalysisRules(controller, versionedFlow, 
affectedComponentSet);
-                inheritRegistries(controller, versionedFlow, 
affectedComponentSet);
+                inheritRegistryClients(controller, versionedFlow, 
affectedComponentSet);
 
                 final ComponentIdGenerator componentIdGenerator = (proposedId, 
instanceId, destinationGroupId) -> instanceId;
 
@@ -467,6 +466,8 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
 
             inheritSnippets(controller, proposedFlow);
             inheritAuthorizations(existingFlow, proposedFlow, controller);
+
+            removeMissingParameterContexts(controller, versionedFlow);
         } catch (final Exception ex) {
             throw new FlowSynchronizationException(ex);
         }
@@ -548,11 +549,13 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
         return affectedComponentSet;
     }
 
-    private void inheritRegistries(final FlowController controller, final 
VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) {
-        final FlowManager flowManger = controller.getFlowManager();
+    private void inheritRegistryClients(final FlowController controller, final 
VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) {
+        final FlowManager flowManager = controller.getFlowManager();
 
+        final Set<String> versionedClientIds = new HashSet<>();
         for (final VersionedFlowRegistryClient versionedFlowRegistryClient : 
dataflow.getRegistries()) {
-            final FlowRegistryClientNode existing = 
flowManger.getFlowRegistryClient(versionedFlowRegistryClient.getIdentifier());
+            
versionedClientIds.add(versionedFlowRegistryClient.getInstanceIdentifier());
+            final FlowRegistryClientNode existing = 
flowManager.getFlowRegistryClient(versionedFlowRegistryClient.getIdentifier());
 
             if (existing == null) {
                 addFlowRegistryClient(controller, versionedFlowRegistryClient);
@@ -560,6 +563,12 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
                 updateRegistry(existing, versionedFlowRegistryClient, 
controller);
             }
         }
+
+        for (final FlowRegistryClientNode clientNode : 
flowManager.getAllFlowRegistryClients()) {
+            if (!versionedClientIds.contains(clientNode.getIdentifier())) {
+                flowManager.removeFlowRegistryClient(clientNode);
+            }
+        }
     }
 
     private void addFlowRegistryClient(final FlowController flowController, 
final VersionedFlowRegistryClient versionedFlowRegistryClient) {
@@ -581,7 +590,9 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
     }
 
     private void inheritReportingTasks(final FlowController controller, final 
VersionedDataflow dataflow, final AffectedComponentSet affectedComponentSet) 
throws ReportingTaskInstantiationException {
+        final Set<String> versionedTaskIds = new HashSet<>();
         for (final VersionedReportingTask versionedReportingTask : 
dataflow.getReportingTasks()) {
+            
versionedTaskIds.add(versionedReportingTask.getInstanceIdentifier());
             final ReportingTaskNode existing = 
controller.getReportingTaskNode(versionedReportingTask.getInstanceIdentifier());
             if (existing == null) {
                 addReportingTask(controller, versionedReportingTask);
@@ -589,6 +600,12 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
                 updateReportingTask(existing, versionedReportingTask, 
controller);
             }
         }
+
+        for (final ReportingTaskNode reportingTask : 
controller.getAllReportingTasks()) {
+            if (!versionedTaskIds.contains(reportingTask.getIdentifier())) {
+                controller.removeReportingTask(reportingTask);
+            }
+        }
     }
 
     private void addReportingTask(final FlowController controller, final 
VersionedReportingTask reportingTask) throws 
ReportingTaskInstantiationException {
@@ -647,7 +664,10 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
             return;
         }
 
+        final Set<String> versionedAnalysisRuleIds = new HashSet<>();
         for (final VersionedFlowAnalysisRule versionedFlowAnalysisRule : 
dataflow.getFlowAnalysisRules()) {
+            
versionedAnalysisRuleIds.add(versionedFlowAnalysisRule.getInstanceIdentifier());
+
             final FlowAnalysisRuleNode existing = 
controller.getFlowAnalysisRuleNode(versionedFlowAnalysisRule.getInstanceIdentifier());
             if (existing == null) {
                 addFlowAnalysisRule(controller, versionedFlowAnalysisRule);
@@ -655,6 +675,12 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
                 updateFlowAnalysisRule(existing, versionedFlowAnalysisRule, 
controller);
             }
         }
+
+        for (final FlowAnalysisRuleNode ruleNode : 
controller.getAllFlowAnalysisRules()) {
+            if (!versionedAnalysisRuleIds.contains(ruleNode.getIdentifier())) {
+                controller.removeFlowAnalysisRule(ruleNode);
+            }
+        }
     }
 
     private void addFlowAnalysisRule(final FlowController controller, final 
VersionedFlowAnalysisRule flowAnalysisRule) throws 
FlowAnalysisRuleInstantiationException {
@@ -692,8 +718,13 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
             return;
         }
 
+        final FlowManager flowManager = controller.getFlowManager();
+
+        final Set<String> versionedProviderIds = new HashSet<>();
         for (final VersionedParameterProvider versionedParameterProvider : 
dataflow.getParameterProviders()) {
-            final ParameterProviderNode existing = 
controller.getFlowManager().getParameterProvider(versionedParameterProvider.getInstanceIdentifier());
+            
versionedProviderIds.add(versionedParameterProvider.getInstanceIdentifier());
+
+            final ParameterProviderNode existing = 
flowManager.getParameterProvider(versionedParameterProvider.getInstanceIdentifier());
             if (existing == null) {
                 addParameterProvider(controller, versionedParameterProvider, 
controller.getEncryptor());
             } else if 
(affectedComponentSet.isParameterProviderAffected(existing.getIdentifier())) {
@@ -701,6 +732,11 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
             }
         }
 
+        for (final ParameterProviderNode parameterProvider : 
flowManager.getAllParameterProviders()) {
+            if 
(!versionedProviderIds.contains(parameterProvider.getIdentifier())) {
+                flowManager.removeParameterProvider(parameterProvider);
+            }
+        }
     }
 
     private void addParameterProvider(final FlowController controller, final 
VersionedParameterProvider parameterProvider, final PropertyEncryptor 
encryptor) {
@@ -721,15 +757,40 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
         parameterProviderNode.setProperties(decryptedProperties);
     }
 
+    private void removeMissingParameterContexts(final FlowController 
controller, final VersionedDataflow dataflow) {
+        controller.getFlowManager().withParameterContextResolution(() -> {
+            final ParameterContextManager contextManager = 
controller.getFlowManager().getParameterContextManager();
+            removeMissingParameterContexts(contextManager, dataflow);
+        });
+    }
+
+    private void removeMissingParameterContexts(final ParameterContextManager 
contextManager, final VersionedDataflow dataflow) {
+        if (dataflow == null) {
+            return;
+        }
+
+        // Collect the names of the Parameter Contexts in the proposed flow
+        final List<VersionedParameterContext> proposedParameterContexts = 
dataflow.getParameterContexts();
+        final Set<String> proposedContextNames = 
proposedParameterContexts.stream()
+            .map(VersionedParameterContext::getName)
+            .collect(Collectors.toSet());
+
+        final Map<String, ParameterContext> existingContextsByName = 
contextManager.getParameterContextNameMapping();
+        for (final Map.Entry<String, ParameterContext> entry : 
existingContextsByName.entrySet()) {
+            if (!proposedContextNames.contains(entry.getKey())) {
+                
contextManager.removeParameterContext(entry.getValue().getIdentifier());
+            }
+        }
+    }
+
     private void inheritParameterContexts(final FlowController controller, 
final VersionedDataflow dataflow) {
         controller.getFlowManager().withParameterContextResolution(() -> {
             final List<VersionedParameterContext> parameterContexts = 
dataflow.getParameterContexts();
 
             // Build mapping of name to context for resolution of inherited 
contexts
-            final Map<String, VersionedParameterContext> 
namedParameterContexts = parameterContexts.stream()
-                    .collect(
-                            
Collectors.toMap(VersionedParameterContext::getName, Function.identity())
-                    );
+            final Map<String, VersionedParameterContext> 
namedParameterContexts = new HashMap<>();
+            parameterContexts.forEach(context -> 
namedParameterContexts.put(context.getName(), context));
+
             for (final VersionedParameterContext versionedParameterContext : 
parameterContexts) {
                 inheritParameterContext(versionedParameterContext, 
controller.getFlowManager(), namedParameterContexts, controller.getEncryptor());
             }
@@ -937,6 +998,30 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
         if (!toDisable.isEmpty()) {
             
controller.getControllerServiceProvider().disableControllerServicesAsync(toDisable);
         }
+
+        removeMissingServices(controller, dataflow);
+    }
+
+    private void removeMissingServices(final FlowController controller, final 
VersionedDataflow dataflow) {
+        if (dataflow == null) {
+            return;
+        }
+
+        final Set<String> retainedServiceIds = 
dataflow.getControllerServices().stream()
+            .map(VersionedControllerService::getInstanceIdentifier)
+            .collect(Collectors.toSet());
+
+        final List<ControllerServiceNode> toRemove = 
controller.getFlowManager().getRootControllerServices().stream()
+            .filter(service -> 
!retainedServiceIds.contains(service.getIdentifier()))
+            .toList();
+
+        for (final ControllerServiceNode serviceToRemove : toRemove) {
+            try {
+                
controller.getFlowManager().removeRootControllerService(serviceToRemove);
+            } catch (final Exception e) {
+                throw new IllegalStateException("Inherited Dataflow does not 
have Controller-Level Controller Service %s but failed to remove it from 
flow".formatted(serviceToRemove), e);
+            }
+        }
     }
 
     private ControllerServiceNode addRootControllerService(final 
FlowController controller, final VersionedControllerService 
versionedControllerService) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index f4b504e774..963f7fd9c6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -34,7 +34,6 @@ import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.flow.VersionedDataflow;
 import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
 import 
org.apache.nifi.controller.parameter.ParameterProviderInstantiationException;
@@ -103,7 +102,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -611,63 +609,6 @@ public class TestFlowController {
         assertEquals(authFingerprint, authorizer.getFingerprint());
     }
 
-    @Test
-    public void testSynchronizeFlowWhenProposedMissingComponentsAreDifferent() 
{
-        final Set<String> missingComponents = new HashSet<>();
-        missingComponents.add("1");
-        missingComponents.add("2");
-
-        final DataFlow proposedDataFlow = mock(DataFlow.class);
-        
when(proposedDataFlow.getMissingComponents()).thenReturn(missingComponents);
-
-        UninheritableFlowException uninheritableFlowException =
-                assertThrows(UninheritableFlowException.class,
-                        () -> controller.synchronize(flowSynchronizer, 
proposedDataFlow, mock(FlowService.class), BundleUpdateStrategy.IGNORE_BUNDLE));
-        assertTrue(uninheritableFlowException.getMessage().contains("Proposed 
flow has missing components " +
-                "that are not considered missing in the current flow (1,2)"), 
uninheritableFlowException.getMessage());
-    }
-
-    @Test
-    public void testSynchronizeFlowWhenExistingMissingComponentsAreDifferent() 
throws IOException {
-        final ProcessorNode mockProcessorNode = mock(ProcessorNode.class);
-        when(mockProcessorNode.getIdentifier()).thenReturn("1");
-        when(mockProcessorNode.isExtensionMissing()).thenReturn(true);
-
-        final ControllerServiceNode mockControllerServiceNode = 
mock(ControllerServiceNode.class);
-        when(mockControllerServiceNode.getIdentifier()).thenReturn("2");
-        when(mockControllerServiceNode.isExtensionMissing()).thenReturn(true);
-
-        final ReportingTaskNode mockReportingTaskNode = 
mock(ReportingTaskNode.class);
-        when(mockReportingTaskNode.getIdentifier()).thenReturn("3");
-        when(mockReportingTaskNode.isExtensionMissing()).thenReturn(true);
-
-        final ProcessGroup mockRootGroup = mock(ProcessGroup.class);
-        
when(mockRootGroup.findAllProcessors()).thenReturn(Collections.singletonList(mockProcessorNode));
-
-        final SnippetManager mockSnippetManager = mock(SnippetManager.class);
-        when(mockSnippetManager.export()).thenReturn(new byte[0]);
-
-        final FlowManager flowManager = mock(FlowManager.class);
-
-        final FlowController mockFlowController = mock(FlowController.class);
-        when(mockFlowController.getFlowManager()).thenReturn(flowManager);
-
-        when(flowManager.getRootGroup()).thenReturn(mockRootGroup);
-        when(flowManager.getAllControllerServices()).thenReturn(new 
HashSet<>(Arrays.asList(mockControllerServiceNode)));
-        when(flowManager.getAllReportingTasks()).thenReturn(new 
HashSet<>(Arrays.asList(mockReportingTaskNode)));
-        when(mockFlowController.getAuthorizer()).thenReturn(authorizer);
-        
when(mockFlowController.getSnippetManager()).thenReturn(mockSnippetManager);
-
-        final DataFlow proposedDataFlow = mock(DataFlow.class);
-        when(proposedDataFlow.getMissingComponents()).thenReturn(new 
HashSet<>());
-        UninheritableFlowException uninheritableFlowException =
-                assertThrows(UninheritableFlowException.class,
-                        () -> flowSynchronizer.sync(mockFlowController, 
proposedDataFlow,
-                                mock(FlowService.class), 
BundleUpdateStrategy.IGNORE_BUNDLE));
-        assertTrue(uninheritableFlowException.getMessage().contains("Current 
flow has missing components that are not" +
-                        " considered missing in the proposed flow (1,2,3)"), 
uninheritableFlowException.getMessage());
-    }
-
     @Test
     public void testSynchronizeFlowWhenBundlesAreSame() throws IOException {
         final LogRepository logRepository = 
LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
index 00671acd51..a9a24bffb1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java
@@ -177,7 +177,7 @@ public class StandardFlowRegistryDAO extends ComponentDAO 
implements FlowRegistr
             throw new IllegalArgumentException("The specified registry id is 
unknown to this NiFi.");
         }
 
-        
flowController.getFlowManager().removeFlowRegistryClientNode(flowRegistry);
+        flowController.getFlowManager().removeFlowRegistryClient(flowRegistry);
 
         return flowRegistry;
     }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
index a6d7bffa13..92e133cca5 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StatelessFlowManager.java
@@ -352,7 +352,7 @@ public class StatelessFlowManager extends 
AbstractFlowManager implements FlowMan
     }
 
     @Override
-    public void removeFlowRegistryClientNode(FlowRegistryClientNode 
clientNode) {
+    public void removeFlowRegistryClient(FlowRegistryClientNode clientNode) {
         throw new UnsupportedOperationException("Removing Flow Registry Client 
is not supported in Stateless NiFi");
     }
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 227bcf9647..e816c2d000 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -255,7 +255,7 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
     }
 
     protected void waitForAllNodesConnected(final int expectedNumberOfNodes) {
-        waitForAllNodesConnected(expectedNumberOfNodes, 1000L);
+        waitForAllNodesConnected(expectedNumberOfNodes, 500L);
     }
 
     protected void waitForAllNodesConnected(final int expectedNumberOfNodes, 
final long sleepMillis) {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index b7e93d355d..4d1b98ca83 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -24,11 +24,9 @@ import 
org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.tests.system.NiFiInstance;
 import org.apache.nifi.tests.system.NiFiInstanceFactory;
 import org.apache.nifi.tests.system.NiFiSystemIT;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
-import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.PortDTO;
 import org.apache.nifi.web.api.dto.ProcessorDTO;
@@ -418,47 +416,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
     }
 
 
-    @Test
-    public void testRestartWithFlowXmlGzNoJson() throws NiFiClientException, 
IOException {
-        restartWithOnlySingleFlowPersistenceFile("flow.json.gz");
-    }
-
-    @Test
-    public void testRestartWithFlowJsonGzNoXml() throws NiFiClientException, 
IOException {
-        restartWithOnlySingleFlowPersistenceFile("flow.json.gz");
-    }
-
-    private void restartWithOnlySingleFlowPersistenceFile(final String 
filenameToDelete) throws NiFiClientException, IOException {
-        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
-        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
-        final ConnectionEntity connection = 
getClientUtil().createConnection(generate, terminate, "success");
-
-        final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
-        node2.stop();
-
-        final File confDir = new File(node2.getInstanceDirectory(), "conf");
-        assertEquals(1, confDir.listFiles(file -> 
file.getName().equals("flow.json.gz")).length);
-
-        final File jsonFile = new File(confDir, filenameToDelete);
-        assertTrue(jsonFile.delete());
-
-        node2.start(true);
-        waitForAllNodesConnected();
-
-        switchClientToNode(2);
-
-        // Ensure it still has the components
-        final ProcessorClient processorClient = 
getNifiClient().getProcessorClient(DO_NOT_REPLICATE);
-        final ProcessorEntity restartGenerate = 
processorClient.getProcessor(generate.getId());
-        assertNotNull(restartGenerate);
-
-        final ProcessorEntity restartTerminate = 
processorClient.getProcessor(terminate.getId());
-        assertNotNull(restartTerminate);
-
-        final ConnectionEntity restartConnection = 
getNifiClient().getConnectionClient(DO_NOT_REPLICATE).getConnection(connection.getId());
-        assertNotNull(restartConnection);
-    }
-
     @Test
     public void testComponentsRecreatedOnRestart() throws NiFiClientException, 
IOException, InterruptedException {
         // Build dataflow with processors at root level and an inner group 
that contains an input port, output port, and a processor, as well as a 
Controller Service that the processor will use.
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlowIT.java
similarity index 84%
rename from 
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
rename to 
nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlowIT.java
index 7f652712a8..02539a10c7 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlow.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithDifferentFlowIT.java
@@ -27,7 +27,9 @@ import org.apache.nifi.tests.system.NiFiSystemIT;
 import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ReportingTasksClient;
 import org.apache.nifi.web.api.dto.AffectedComponentDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ParameterContextDTO;
 import org.apache.nifi.web.api.dto.ParameterContextReferenceDTO;
 import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -37,7 +39,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.ParameterEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
-import org.junit.jupiter.api.Disabled;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -48,7 +50,6 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -58,9 +59,10 @@ import java.util.zip.GZIPInputStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-@Disabled("https://issues.apache.org/jira/browse/NIFI-12203")
-public class JoinClusterWithDifferentFlow extends NiFiSystemIT {
+public class JoinClusterWithDifferentFlowIT extends NiFiSystemIT {
     @Override
     public NiFiInstanceFactory getInstanceFactory() {
         final Map<String, String> propertyOverrides = 
Collections.singletonMap("nifi.cluster.flow.serialization.format", "JSON");
@@ -83,6 +85,12 @@ public class JoinClusterWithDifferentFlow extends 
NiFiSystemIT {
 
     @Test
     public void testStartupWithDifferentFlow() throws IOException, 
NiFiClientException, InterruptedException {
+        // Ensure that the root-level controller service is enabled
+        waitFor(() -> {
+            final ControllerServiceDTO rootService = 
getNifiClient().getControllerServicesClient().getControllerService("65b6f2b0-016e-1000-1bfa-6bc482d8cd2b").getComponent();
+            return "ENABLED".equals(rootService.getState());
+        });
+
         // Once we've started up, we want to have node 2 startup with a 
different flow. We cannot simply startup both nodes at the same time with
         // different flows because then either flow could be elected the 
"correct flow" and as a result, we don't know which node to look at to ensure
         // that the proper flow resolution occurred.
@@ -108,7 +116,8 @@ public class JoinClusterWithDifferentFlow extends 
NiFiSystemIT {
 
     private List<File> getFlowJsonFiles(final File confDir) {
         final File[] flowJsonFileArray = confDir.listFiles(file -> 
file.getName().startsWith("flow") && file.getName().endsWith(".json.gz"));
-        final List<File> flowJsonFiles = new 
ArrayList<>(Arrays.asList(flowJsonFileArray));
+        assertNotNull(flowJsonFileArray);
+        final List<File> flowJsonFiles = Arrays.asList(flowJsonFileArray);
         return flowJsonFiles;
     }
 
@@ -118,7 +127,7 @@ public class JoinClusterWithDifferentFlow extends 
NiFiSystemIT {
         final List<File> flowJsonFiles = getFlowJsonFiles(confDir);
         assertEquals(1, flowJsonFiles.size());
 
-        return flowJsonFiles.iterator().next();
+        return flowJsonFiles.get(0);
     }
 
     private void verifyFlowContentsOnDisk(final File backupFile) throws 
IOException {
@@ -157,7 +166,8 @@ public class JoinClusterWithDifferentFlow extends 
NiFiSystemIT {
         assertEquals("1", generateProperties.get("Batch Size"));
         assertEquals("1 hour", 
generateFlowFileEntity.getComponent().getConfig().getSchedulingPeriod());
 
-        final ParameterContextDTO contextDto = 
getNifiClient().getParamContextClient().getParamContext(paramContextReference.getId(),
 false).getComponent();
+        assertEquals(1, 
getNifiClient().getParamContextClient(DO_NOT_REPLICATE).getParamContexts().getParameterContexts().size());
+        final ParameterContextDTO contextDto = 
getNifiClient().getParamContextClient(DO_NOT_REPLICATE).getParamContext(paramContextReference.getId(),
 false).getComponent();
         assertEquals(2, contextDto.getBoundProcessGroups().size());
         assertEquals(1, contextDto.getParameters().size());
         final ParameterEntity parameterEntity = 
contextDto.getParameters().iterator().next();
@@ -176,12 +186,18 @@ public class JoinClusterWithDifferentFlow extends 
NiFiSystemIT {
 
         final ControllerServiceEntity firstService = 
controllerLevelServices.getControllerServices().iterator().next();
         assertFalse(firstService.getId().endsWith("00"));
+
+        final ReportingTasksClient reportingTasksClient = 
getNifiClient().getReportingTasksClient(DO_NOT_REPLICATE);
+        final ReportingTaskEntity taskEntity = 
reportingTasksClient.getReportingTask("65b75baf-016e-1000-13f9-cbcfa0a26576");
+        assertNotNull(taskEntity);
+
+        // Service with ID ending in 00 should no longer exist
+        assertThrows(NiFiClientException.class, () -> 
reportingTasksClient.getReportingTask("65b75baf-016e-1000-13f9-cbcfa0a2657600"));
     }
 
     private String readFlow(final File file) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
-        try (final InputStream fis = new FileInputStream(file);
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final InputStream fis = new FileInputStream(file);
              final InputStream gzipIn = new GZIPInputStream(fis)) {
 
             final byte[] buffer = new byte[4096];
@@ -189,9 +205,8 @@ public class JoinClusterWithDifferentFlow extends 
NiFiSystemIT {
             while ((len = gzipIn.read(buffer)) > 0) {
                 baos.write(buffer, 0, len);
             }
-        }
 
-        final byte[] bytes = baos.toByteArray();
-        return new String(bytes, StandardCharsets.UTF_8);
+            return baos.toString(StandardCharsets.UTF_8);
+        }
     }
 }
\ No newline at end of file
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
index bda3ac58ed..e3abbdca1e 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java
@@ -21,7 +21,6 @@ import 
org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 
@@ -29,24 +28,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class RunOnceIT extends NiFiSystemIT {
 
-    @Timeout(10)
     @Test
     public void testRunOnce() throws NiFiClientException, IOException, 
InterruptedException {
-        ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
-        getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec");
-
-        ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
-
-        ConnectionEntity generateToTerminate = 
getClientUtil().createConnection(generate, terminate, "success");
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+        final ConnectionEntity generateToTerminate = 
getClientUtil().createConnection(generate, terminate, "success");
 
         getNifiClient().getProcessorClient().runProcessorOnce(generate);
-
         waitForQueueCount(generateToTerminate.getId(), 1);
 
-        ProcessorEntity actualGenerate = 
getNifiClient().getProcessorClient().getProcessor(generate.getId());
-        String actualRunStatus = actualGenerate.getStatus().getRunStatus();
-
-        assertEquals("Stopped", actualRunStatus);
+        getClientUtil().waitForStoppedProcessor(generate.getId());
         assertEquals(1, getConnectionQueueSize(generateToTerminate.getId()));
 
         // Test CRON_DRIVEN Strategy
@@ -54,12 +45,9 @@ public class RunOnceIT extends NiFiSystemIT {
         getClientUtil().updateProcessorSchedulingPeriod(generate, "* * * * * 
?");
 
         getNifiClient().getProcessorClient().runProcessorOnce(generate);
-
         waitForQueueCount(generateToTerminate.getId(), 2);
 
-        actualRunStatus = actualGenerate.getStatus().getRunStatus();
-
-        assertEquals("Stopped", actualRunStatus);
+        getClientUtil().waitForStoppedProcessor(generate.getId());
         assertEquals(2, getConnectionQueueSize(generateToTerminate.getId()));
     }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
index 2ef362d5ef..ce4aa9faf1 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
@@ -203,7 +203,7 @@ nifi.security.user.knox.audiences=
 # nifi.security.group.mapping.transform.anygroup=LOWER
 
 # cluster common properties (all nodes must have same values) #
-nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.heartbeat.interval=2 sec
 nifi.cluster.protocol.is.secure=false
 
 # cluster node properties (only configure for cluster nodes) #
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
index e5e505b2af..d507b92b22 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
@@ -203,7 +203,7 @@ nifi.security.user.knox.audiences=
 # nifi.security.group.mapping.transform.anygroup=LOWER
 
 # cluster common properties (all nodes must have same values) #
-nifi.cluster.protocol.heartbeat.interval=5 sec
+nifi.cluster.protocol.heartbeat.interval=2 sec
 nifi.cluster.protocol.is.secure=false
 
 # cluster node properties (only configure for cluster nodes) #
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow1.json.gz
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow1.json.gz
index 334d720c69..7ab675905b 100644
Binary files 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow1.json.gz
 and 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow1.json.gz
 differ
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow2.json.gz
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow2.json.gz
index 6116ecdbcd..ecad5801bc 100644
Binary files 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow2.json.gz
 and 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/mismatched-flows/flow2.json.gz
 differ

Reply via email to