This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 00d90033d9 NIFI-14616 Added logging when Components are not being 
restarted when updating Flow (#9977)
00d90033d9 is described below

commit 00d90033d9f6aae969691ab56c92869094c0233c
Author: Mark Payne <[email protected]>
AuthorDate: Thu May 29 09:03:57 2025 -0400

    NIFI-14616 Added logging when Components are not being restarted when 
updating Flow (#9977)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/controller/StandardProcessorNode.java     |  14 +-
 .../registry/flow/mapping/FlowMappingOptions.java  |   5 +-
 .../serialization/AffectedComponentSet.java        | 141 +++++++++++++--------
 .../serialization/ComponentSetFilter.java          |   2 +-
 .../serialization/VersionedFlowSynchronizer.java   |  34 +++--
 .../serialization/TestAffectedComponentSet.java    |  86 +++++++++++++
 6 files changed, 212 insertions(+), 70 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 6afb9492b3..1eb7a05dee 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1380,7 +1380,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public void enable() {
-        desiredState = ScheduledState.STOPPED;
+        setDesiredState(ScheduledState.STOPPED);
         final boolean updated = 
scheduledState.compareAndSet(ScheduledState.DISABLED, ScheduledState.STOPPED);
 
         if (updated) {
@@ -1392,7 +1392,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
     @Override
     public void disable() {
-        desiredState = ScheduledState.DISABLED;
+        setDesiredState(ScheduledState.DISABLED);
         final boolean updated = 
scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.DISABLED);
 
         if (updated) {
@@ -1402,6 +1402,10 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
         }
     }
 
+    private void setDesiredState(final ScheduledState desiredState) {
+        this.desiredState = desiredState;
+        LOG.info("Desired State for {} now set to {}", this, desiredState);
+    }
 
     /**
      * Will idempotently start the processor using the following sequence: <i>
@@ -1480,10 +1484,10 @@ public class StandardProcessorNode extends 
ProcessorNode implements Connectable
             if (currentState == ScheduledState.STOPPED) {
                 starting = 
this.scheduledState.compareAndSet(ScheduledState.STOPPED, scheduledState);
                 if (starting) {
-                    this.desiredState = desiredState;
+                    setDesiredState(desiredState);
                 }
             } else if (currentState == ScheduledState.STOPPING && 
!failIfStopping) {
-                this.desiredState = desiredState;
+                setDesiredState(desiredState);
                 return;
             } else {
                 starting = false;
@@ -1800,7 +1804,7 @@ public class StandardProcessorNode extends ProcessorNode 
implements Connectable
 
         final Processor processor = processorRef.get().getProcessor();
         LOG.debug("Stopping processor: {}", this);
-        desiredState = ScheduledState.STOPPED;
+        setDesiredState(ScheduledState.STOPPED);
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java
index 8a5b24deb0..b2bab4f777 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/mapping/FlowMappingOptions.java
@@ -177,6 +177,7 @@ public class FlowMappingOptions {
 
         /**
          * Specifies whether or not the identifier of a Flow Registry Client 
should be included in the VersionedFlowCoordinates of a Versioned Process Group
+         *
          * @param mapFlowRegistryClientId <code>true</code> if the Registry ID 
of the Flow Registry Client should be mapped, <code>false</code> otherwise
          * @return the builder
          */
@@ -195,8 +196,8 @@ public class FlowMappingOptions {
          *
          * @return the FlowMappingOptions
          * @throws NullPointerException if the {@link 
#stateLookup(VersionedComponentStateLookup) StateLookup} is not set, the
-         * {@link #componentIdLookup(ComponentIdLookup) ComponentIdLookup} is 
not set, or if {@link #mapSensitiveConfiguration(boolean) 
mapSensitiveConfiguration}
-         * is set to true but the {@link 
#sensitiveValueEncryptor(SensitiveValueEncryptor) SensitiveValueEncryptor} has 
not been set
+         *                              {@link 
#componentIdLookup(ComponentIdLookup) ComponentIdLookup} is not set, or if 
{@link #mapSensitiveConfiguration(boolean) mapSensitiveConfiguration}
+         *                              is set to true but the {@link 
#sensitiveValueEncryptor(SensitiveValueEncryptor) SensitiveValueEncryptor} has 
not been set
          */
         public FlowMappingOptions build() {
             requireNonNull(stateLookup, "State Lookup must be set");
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
index d3b6e0b6f9..1122c2b32c 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/AffectedComponentSet.java
@@ -59,6 +59,7 @@ import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -455,22 +456,18 @@ public class AffectedComponentSet {
         }
 
         final ExecutionEngine executionEngine = start.getExecutionEngine();
-        switch (executionEngine) {
-            case STATELESS:
-                return start;
-            case INHERITED:
-                return getStatelessGroup(start.getParent());
-            case STANDARD:
-            default:
-                return null;
-        }
+        return switch (executionEngine) {
+            case STATELESS -> start;
+            case INHERITED -> getStatelessGroup(start.getParent());
+            default -> null;
+        };
     }
 
     private void addComponentsForParameterUpdate(final FlowDifference 
difference) {
         final DifferenceType differenceType = difference.getDifferenceType();
 
         final Optional<String> optionalParameterName = 
difference.getFieldName();
-        if (!optionalParameterName.isPresent()) {
+        if (optionalParameterName.isEmpty()) {
             logger.warn("Encountered a Flow Difference {} with Difference Type 
of {} but no indication as to which parameter was updated.", difference, 
differenceType);
             return;
         }
@@ -505,21 +502,13 @@ public class AffectedComponentSet {
     }
 
     private Connectable getConnectable(final ConnectableComponentType type, 
final String identifier) {
-        switch (type) {
-            case FUNNEL:
-                return flowManager.getFunnel(identifier);
-            case INPUT_PORT:
-                return flowManager.getInputPort(identifier);
-            case OUTPUT_PORT:
-                return flowManager.getOutputPort(identifier);
-            case PROCESSOR:
-                return flowManager.getProcessorNode(identifier);
-            case REMOTE_INPUT_PORT:
-            case REMOTE_OUTPUT_PORT:
-                return 
flowManager.getRootGroup().findRemoteGroupPort(identifier);
-            default:
-                return null;
-        }
+        return switch (type) {
+            case FUNNEL -> flowManager.getFunnel(identifier);
+            case INPUT_PORT -> flowManager.getInputPort(identifier);
+            case OUTPUT_PORT -> flowManager.getOutputPort(identifier);
+            case PROCESSOR -> flowManager.getProcessorNode(identifier);
+            case REMOTE_INPUT_PORT, REMOTE_OUTPUT_PORT -> 
flowManager.getRootGroup().findRemoteGroupPort(identifier);
+        };
     }
 
     private void addAffectedComponents(final VersionedComponent 
versionedComponent) {
@@ -623,20 +612,71 @@ public class AffectedComponentSet {
         processors.forEach(processor -> 
processor.getProcessGroup().startProcessor(processor, false));
         reportingTasks.forEach(flowController::startReportingTask);
         flowAnalysisRules.forEach(flowController::enableFlowAnalysisRule);
-        statelessProcessGroups.forEach(group -> group.startProcessing());
+        statelessProcessGroups.forEach(ProcessGroup::startProcessing);
+    }
+
+    public int getComponentCount() {
+        return inputPorts.size() + outputPorts.size() + 
remoteInputPorts.size() + remoteOutputPorts.size() +
+               processors.size() + reportingTasks.size() + 
flowAnalysisRules.size() + controllerServices.size() +
+               flowRegistryClients.size() + statelessProcessGroups.size();
+    }
+
+    public AffectedComponentSet removeComponents(final ComponentSetFilter 
filter) {
+        final AffectedComponentSet removed = new 
AffectedComponentSet(flowController);
+        removeMatching(inputPorts, 
filter::testInputPort).forEach(removed::addInputPort);
+        removeMatching(outputPorts, 
filter::testOutputPort).forEach(removed::addOutputPort);
+        removeMatching(remoteInputPorts, 
filter::testRemoteInputPort).forEach(removed::addRemoteInputPort);
+        removeMatching(remoteOutputPorts, 
filter::testRemoteOutputPort).forEach(removed::addRemoteOutputPort);
+        removeMatching(processors, 
filter::testProcessor).forEach(removed::addProcessor);
+        removeMatching(controllerServices, 
filter::testControllerService).forEach(removed::addControllerServiceWithoutReferences);
+        removeMatching(reportingTasks, 
filter::testReportingTask).forEach(removed::addReportingTask);
+        removeMatching(flowAnalysisRules, 
filter::testFlowAnalysisRule).forEach(removed::addFlowAnalysisRule);
+        removeMatching(flowRegistryClients, 
filter::testFlowRegistryClient).forEach(removed::addFlowRegistryClient);
+        removeMatching(statelessProcessGroups, 
filter::testStatelessGroup).forEach(removed::addStatelessGroup);
+        return removed;
+    }
+
+    private <T> Set<T> removeMatching(final Set<T> set, final Predicate<T> 
test) {
+        final Set<T> toRemove = new HashSet<>();
+        for (final T element : set) {
+            if (test.test(element)) {
+                toRemove.add(element);
+            }
+        }
+
+        set.removeAll(toRemove);
+        return toRemove;
     }
 
-    public void removeComponents(final ComponentSetFilter filter) {
-        inputPorts.removeIf(filter::testInputPort);
-        outputPorts.removeIf(filter::testOutputPort);
-        remoteInputPorts.removeIf(filter::testRemoteInputPort);
-        remoteOutputPorts.removeIf(filter::testRemoteOutputPort);
-        processors.removeIf(filter::testProcessor);
-        controllerServices.removeIf(filter::testControllerService);
-        reportingTasks.removeIf(filter::testReportingTask);
-        flowAnalysisRules.removeIf(filter::testFlowAnalysisRule);
-        flowRegistryClients.removeIf(filter::testFlowRegistryClient);
-        statelessProcessGroups.removeIf(filter::testStatelessGroup);
+    /**
+     * Returns an AffectedComponentSet that contains all components that are 
in this but not in the given AffectedComponentSet
+     * @param other the AffectedComponentSet to subtract from this
+     * @return the AffectedComponentSet representing the difference
+     */
+    public AffectedComponentSet minus(final AffectedComponentSet other) {
+        final AffectedComponentSet result = new 
AffectedComponentSet(flowController);
+        inputPorts.stream().filter(port -> 
!other.inputPorts.contains(port)).forEach(result::addInputPort);
+        outputPorts.stream().filter(port -> 
!other.outputPorts.contains(port)).forEach(result::addOutputPort);
+        remoteInputPorts.stream().filter(port -> 
!other.remoteInputPorts.contains(port)).forEach(result::addRemoteInputPort);
+        remoteOutputPorts.stream().filter(port -> 
!other.remoteOutputPorts.contains(port)).forEach(result::addRemoteOutputPort);
+        processors.stream().filter(processor -> 
!other.processors.contains(processor)).forEach(result::addProcessor);
+        controllerServices.stream().filter(controllerService -> 
!other.controllerServices.contains(controllerService)).forEach(result::addControllerService);
+        reportingTasks.stream().filter(task -> 
!other.reportingTasks.contains(task)).forEach(result::addReportingTask);
+        flowAnalysisRules.stream().filter(rule -> 
!other.flowAnalysisRules.contains(rule)).forEach(result::addFlowAnalysisRule);
+        flowRegistryClients.stream().filter(client -> 
!other.flowRegistryClients.contains(client)).forEach(result::addFlowRegistryClient);
+        statelessProcessGroups.stream().filter(group -> 
!other.statelessProcessGroups.contains(group)).forEach(result::addStatelessGroup);
+
+        return result;
+    }
+
+    /**
+     * Returns a boolean indicating whether or not this AffectedComponentSet 
is empty
+     * @return <code>true</code> if the AffectedComponentSet is empty, 
<code>false</code> otherwise
+     */
+    public boolean isEmpty() {
+        return inputPorts.isEmpty() && outputPorts.isEmpty() && 
remoteInputPorts.isEmpty() && remoteOutputPorts.isEmpty()
+            && processors.isEmpty() && controllerServices.isEmpty() && 
reportingTasks.isEmpty() && flowAnalysisRules.isEmpty()
+            && flowRegistryClients.isEmpty() && 
statelessProcessGroups.isEmpty();
     }
 
     /**
@@ -644,7 +684,7 @@ public class AffectedComponentSet {
      * that one or more components referred to by the AffectedComponentSet no 
longer exist (for example, there was a dataflow update that removed a 
Processor, so that Processor no longer exists).
      *
      * @return an AffectedComponentSet that represents all components within 
this AffectedComponentSet that currently exist within the NiFi instance. The 
components contained by the returned
-     * AffectedComponentSetwill always be a subset or equal to the set of 
components contained by this.
+     * AffectedComponentSet will always be a subset or equal to the set of 
components contained by this.
      */
     public AffectedComponentSet toExistingSet() {
         final ControllerServiceProvider serviceProvider = 
flowController.getControllerServiceProvider();
@@ -688,18 +728,12 @@ public class AffectedComponentSet {
     }
 
     private boolean isStartable(final ComponentNode componentNode) {
-        if (componentNode == null) {
-            return false;
-        }
-
-        if (componentNode instanceof ProcessorNode) {
-            return ((ProcessorNode) componentNode).getScheduledState() != 
ScheduledState.DISABLED;
-        }
-        if (componentNode instanceof ReportingTaskNode) {
-            return ((ReportingTaskNode) componentNode).getScheduledState() != 
ScheduledState.DISABLED;
-        }
-
-        return true;
+        return switch (componentNode) {
+            case null -> false;
+            case ProcessorNode processorNode -> 
processorNode.getScheduledState() != ScheduledState.DISABLED;
+            case ReportingTaskNode reportingTaskNode -> 
reportingTaskNode.getScheduledState() != ScheduledState.DISABLED;
+            default -> true;
+        };
     }
 
     private boolean isStartable(final ProcessGroup group) {
@@ -726,7 +760,7 @@ public class AffectedComponentSet {
     }
 
     public void stop() {
-        logger.info("Stopping the following components: {}", this);
+        logger.info("Stopping the following {} components: {}", 
getComponentCount(), this);
         final long start = System.currentTimeMillis();
 
         inputPorts.forEach(port -> port.getProcessGroup().stopInputPort(port));
@@ -736,7 +770,7 @@ public class AffectedComponentSet {
         processors.forEach(processor -> 
processor.getProcessGroup().stopProcessor(processor));
         reportingTasks.forEach(flowController::stopReportingTask);
         flowAnalysisRules.forEach(flowController::disableFlowAnalysisRule);
-        statelessProcessGroups.forEach(group -> group.stopProcessing());
+        statelessProcessGroups.forEach(ProcessGroup::stopProcessing);
 
         waitForConnectablesStopped();
 
@@ -817,6 +851,7 @@ public class AffectedComponentSet {
         return true;
     }
 
+
     @Override
     public String toString() {
         return "AffectedComponentSet[" +
@@ -826,7 +861,7 @@ public class AffectedComponentSet {
             ", remoteOutputPorts=" + remoteOutputPorts +
             ", processors=" + processors +
             ", parameterProviders=" + parameterProviders +
-                ", flowRegistryClients=" + flowRegistryClients +
+            ", flowRegistryClients=" + flowRegistryClients +
             ", controllerServices=" + controllerServices +
             ", reportingTasks=" + reportingTasks +
             ", flowAnalysisRules=" + flowAnalysisRules +
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
index 027ad0b19d..3dab40ea51 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ComponentSetFilter.java
@@ -48,7 +48,7 @@ public interface ComponentSetFilter {
     boolean testStatelessGroup(ProcessGroup group);
 
 
-    default ComponentSetFilter reverse() {
+    default ComponentSetFilter invert() {
         final ComponentSetFilter original = this;
 
         return new ComponentSetFilter() {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 76b71636b2..678dec7ea3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -220,16 +220,8 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
 
             synchronizeFlow(controller, existingDataFlow, proposedFlow, 
affectedComponents);
         } finally {
-            // We have to call toExistingSet() here because some of the 
components that existed in the active set may no longer exist,
-            // so attempting to start them will fail.
-
             if (!existingFlowEmpty) {
-                final AffectedComponentSet startable = 
activeSet.toExistingSet().toStartableSet();
-
-                final ComponentSetFilter runningComponentFilter = new 
RunningComponentSetFilter(proposedFlow.getVersionedDataflow());
-                final ComponentSetFilter stoppedComponentFilter = 
runningComponentFilter.reverse();
-                startable.removeComponents(stoppedComponentFilter);
-                startable.start();
+                restart(activeSet, proposedFlow.getVersionedDataflow());
             }
         }
 
@@ -237,6 +229,30 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
         logger.info("Successfully synchronized dataflow with the proposed flow 
in {} millis", millis);
     }
 
+    private void restart(final AffectedComponentSet activeSet, final 
VersionedDataflow versionedDataflow) {
+        final AffectedComponentSet existing = activeSet.toExistingSet();
+        final AffectedComponentSet noLongerExisting = 
activeSet.minus(existing);
+        if (!noLongerExisting.isEmpty()) {
+            logger.info("After synchronizing flow, the following components 
will not be restarted because they no longer exist: {}", noLongerExisting);
+        }
+
+        final AffectedComponentSet startable = existing.toStartableSet();
+        final AffectedComponentSet notStartable = existing.minus(startable);
+        if (!notStartable.isEmpty()) {
+            logger.info("After synchronizing flow, the following components 
will not be restarted because they are not in a startable state: {}", 
notStartable);
+        }
+
+        final ComponentSetFilter runningComponentFilter = new 
RunningComponentSetFilter(versionedDataflow);
+        final ComponentSetFilter stoppedComponentFilter = 
runningComponentFilter.invert();
+        final AffectedComponentSet stoppedComponents = 
startable.removeComponents(stoppedComponentFilter);
+        if (!stoppedComponents.isEmpty()) {
+            logger.info("After synchronizing flow, the following components 
will not be restarted because the proposed flow indicates that they are not 
running: {}", stoppedComponents);
+        }
+
+        logger.info("After synchronizing flow, restarting {} components", 
startable.getComponentCount());
+        startable.start();
+    }
+
     private void verifyNoConnectionsWithDataRemoved(final DataFlow 
existingFlow, final DataFlow proposedFlow, final FlowController controller, 
final FlowComparison flowComparison) {
         logger.debug("Checking that no connections were removed that have 
data");
         final FlowInheritabilityCheck processGroupInheritableCheck = new 
ConnectionMissingCheck(flowComparison);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
new file mode 100644
index 0000000000..69db20bd76
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/TestAffectedComponentSet.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+public class TestAffectedComponentSet {
+
+    private static final FlowController controller = 
Mockito.mock(FlowController.class);
+    private static final FlowManager flowManager = 
Mockito.mock(FlowManager.class);
+    static {
+        when(controller.getFlowManager()).thenReturn(flowManager);
+    }
+
+    @Test
+    public void testMinus() {
+        final AffectedComponentSet setA = new AffectedComponentSet(controller);
+        final AffectedComponentSet setB = new AffectedComponentSet(controller);
+
+        for (int i = 0; i < 10; i++) {
+            final ProcessorNode processorNode = 
Mockito.mock(ProcessorNode.class);
+            setA.addProcessor(processorNode);
+
+            if (i < 8) {
+                setB.addProcessor(processorNode);
+            }
+        }
+
+        final AffectedComponentSet difference = setA.minus(setB);
+        assertEquals(10, setA.getComponentCount());
+        assertEquals(8, setB.getComponentCount());
+        assertEquals(2, difference.getComponentCount());
+    }
+
+    @Test
+    public void testRemoveComponents() {
+        final AffectedComponentSet setA = new AffectedComponentSet(controller);
+        final List<ProcessorNode> toRemove = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            final ProcessorNode processorNode = 
Mockito.mock(ProcessorNode.class);
+            setA.addProcessor(processorNode);
+
+            if (i < 8) {
+                toRemove.add(processorNode);
+            }
+        }
+
+        final ComponentSetFilter componentSetFilter = 
Mockito.mock(ComponentSetFilter.class);
+        doAnswer(invocation -> {
+            final ProcessorNode toTest = invocation.getArgument(0, 
ProcessorNode.class);
+            return toRemove.contains(toTest);
+        }).when(componentSetFilter).testProcessor(any(ProcessorNode.class));
+
+        final AffectedComponentSet removed = 
setA.removeComponents(componentSetFilter);
+        assertEquals(2, setA.getComponentCount());
+        assertEquals(8, removed.getComponentCount());
+    }
+}

Reply via email to